From core-commits-return-8448-apmail-hadoop-core-commits-archive=hadoop.apache.org@hadoop.apache.org Wed Apr 08 12:48:18 2009 Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 9952 invoked from network); 8 Apr 2009 12:48:18 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 8 Apr 2009 12:48:18 -0000 Received: (qmail 75103 invoked by uid 500); 8 Apr 2009 12:48:17 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 75025 invoked by uid 500); 8 Apr 2009 12:48:17 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 75016 invoked by uid 99); 8 Apr 2009 12:48:17 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Apr 2009 12:48:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Apr 2009 12:48:07 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 23E9423889BB; Wed, 8 Apr 2009 12:47:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r763223 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/ipc/ src/test/org/apache/hadoop/mapred/ Date: Wed, 08 Apr 2009 12:47:45 -0000 To: core-commits@hadoop.apache.org From: sharad@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090408124746.23E9423889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: sharad Date: Wed Apr 8 12:47:44 2009 New Revision: 763223 URL: 
http://svn.apache.org/viewvc?rev=763223&view=rev Log: HADOOP-5394. JobTracker might schedule 2 attempts of the same task with the same attempt id across restarts. Contributed by Amar kamat. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Wed Apr 8 12:47:44 2009 @@ -1210,6 +1210,9 @@ HADOOP-5585. Clear FileSystem statistics between tasks when jvm-reuse is enabled. (omalley) + HADOOP-5394. JobTracker might schedule 2 attempts of the same task + with the same attempt id across restarts. 
(Amar Kamat via sharad) + Release 0.19.2 - Unreleased BUG FIXES Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed Apr 8 12:47:44 2009 @@ -122,7 +122,7 @@ FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, - TRACKER_NAME, STATE_STRING, VERSION, RESTART_COUNT + TRACKER_NAME, STATE_STRING, VERSION } /** @@ -1167,9 +1167,15 @@ * @param submitTime job's submit time * @param launchTime job's launch time * @param restartCount number of times the job got restarted + * @deprecated Use {@link #logJobInfo(JobID, long, long)} instead. 
*/ public static void logJobInfo(JobID jobid, long submitTime, long launchTime, int restartCount){ + logJobInfo(jobid, submitTime, launchTime); + } + + public static void logJobInfo(JobID jobid, long submitTime, long launchTime) + { if (!disableHistory){ String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid; ArrayList writer = openJobs.get(logFileKey); @@ -1177,11 +1183,10 @@ if (null != writer){ JobHistory.log(writer, RecordTypes.Job, new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME, - Keys.LAUNCH_TIME, Keys.RESTART_COUNT}, + Keys.LAUNCH_TIME}, new String[] {jobid.toString(), String.valueOf(submitTime), - String.valueOf(launchTime), - String.valueOf(restartCount)}); + String.valueOf(launchTime)}); } } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Apr 8 12:47:44 2009 @@ -164,7 +164,7 @@ long finishTime; // Indicates how many times the job got restarted - private int restartCount = 0; + private final int restartCount; private JobConf conf; AtomicBoolean tasksInited = new AtomicBoolean(false); @@ -214,6 +214,7 @@ this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL; this.anyCacheLevel = this.maxLevel+1; this.jobtracker = null; + this.restartCount = 0; } /** @@ -222,6 +223,12 @@ */ public JobInProgress(JobID jobid, JobTracker jobtracker, JobConf default_conf) throws IOException { + this(jobid, jobtracker, default_conf, 0); + } + + public JobInProgress(JobID jobid, JobTracker jobtracker, + JobConf default_conf, int rCount) throws IOException { + this.restartCount = rCount; this.jobId = jobid; String url = "http://" + jobtracker.getJobTrackerMachine() + 
":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid; @@ -595,19 +602,17 @@ } // Update the job start/launch time (upon restart) and log to history - synchronized void updateJobInfo(long startTime, long launchTime, int count) { + synchronized void updateJobInfo(long startTime, long launchTime) { // log and change to the job's start/launch time this.startTime = startTime; this.launchTime = launchTime; - // change to the job's restart count - this.restartCount = count; - JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime, count); + JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime); } /** * Get the number of times the job has restarted */ - int numRestarts() { + int getNumRestarts() { return restartCount; } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Apr 8 12:47:44 2009 @@ -46,6 +46,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -123,6 +125,10 @@ final static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0733); // rwx-wx-wx + // system files should have 700 permission + final static FsPermission SYSTEM_FILE_PERMISSION = + FsPermission.createImmutable((short) 0700); // rwx------ + /** * A client tried to submit a job before the Job Tracker was ready. 
*/ @@ -672,6 +678,8 @@ Set jobsToRecover; // set of jobs to be recovered private int totalEventsRecovered = 0; + private int restartCount = 0; + private boolean shouldRecover = false; Set recoveredTrackers = Collections.synchronizedSet(new HashSet()); @@ -850,7 +858,7 @@ } public boolean shouldRecover() { - return jobsToRecover.size() != 0; + return shouldRecover; } public boolean shouldSchedule() { @@ -888,18 +896,16 @@ // checks if the job dir has the required files public void checkAndAddJob(FileStatus status) throws IOException { - String jobName = status.getPath().getName(); - if (isJobNameValid(jobName)) { + String fileName = status.getPath().getName(); + if (isJobNameValid(fileName)) { if (JobClient.isJobDirValid(status.getPath(), fs)) { - recoveryManager.addJobForRecovery(JobID.forName(jobName)); + recoveryManager.addJobForRecovery(JobID.forName(fileName)); + shouldRecover = true; // enable actual recovery if num-files > 1 } else { - LOG.info("Found an incomplete job directory " + jobName + "." + LOG.info("Found an incomplete job directory " + fileName + "." + " Deleting it!!"); fs.delete(status.getPath(), true); } - } else { - LOG.info("Deleting " + status.getPath()); - fs.delete(status.getPath(), true); } } @@ -918,8 +924,7 @@ // Set the start/launch time only if there are recovered tasks // Increment the job's restart count jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME), - job.getLong(JobHistory.Keys.LAUNCH_TIME), - job.getInt(Keys.RESTART_COUNT) + 1); + job.getLong(JobHistory.Keys.LAUNCH_TIME)); // Save the new job status JobStatus newStatus = (JobStatus)jip.getStatus().clone(); @@ -1119,7 +1124,84 @@ expireLaunchingTasks.removeTask(attemptId); } + Path getRestartCountFile() { + return new Path(getSystemDir(), "jobtracker.info"); + } + + Path getTempRestartCountFile() { + return new Path(getSystemDir(), "jobtracker.info.recover"); + } + + /** + * Initialize the recovery process. 
It simply creates a jobtracker.info file + * in the jobtracker's system directory and writes its restart count in it. + * For the first start, the jobtracker writes '0' in it. Upon subsequent + * restarts the jobtracker replaces the count with its current count which + * is (old count + 1). The whole purpose of this api is to obtain restart + * counts across restarts to avoid attempt-id clashes. + * + * Note that in between if the jobtracker.info file goes missing then the + * jobtracker will disable recovery and continue. + * + */ + void updateRestartCount() throws IOException { + Path restartFile = getRestartCountFile(); + Path tmpRestartFile = getTempRestartCountFile(); + FileSystem fs = restartFile.getFileSystem(conf); + FsPermission filePerm = new FsPermission(SYSTEM_FILE_PERMISSION); + + // read the count from the jobtracker info file + if (fs.exists(restartFile)) { + fs.delete(tmpRestartFile, false); // delete the tmp file + } else if (fs.exists(tmpRestartFile)) { + // if .rec exists then delete the main file and rename the .rec to main + fs.rename(tmpRestartFile, restartFile); // rename .rec to main file + } else { + // For the very first time the jobtracker will create a jobtracker.info + // file. If the jobtracker has restarted then disable recovery as files + // needed for recovery are missing. + + // disable recovery if this is a restart + shouldRecover = false; + + // write the jobtracker.info file + FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm); + out.writeInt(0); + out.close(); + return; + } + + FSDataInputStream in = fs.open(restartFile); + // read the old count + restartCount = in.readInt(); + ++restartCount; // increment the restart count + in.close(); + + // Write back the new restart count and rename the old info file + //TODO This is similar to jobhistory recovery, maybe this common code + // can be factored out. 
+ + // write to the tmp file + FSDataOutputStream out = FileSystem.create(fs, tmpRestartFile, filePerm); + out.writeInt(restartCount); + out.close(); + + // delete the main file + fs.delete(restartFile, false); + + // rename the .rec to main file + fs.rename(tmpRestartFile, restartFile); + } + public void recover() { + if (!shouldRecover()) { + // clean up jobs structure + jobsToRecover.clear(); + return; + } + + LOG.info("Restart count of the jobtracker : " + restartCount); + // I. Init the jobs and cache the recovered job history filenames Map jobHistoryFilenameMap = new HashMap(); Iterator idIter = jobsToRecover.iterator(); @@ -1128,7 +1210,8 @@ LOG.info("Trying to recover details of job " + id); try { // 1. Create the job object - JobInProgress job = new JobInProgress(id, JobTracker.this, conf); + JobInProgress job = + new JobInProgress(id, JobTracker.this, conf, restartCount); // 2. Check if the user has appropriate access // Get the user group info for the job's owner @@ -1209,8 +1292,6 @@ // 3. Close the listener listener.close(); - LOG.info("Restart count for job " + id + " is " + pJob.numRestarts()); - // 4. Update the recovery metric totalEventsRecovered += listener.getNumEventsRecovered(); @@ -1529,7 +1610,6 @@ } // Make sure that the backup data is preserved FileStatus[] systemDirData = fs.listStatus(this.systemDir); - LOG.info("Cleaning up the system directory"); // Check if the history is enabled .. as we cant have persistence with // history disabled if (conf.getBoolean("mapred.jobtracker.restart.recover", false) @@ -1550,6 +1630,7 @@ break; // if there is something to recover else clean the sys dir } } + LOG.info("Cleaning up the system directory"); fs.delete(systemDir, true); if (FileSystem.mkdirs(fs, systemDir, new FsPermission(SYSTEM_DIR_PERMISSION))) { @@ -1566,6 +1647,24 @@ } Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD); } + + // Prepare for recovery. This is done irrespective of the status of restart + // flag. 
+ try { + recoveryManager.updateRestartCount(); + } catch (IOException ioe) { + LOG.warn("Failed to initialize recovery manager. The Recovery manager " + + "failed to access the system files in the system dir (" + + getSystemDir() + ")."); + LOG.warn("It might be because the JobTracker failed to read/write system" + + " files (" + recoveryManager.getRestartCountFile() + " / " + + recoveryManager.getTempRestartCountFile() + ") or the system " + + " file " + recoveryManager.getRestartCountFile() + + " is missing!"); + LOG.warn("Bailing out..."); + throw ioe; + } + // Same with 'localDir' except it's always on the local disk. jobConf.deleteLocalFiles(SUBDIR); Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Wed Apr 8 12:47:44 2009 @@ -887,7 +887,7 @@ TaskAttemptID taskid = null; if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) { // Make sure that the attempts are unqiue across restarts - int attemptId = job.numRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId; + int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId; taskid = new TaskAttemptID( id, attemptId); ++nextTaskId; } else { Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original) +++ 
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Wed Apr 8 12:47:44 2009 @@ -66,6 +66,29 @@ return NameNode.createNameNode(args, config); } + /** + * Start the data-node. + */ + public DataNode startDataNode(int index, Configuration config) + throws IOException { + String dataDir = System.getProperty("test.build.data"); + File dataNodeDir = new File(dataDir, "data-" + index); + config.set("dfs.data.dir", dataNodeDir.getPath()); + + String[] args = new String[] {}; + // NameNode will modify config with the ports it bound to + return DataNode.createDataNode(args, config); + } + + /** + * Stop the datanode. + */ + public void stopDataNode(DataNode dn) { + if (dn != null) { + dn.shutdown(); + } + } + public void stopNameNode(NameNode nn) { if (nn != null) { nn.stop(); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Wed Apr 8 12:47:44 2009 @@ -85,6 +85,7 @@ assertTrue(dfs.exists(filePath)); // This will test TPC to a JobTracker + fs = FileSystem.get(sconf); mr = new MiniMRCluster(1, fs.getUri().toString(), 1); final int jobTrackerPort = mr.getJobTrackerPort(); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Wed Apr 8 
12:47:44 2009 @@ -543,24 +543,28 @@ jobTrackerThread = new Thread(jobTracker); jobTrackerThread.start(); - while (!jobTracker.isUp()) { + while (jobTracker.isActive() && !jobTracker.isUp()) { try { // let daemons get started Thread.sleep(1000); } catch(InterruptedException e) { } } - ClusterStatus status = jobTracker.getJobTracker().getClusterStatus(false); - while (jobTracker.isActive() && status.getJobTrackerState() == JobTracker.State.INITIALIZING) { - try { - LOG.info("JobTracker still initializing. Waiting."); - Thread.sleep(1000); - } catch(InterruptedException e) {} + // if the jobtracker has started then wait for it to init + ClusterStatus status = null; + if (jobTracker.isUp()) { status = jobTracker.getJobTracker().getClusterStatus(false); + while (jobTracker.isActive() && status.getJobTrackerState() + == JobTracker.State.INITIALIZING) { + try { + LOG.info("JobTracker still initializing. Waiting."); + Thread.sleep(1000); + } catch(InterruptedException e) {} + status = jobTracker.getJobTracker().getClusterStatus(false); + } } - if (!jobTracker.isActive() - || status.getJobTrackerState() != JobTracker.State.RUNNING) { + if (!jobTracker.isActive()) { // return if jobtracker has crashed return; } Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Wed Apr 8 12:47:44 2009 @@ -20,6 +20,7 @@ import java.io.IOException; import junit.framework.TestCase; import org.apache.hadoop.hdfs.TestHDFSServerPorts; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.NameNode; import 
org.apache.hadoop.fs.FileSystem; @@ -112,8 +113,10 @@ */ public void testJobTrackerPorts() throws Exception { NameNode nn = null; + DataNode dn = null; try { nn = hdfs.startNameNode(); + dn = hdfs.startDataNode(1, hdfs.getConfig()); // start job tracker on the same port as name-node JobConf conf2 = new JobConf(hdfs.getConfig()); @@ -139,6 +142,7 @@ assertTrue(started); // should start now } finally { + hdfs.stopDataNode(dn); hdfs.stopNameNode(nn); } } @@ -148,10 +152,12 @@ */ public void testTaskTrackerPorts() throws Exception { NameNode nn = null; + DataNode dn = null; JobTracker jt = null; JTRunner runner = null; try { nn = hdfs.startNameNode(); + dn = hdfs.startDataNode(2, hdfs.getConfig()); JobConf conf2 = new JobConf(hdfs.getConfig()); runner = new JTRunner(); @@ -187,6 +193,7 @@ runner.interrupt(); runner.join(); } + hdfs.stopDataNode(dn); hdfs.stopNameNode(nn); } } Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Wed Apr 8 12:47:44 2009 @@ -239,7 +239,7 @@ Path sysDir = new Path(jobtracker.getSystemDir()); FileSystem fs = sysDir.getFileSystem(conf); int size = fs.listStatus(sysDir).length; - while (size > 0) { + while (size > 1) { // ignore the jobtracker.info file System.out.println("Waiting for the job files in sys directory to be cleaned up"); UtilsForTests.waitFor(100); size = fs.listStatus(sysDir).length; Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=763223&r1=763222&r2=763223&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Wed Apr 8 12:47:44 2009 @@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobTracker.RecoveryManager; +import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner; +import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler; import org.apache.hadoop.security.UserGroupInformation; /** @@ -261,4 +263,139 @@ mr.shutdown(); } + + /** + * Test if restart count of the jobtracker is correctly managed. + * Steps are as follows : + * - start the jobtracker and check if the info file gets created. + * - stops the jobtracker, deletes the jobtracker.info file and checks if + * upon restart the recovery is 'off' + * - submit a job to the jobtracker. + * - restart the jobtracker k times and check if the restart count on ith + * iteration is i. + * - submit a new job and check if its restart count is 0. + * - garble the jobtracker.info file and restart the jobtracker, the + * jobtracker should crash. 
+ */ + public void testRestartCount() throws Exception { + LOG.info("Testing restart-count"); + String signalFile = new Path(TEST_DIR, "signal").toString(); + + // clean up + FileSystem fs = FileSystem.get(new Configuration()); + fs.delete(TEST_DIR, true); + + JobConf conf = new JobConf(); + conf.set("mapred.jobtracker.job.history.block.size", "1024"); + conf.set("mapred.jobtracker.job.history.buffer.size", "1024"); + conf.setBoolean("mapred.jobtracker.restart.recover", true); + // since there is no need for initing + conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class, + TaskScheduler.class); + + MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); + JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); + JobClient jc = new JobClient(mr.createJobConf()); + + // check if the jobtracker info file exists + Path infoFile = jobtracker.recoveryManager.getRestartCountFile(); + assertTrue("Jobtracker infomation is missing", fs.exists(infoFile)); + + // check if garbling the system files disables the recovery process + LOG.info("Stopping jobtracker for testing with system files deleted"); + mr.stopJobTracker(); + + // delete the info file + Path rFile = jobtracker.recoveryManager.getRestartCountFile(); + fs.delete(rFile,false); + + // start the jobtracker + LOG.info("Stopping jobtracker with system files deleted"); + mr.startJobTracker(); + + UtilsForTests.waitForJobTracker(jc); + jobtracker = mr.getJobTrackerRunner().getJobTracker(); + + // check if the recovery is disabled + assertFalse("Recovery is not disabled upon missing system files", + jobtracker.recoveryManager.shouldRecover()); + + // check if the system dir is sane + assertTrue("Recovery file is missing upon restart", fs.exists(rFile)); + Path tFile = jobtracker.recoveryManager.getTempRestartCountFile(); + assertFalse("Temp recovery file exists upon restart", fs.exists(tFile)); + + // submit a job + JobConf job = mr.createJobConf(); + + 
UtilsForTests.configureWaitingJobConf(job, + new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, + "test-recovery-manager", signalFile, signalFile); + + // submit the faulty job + RunningJob rJob = jc.submitJob(job); + LOG.info("Submitted first job " + rJob.getID()); + + // kill the jobtracker multiple times and check if the count is correct + for (int i = 1; i <= 5; ++i) { + LOG.info("Stopping jobtracker for " + i + " time"); + mr.stopJobTracker(); + + // start the jobtracker + LOG.info("Starting jobtracker for " + i + " time"); + mr.startJobTracker(); + + UtilsForTests.waitForJobTracker(jc); + + // check if the system dir is sane + assertTrue("Recovery file is missing upon restart", fs.exists(rFile)); + assertFalse("Temp recovery file exists upon restart", fs.exists(tFile)); + + jobtracker = mr.getJobTrackerRunner().getJobTracker(); + JobInProgress jip = jobtracker.getJob(rJob.getID()); + + // assert if restart count is correct + assertEquals("Recovery manager failed to recover restart count", + i, jip.getNumRestarts()); + } + + // kill the old job + rJob.killJob(); + + // II. 
Submit a new job and check if the restart count is 0 + JobConf job1 = mr.createJobConf(); + + UtilsForTests.configureWaitingJobConf(job1, + new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, + "test-recovery-manager", signalFile, signalFile); + + // make sure that the job id's dont clash + jobtracker.getNewJobId(); + + // submit a new job + rJob = jc.submitJob(job1); + LOG.info("Submitted first job after restart" + rJob.getID()); + + // assert if restart count is correct + JobInProgress jip = jobtracker.getJob(rJob.getID()); + assertEquals("Restart count for new job is incorrect", + 0, jip.getNumRestarts()); + + LOG.info("Stopping jobtracker for testing the fs errors"); + mr.stopJobTracker(); + + // check if system.dir problems in recovery kills the jobtracker + fs.delete(rFile, false); + FSDataOutputStream out = fs.create(rFile); + out.writeBoolean(true); + out.close(); + + // start the jobtracker + LOG.info("Starting jobtracker with fs errors"); + mr.startJobTracker(); + JobTrackerRunner runner = mr.getJobTrackerRunner(); + assertFalse("Restart count for new job is incorrect", runner.isActive()); + + mr.shutdown(); + } }