Subject: svn commit: r1390006 - in /hadoop/common/branches/branch-1.1: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Tue, 25 Sep 2012 17:23:38 -0000
To: common-commits@hadoop.apache.org
From: acmurthy@apache.org

Author: acmurthy
Date: Tue Sep 25 17:23:37 2012
New Revision: 1390006

URL: http://svn.apache.org/viewvc?rev=1390006&view=rev
Log:
Merge -c 1356904 from branch-1 to branch-1.1 to fix MAPREDUCE-3837. Job tracker is not able to recover job in case of crash and after that no user can submit job.

Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1390006&r1=1390005&r2=1390006&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 17:23:37 2012
@@ -365,6 +365,9 @@ Release 1.1.0 - 2012.09.16
    MAPREDUCE-4675. Fixed a race condition caused in TestKillSubProcesses
    caused due to a recent commit. (Bikas Saha via vinodkv)

+   MAPREDUCE-3837. Job tracker is not able to recover job in case of crash
+   and after that no user can submit job.
(Mayank Bansal via tomwhite) + Release 1.0.4 - Unreleased NEW FEATURES Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390006&r1=1390005&r2=1390006&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 17:23:37 2012 @@ -205,6 +205,7 @@ public class JobTracker implements MRCon State state = State.INITIALIZING; private static final int FS_ACCESS_RETRY_PERIOD = 10000; static final String JOB_INFO_FILE = "job-info"; + static final String JOB_TOKEN_FILE = "jobToken"; private DNSToSwitchMapping dnsToSwitchMapping; private NetworkTopology clusterMap = new NetworkTopology(); private int numTaskCacheLevels; // the max level to which we cache tasks @@ -1215,179 +1216,6 @@ public class JobTracker implements MRCon /** A custom listener that replays the events in the order in which the * events (task attempts) occurred. */ - class JobRecoveryListener implements Listener { - // The owner job - private JobInProgress jip; - - private JobHistory.JobInfo job; // current job's info object - - // Maintain the count of the (attempt) events recovered - private int numEventsRecovered = 0; - - // Maintains open transactions - private Map hangingAttempts = - new HashMap(); - - // Whether there are any updates for this job - private boolean hasUpdates = false; - - public JobRecoveryListener(JobInProgress jip) { - this.jip = jip; - this.job = new JobHistory.JobInfo(jip.getJobID().toString()); - } - - /** - * Process a task. Note that a task might commit a previously pending - * transaction. 
- */ - private void processTask(String taskId, JobHistory.Task task) { - // Any TASK info commits the previous transaction - boolean hasHanging = hangingAttempts.remove(taskId) != null; - if (hasHanging) { - numEventsRecovered += 2; - } - - TaskID id = TaskID.forName(taskId); - TaskInProgress tip = getTip(id); - - updateTip(tip, task); - } - - /** - * Adds a task-attempt in the listener - */ - private void processTaskAttempt(String taskAttemptId, - JobHistory.TaskAttempt attempt) - throws UnknownHostException { - TaskAttemptID id = TaskAttemptID.forName(taskAttemptId); - - // Check if the transaction for this attempt can be committed - String taskStatus = attempt.get(Keys.TASK_STATUS); - TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId); - JobInProgress jip = getJob(taskID.getJobID()); - JobStatus prevStatus = (JobStatus)jip.getStatus().clone(); - - if (taskStatus.length() > 0) { - // This means this is an update event - if (taskStatus.equals(Values.SUCCESS.name())) { - // Mark this attempt as hanging - hangingAttempts.put(id.getTaskID().toString(), taskAttemptId); - addSuccessfulAttempt(jip, id, attempt); - } else { - addUnsuccessfulAttempt(jip, id, attempt); - numEventsRecovered += 2; - } - } else { - createTaskAttempt(jip, id, attempt); - } - - JobStatus newStatus = (JobStatus)jip.getStatus().clone(); - if (prevStatus.getRunState() != newStatus.getRunState()) { - if(LOG.isDebugEnabled()) - LOG.debug("Status changed hence informing prevStatus" + prevStatus + " currentStatus "+ newStatus); - JobStatusChangeEvent event = - new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED, - prevStatus, newStatus); - updateJobInProgressListeners(event); - } - } - - public void handle(JobHistory.RecordTypes recType, Map values) throws IOException { - if (recType == JobHistory.RecordTypes.Job) { - // Update the meta-level job information - job.handle(values); - - // Forcefully init the job as we have some updates for it - checkAndInit(); - } else if (recType.equals(JobHistory.RecordTypes.Task)) { - String taskId = values.get(Keys.TASKID); - - // Create a task - JobHistory.Task task = new JobHistory.Task(); - task.handle(values); - - // Ignore if its a cleanup task - if (isCleanup(task)) { - return; - } - - // Process the task i.e update the tip state - processTask(taskId, task); - } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) { - String attemptId = values.get(Keys.TASK_ATTEMPT_ID); - - // Create a task attempt - JobHistory.MapAttempt attempt = new JobHistory.MapAttempt(); - attempt.handle(values); - - // Ignore if its a cleanup task - if (isCleanup(attempt)) { - return; - } - - // Process the attempt i.e update the attempt state via job - processTaskAttempt(attemptId, attempt); - } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) { - String attemptId = values.get(Keys.TASK_ATTEMPT_ID); - - // Create a task attempt - JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt(); - attempt.handle(values); - - // Ignore if its a cleanup task - if (isCleanup(attempt)) { - return; - } - - // Process the attempt i.e update the job state via job - processTaskAttempt(attemptId, attempt); - } - } - - // Check if the task is of type CLEANUP - private boolean isCleanup(JobHistory.Task task) { - String taskType = task.get(Keys.TASK_TYPE); - return Values.CLEANUP.name().equals(taskType); - } - - // Init the job if its ready for init. 
Also make sure that the scheduler - // is updated - private void checkAndInit() throws IOException { - String jobStatus = this.job.get(Keys.JOB_STATUS); - if (Values.PREP.name().equals(jobStatus)) { - hasUpdates = true; - LOG.info("Calling init from RM for job " + jip.getJobID().toString()); - try { - initJob(jip); - } catch (Throwable t) { - LOG.error("Job initialization failed : \n" - + StringUtils.stringifyException(t)); - jip.status.setFailureInfo("Job Initialization failed: \n" - + StringUtils.stringifyException(t)); - failJob(jip); - throw new IOException(t); - } - } - } - - void close() { - if (hasUpdates) { - // Apply the final (job-level) updates - JobStatusChangeEvent event = updateJob(jip, job); - - synchronized (JobTracker.this) { - // Update the job listeners - updateJobInProgressListeners(event); - } - } - } - - public int getNumEventsRecovered() { - return numEventsRecovered; - } - - } public RecoveryManager() { jobsToRecover = new TreeSet(); @@ -1441,16 +1269,25 @@ public class JobTracker implements MRCon // checks if the job dir has the required files public void checkAndAddJob(FileStatus status) throws IOException { String fileName = status.getPath().getName(); - if (isJobNameValid(fileName)) { - if (JobClient.isJobDirValid(status.getPath(), fs)) { - recoveryManager.addJobForRecovery(JobID.forName(fileName)); - shouldRecover = true; // enable actual recovery if num-files > 1 - } else { - LOG.info("Found an incomplete job directory " + fileName + "." - + " Deleting it!!"); - fs.delete(status.getPath(), true); - } + if (isJobNameValid(fileName) && isJobDirValid(JobID.forName(fileName))) { + recoveryManager.addJobForRecovery(JobID.forName(fileName)); + shouldRecover = true; // enable actual recovery if num-files > 1 + } + } + + private boolean isJobDirValid(JobID jobId) throws IOException { + boolean ret = false; + Path jobInfoFile = getSystemFileForJob(jobId); + final Path jobTokenFile = getTokenFileForJob(jobId); + JobConf job = new JobConf(); + if (jobTokenFile.getFileSystem(job).exists(jobTokenFile) + && jobInfoFile.getFileSystem(job).exists(jobInfoFile)) { + ret = true; + } else { + LOG.warn("Job " + jobId + + " does not have valid info/token file so ignoring for recovery"); } + return ret; } private JobStatusChangeEvent updateJob(JobInProgress jip, @@ -1713,11 +1550,9 @@ public class JobTracker implements MRCon fs.rename(tmpRestartFile, restartFile); // rename .rec to main file } else { // For the very first time the jobtracker will create a jobtracker.info - // file. If the jobtracker has restarted then disable recovery as files' - // needed for recovery are missing. - - // disable recovery if this is a restart - shouldRecover = false; + // file. + // enable recovery if this is a restart + shouldRecover = true; // write the jobtracker.info file try { @@ -1769,205 +1604,50 @@ public class JobTracker implements MRCon fs.rename(tmpRestartFile, restartFile); } - // mapred.JobID::forName returns - @SuppressWarnings("unchecked") // mapreduce.JobID public void recover() { + int recovered = 0; + long recoveryProcessStartTime = clock.getTime(); if (!shouldRecover()) { // clean up jobs structure jobsToRecover.clear(); return; } - LOG.info("Restart count of the jobtracker : " + restartCount); - - // I. Init the jobs and cache the recovered job history filenames - Map jobHistoryFilenameMap = new HashMap(); - Iterator idIter = jobsToRecover.iterator(); - JobInProgress job = null; - File jobIdFile = null; - - // 0. 
Cleanup - try { - JobHistory.JobInfo.deleteConfFiles(); - } catch (IOException ioe) { - LOG.info("Error in cleaning up job history folder", ioe); - } - - while (idIter.hasNext()) { - JobID id = idIter.next(); - LOG.info("Trying to recover details of job " + id); + LOG.info("Starting the recovery process for " + jobsToRecover.size() + + " jobs ..."); + for (JobID jobId : jobsToRecover) { + LOG.info("Submitting job " + jobId); try { - // 1. Recover job owner and create JIP - jobIdFile = - new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString()); - - String user = null; - if (jobIdFile != null && jobIdFile.exists()) { - LOG.info("File " + jobIdFile + " exists for job " + id); - FileInputStream in = new FileInputStream(jobIdFile); - BufferedReader reader = null; - try { - reader = new BufferedReader(new InputStreamReader(in)); - user = reader.readLine(); - LOG.info("Recovered user " + user + " for job " + id); - } finally { - if (reader != null) { - reader.close(); + Path jobInfoFile = getSystemFileForJob(jobId); + final Path jobTokenFile = getTokenFileForJob(jobId); + FSDataInputStream in = fs.open(jobInfoFile); + final JobInfo token = new JobInfo(); + token.readFields(in); + in.close(); + final UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(token.getUser().toString()); + ugi.doAs(new PrivilegedExceptionAction() { + public JobStatus run() throws IOException, InterruptedException { + Credentials ts = null; + JobConf job = new JobConf(); + if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) { + ts = Credentials.readTokenStorageFile(jobTokenFile, job); } - in.close(); + return submitJob(JobID.downgrade(token.getJobID()), token + .getJobSubmitDir().toString(), ugi, ts, true); } - } - if (user == null) { - throw new RuntimeException("Incomplete job " + id); - } - - // Create the job - /* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE - * BACKPORTED (MAPREDUCE-873) - */ - job = new JobInProgress(JobTracker.this, conf, - new JobInfo((org.apache.hadoop.mapreduce.JobID) id, - new Text(user), new Path(getStagingAreaDirInternal(user))), - restartCount, new Credentials() /*HACK*/); - - // 2. Check if the user has appropriate access - // Get the user group info for the job's owner - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(job.getJobConf().getUser()); - LOG.info("Submitting job " + id + " on behalf of user " - + ugi.getShortUserName() + " in groups : " - + StringUtils.arrayToString(ugi.getGroupNames())); - - // check the access - try { - aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB); - } catch (Throwable t) { - LOG.warn("Access denied for user " + ugi.getShortUserName() - + " in groups : [" - + StringUtils.arrayToString(ugi.getGroupNames()) + "]"); - throw t; - } - - // 3. Get the log file and the file path - String logFileName = - JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id); - if (logFileName != null) { - Path jobHistoryFilePath = - JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); - - // 4. Recover the history file. This involved - // - deleting file.recover if file exists - // - renaming file.recover to file if file doesnt exist - // This makes sure that the (master) file exists - JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), - jobHistoryFilePath); - - // 5. 
Cache the history file name as it costs one dfs access - jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath); - } else { - LOG.info("No history file found for job " + id); - idIter.remove(); // remove from recovery list - } - - // 6. Sumbit the job to the jobtracker - addJob(id, job); - } catch (Throwable t) { - LOG.warn("Failed to recover job " + id + " Ignoring the job.", t); - idIter.remove(); - if (jobIdFile != null) { - jobIdFile.delete(); - jobIdFile = null; - } - if (job != null) { - job.fail(); - job = null; - } - continue; - } - } - - long recoveryStartTime = clock.getTime(); - - // II. Recover each job - idIter = jobsToRecover.iterator(); - while (idIter.hasNext()) { - JobID id = idIter.next(); - JobInProgress pJob = getJob(id); - - // 1. Get the required info - // Get the recovered history file - Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID()); - String logFileName = jobHistoryFilePath.getName(); - - FileSystem fs; - try { - fs = jobHistoryFilePath.getFileSystem(conf); - } catch (IOException ioe) { - LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.", - ioe); - continue; - } - - // 2. Parse the history file - // Note that this also involves job update - JobRecoveryListener listener = new JobRecoveryListener(pJob); - try { - JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), - listener, fs); - } catch (Throwable t) { - LOG.info("Error reading history file of job " + pJob.getJobID() - + ". Ignoring the error and continuing.", t); - } - - // 3. Close the listener - listener.close(); - - // 4. Update the recovery metric - totalEventsRecovered += listener.getNumEventsRecovered(); - - // 5. Cleanup history - // Delete the master log file as an indication that the new file - // should be used in future - try { - synchronized (pJob) { - JobHistory.JobInfo.checkpointRecovery(logFileName, - pJob.getJobConf()); - } - } catch (Throwable t) { - LOG.warn("Failed to delete log file (" + logFileName + ") for job " - + id + ". Continuing.", t); - } - - if (pJob.isComplete()) { - idIter.remove(); // no need to keep this job info as its successful + }); + recovered++; + } catch (Exception e) { + LOG.warn("Could not recover job " + jobId, e); } } - - recoveryDuration = clock.getTime() - recoveryStartTime; + recoveryDuration = clock.getTime() - recoveryProcessStartTime; hasRecovered = true; - // III. Finalize the recovery - synchronized (trackerExpiryQueue) { - // Make sure that the tracker statuses in the expiry-tracker queue - // are updated - long now = clock.getTime(); - int size = trackerExpiryQueue.size(); - for (int i = 0; i < size ; ++i) { - // Get the first tasktracker - TaskTrackerStatus taskTracker = trackerExpiryQueue.first(); - - // Remove it - trackerExpiryQueue.remove(taskTracker); - - // Set the new time - taskTracker.setLastSeen(now); - - // Add back to get the sorted list - trackerExpiryQueue.add(taskTracker); - } - } - - LOG.info("Restoration complete"); + LOG.info("Recovery done! Recoverd " + recovered + " of " + + jobsToRecover.size() + " jobs."); + LOG.info("Recovery Duration (ms):" + recoveryDuration); } int totalEventsRecovered() { @@ -3917,7 +3597,7 @@ public class JobTracker implements MRCon public synchronized JobID getNewJobId() throws IOException { return new JobID(getTrackerIdentifier(), nextJobId++); } - + /** * JobTracker.submitJob() kicks off a new job. 
* @@ -3928,8 +3608,24 @@ public class JobTracker implements MRCon */ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException { + return submitJob(jobId, jobSubmitDir, null, ts, false); + } + + /** + * JobTracker.submitJob() kicks off a new job. + * + * Create a 'JobInProgress' object, which contains both JobProfile and + * JobStatus. Those two sub-objects are sometimes shipped outside of the + * JobTracker. But JobInProgress adds info that's useful for the JobTracker + * alone. + */ + public JobStatus submitJob(JobID jobId, String jobSubmitDir, + UserGroupInformation ugi, Credentials ts, boolean recovered) + throws IOException { JobInfo jobInfo = null; - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + if (ugi == null) { + ugi = UserGroupInformation.getCurrentUser(); + } synchronized (this) { if (jobs.containsKey(jobId)) { // job already running, don't start twice @@ -3970,10 +3666,7 @@ public class JobTracker implements MRCon } catch (IOException ioe) { throw ioe; } - boolean recovered = true; // TODO: Once the Job recovery code is there, - // (MAPREDUCE-873) we - // must pass the "recovered" flag accurately. - // This is handled in the trunk/0.22 + if (!recovered) { // Store the information in a file so that the job can be recovered // later (if at all) @@ -4002,7 +3695,6 @@ public class JobTracker implements MRCon failJob(job); throw ioe; } - return status; } } @@ -4656,6 +4348,11 @@ public class JobTracker implements MRCon return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE); } + //Get the job token file in system directory + Path getTokenFileForJob(JobID id) { + return new Path(getSystemDirectoryForJob(id)+"/" + JOB_TOKEN_FILE); + } + /** * Change the run-time priority of the given job. * Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=1390006&r1=1390005&r2=1390006&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Tue Sep 25 17:23:37 2012 @@ -30,9 +30,7 @@ import org.junit.*; * This test checks if the jobtracker can detect and recover a tracker that was * lost while the jobtracker was down. 
*/ -/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests - */ -@Ignore + public class TestJobTrackerRestartWithLostTracker extends TestCase { final Path testDir = new Path("/jt-restart-lost-tt-testing"); final Path inDir = new Path(testDir, "input"); @@ -53,11 +51,14 @@ public class TestJobTrackerRestartWithLo throws IOException { FileSystem fileSys = dfs.getFileSystem(); JobConf jobConf = mr.createJobConf(); - int numMaps = 50; + int numMaps = 2; int numReds = 1; String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir); String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir); - + + // Enable recovery on restart + mr.getJobTrackerConf() + .setBoolean("mapred.jobtracker.restart.recover", true); // Configure the jobs JobConf job = configureJob(jobConf, numMaps, numReds, mapSignalFile, redSignalFile); @@ -84,10 +85,6 @@ public class TestJobTrackerRestartWithLo // Signal the maps to complete UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile); - // Enable recovery on restart - mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", - true); - // Kill the 2nd tasktracker mr.stopTaskTracker(1); @@ -102,6 +99,8 @@ public class TestJobTrackerRestartWithLo // Wait for the JT to be ready UtilsForTests.waitForJobTracker(jobClient); + // Signal the maps to complete + UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile); // Signal the reducers to complete UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, redSignalFile); @@ -113,9 +112,7 @@ public class TestJobTrackerRestartWithLo + "upon restart", jobClient.getClusterStatus().getTaskTrackers(), 1); - // validate the history file - TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true); - TestJobHistory.validateJobHistoryFileContent(mr, rJob, job); + assertTrue("Job should be successful", rJob.isSuccessful()); } public void testRestartWithLostTracker() throws IOException { Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=1390006&r1=1390005&r2=1390006&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Tue Sep 25 17:23:37 2012 @@ -34,9 +34,7 @@ import org.junit.*; * restart doesnt schedule any new tasks and waits for the (old) trackers to * join back. 
*/ -/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests - */ -@Ignore + public class TestJobTrackerSafeMode extends TestCase { final Path testDir = new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode"); @@ -153,6 +151,7 @@ public class TestJobTrackerSafeMode exte mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown(); mr.stopTaskTracker(trackerToKill); + LOG.info("Starting the jobtracker..."); // Restart the jobtracker mr.startJobTracker(); @@ -169,8 +168,6 @@ public class TestJobTrackerSafeMode exte LOG.info("Start a new tracker"); mr.startTaskTracker(null, null, ++numTracker, numDir); - // Check if the jobs are still running - // Wait for the tracker to be lost boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule(); while (!checkTrackers(jobtracker, trackers, lostTrackers)) { @@ -181,20 +178,55 @@ public class TestJobTrackerSafeMode exte // snapshot jobtracker's scheduling status shouldSchedule = jobtracker.recoveryManager.shouldSchedule(); } - - assertTrue("JobTracker hasnt opened up scheduling even all the" - + " trackers were recovered", - jobtracker.recoveryManager.shouldSchedule()); - - assertEquals("Recovery manager is in inconsistent state", - 0, jobtracker.recoveryManager.recoveredTrackers.size()); + assertTrue("JobTracker has not opened up scheduling after all the" + + " trackers were recovered", shouldSchedule); + + assertEquals("Recovery manager is in inconsistent state", 0, + jobtracker.recoveryManager.recoveredTrackers.size()); + + // Signal the maps to complete + UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile); + + // Signal the reducers to complete + UtilsForTests + .signalTasks(dfs, fileSys, false, mapSignalFile, redSignalFile); // wait for the job to be complete UtilsForTests.waitTillDone(jobClient); } private boolean checkTrackers(JobTracker jobtracker, Set present, Set absent) { + while (jobtracker.getClusterStatus(true).getActiveTrackerNames().size() != 2) { + LOG.info("Waiting for Initialize all Task Trackers"); + UtilsForTests.waitFor(1000); + } + // Checking if the task tracker been initiated again + boolean found = false; + String strNewTrackerName = (String) (present.toArray()[0]); + LOG.info("Number of Trackers: " + + jobtracker.getClusterStatus(true).getActiveTrackerNames().size()); + for (String trackername : jobtracker.getClusterStatus(true) + .getActiveTrackerNames()) { + if (trackername.equalsIgnoreCase((String) (present.toArray()[0]))) { + found = true; + } else { + String[] trackerhostnames = trackername.split(":"); + CharSequence cseq = new String(trackerhostnames[0]); + if (((String) (present.toArray()[0])).contains(cseq)) { + strNewTrackerName = trackername; + found = false; + break; + } + } + } + if (!found) { + present.remove(((String) (present.toArray()[0]))); + LOG.info("Old tracker on this machine got reinited, " + + "Tracker added with new port " + strNewTrackerName); + present.add(strNewTrackerName); + } + long jobtrackerRecoveryFinishTime = jobtracker.getStartTime() + jobtracker.getRecoveryDuration(); for (String trackerName : present) { Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1390006&r1=1390005&r2=1390006&view=diff ============================================================================== --- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Sep 25 17:23:37 2012 @@ -41,15 +41,37 @@ import org.junit.*; * failures and the jobtracker is able to tolerate {@link RecoveryManager} * failure. */ -/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests - */ -@Ignore + public class TestRecoveryManager extends TestCase { private static final Log LOG = LogFactory.getLog(TestRecoveryManager.class); private static final Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "test-recovery-manager"); + private FileSystem fs; + private JobConf conf; + private MiniMRCluster mr; + + protected void setUp() { + JobConf conf = new JobConf(); + try { + fs = FileSystem.get(new Configuration()); + fs.delete(TEST_DIR, true); + conf.set("mapred.jobtracker.job.history.block.size", "1024"); + conf.set("mapred.jobtracker.job.history.buffer.size", "1024"); + mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); + } catch (IOException e) { + e.printStackTrace(); + } + } + + protected void tearDown() { + ClusterStatus status = mr.getJobTrackerRunner().getJobTracker() + .getClusterStatus(false); + if (status.getJobTrackerState() == JobTracker.State.RUNNING) { + mr.shutdown(); + } + } /** * Tests the {@link JobTracker} against the exceptions thrown in @@ -60,21 +82,12 @@ public class TestRecoveryManager extends * - restarts the jobtracker * - checks if the jobtraker starts normally */ - public void testJobTracker() throws Exception { + public void testJobTrackerRestartsWithMissingJobFile() throws Exception { LOG.info("Testing jobtracker restart with faulty job"); String signalFile = new Path(TEST_DIR, "signal").toString(); - JobConf conf = new JobConf(); - - FileSystem fs = FileSystem.get(new Configuration()); - fs.delete(TEST_DIR, true); // cleanup - - conf.set("mapred.jobtracker.job.history.block.size", "1024"); - conf.set("mapred.jobtracker.job.history.buffer.size", "1024"); - - MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); - + JobConf job1 = mr.createJobConf(); - + UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, "test-recovery-manager", signalFile, signalFile); @@ -131,11 +144,63 @@ public class TestRecoveryManager extends // check if the jobtracker came up or not assertEquals("JobTracker crashed!", JobTracker.State.RUNNING, status.getJobTrackerState()); - - mr.shutdown(); } /** + * Tests the re-submission of the job in case of jobtracker died/restart + * - submits a job and let it be inited. 
+ * - kills the jobtracker + * - checks if the jobtraker starts normally and job is recovered while + */ + + public void testJobResubmission() throws Exception { + LOG.info("Testing Job Resubmission"); + String signalFile = new Path(TEST_DIR, "signal").toString(); + + // make sure that the jobtracker is in recovery mode + mr.getJobTrackerConf() + .setBoolean("mapred.jobtracker.restart.recover", true); + + JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); + + JobConf job1 = mr.createJobConf(); + UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), + new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, + signalFile); + + JobClient jc = new JobClient(job1); + RunningJob rJob1 = jc.submitJob(job1); + LOG.info("Submitted first job " + rJob1.getID()); + + while (rJob1.mapProgress() < 0.5f) { + LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done"); + UtilsForTests.waitFor(100); + } + + // kill the jobtracker + LOG.info("Stopping jobtracker"); + mr.stopJobTracker(); + + // start the jobtracker + LOG.info("Starting jobtracker"); + mr.startJobTracker(); + UtilsForTests.waitForJobTracker(jc); + + jobtracker = mr.getJobTrackerRunner().getJobTracker(); + + // assert that job is recovered by the jobtracker + assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length); + JobInProgress jip = jobtracker.getJob(rJob1.getID()); + while (!jip.isComplete()) { + LOG.info("Waiting for job " + rJob1.getID() + " to be successful"); + // Signaling Map task to complete + fs.create(new Path(TEST_DIR, "signal")); + UtilsForTests.waitFor(100); + } + assertTrue("Task should be successful", rJob1.isSuccessful()); + } + + /** * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown * during recovery. It does the following : * - submits a job with HIGH priority and x tasks @@ -147,19 +212,13 @@ public class TestRecoveryManager extends * - checks if the jobtraker starts normally and job#2 is recovered while * job#1 is failed. 
*/ - public void testRecoveryManager() throws Exception { + public void testJobTrackerRestartWithBadJobs() throws Exception { LOG.info("Testing recovery-manager"); String signalFile = new Path(TEST_DIR, "signal").toString(); + // make sure that the jobtracker is in recovery mode + mr.getJobTrackerConf() + .setBoolean("mapred.jobtracker.restart.recover", true); - // clean up - FileSystem fs = FileSystem.get(new Configuration()); - fs.delete(TEST_DIR, true); - - JobConf conf = new JobConf(); - conf.set("mapred.jobtracker.job.history.block.size", "1024"); - conf.set("mapred.jobtracker.job.history.buffer.size", "1024"); - - MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); JobConf job1 = mr.createJobConf(); @@ -179,7 +238,7 @@ public class TestRecoveryManager extends LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done"); UtilsForTests.waitFor(100); } - + // now submit job2 JobConf job2 = mr.createJobConf(); @@ -224,7 +283,7 @@ public class TestRecoveryManager extends LOG.info("Waiting for job " + jip.getJobID() + " to be inited"); UtilsForTests.waitFor(100); } - + // kill the jobtracker LOG.info("Stopping jobtracker"); mr.stopJobTracker(); @@ -247,21 +306,18 @@ public class TestRecoveryManager extends jobtracker = mr.getJobTrackerRunner().getJobTracker(); // assert that job2 is recovered by the jobtracker as job1 would fail - assertEquals("Recovery manager failed to tolerate job failures", - 2, jobtracker.getAllJobs().length); + assertEquals("Recovery manager failed to tolerate job failures", 1, + jobtracker.getAllJobs().length); // check if the job#1 has failed JobStatus status = jobtracker.getJobStatus(rJob1.getID()); - assertEquals("Faulty job not failed", - JobStatus.FAILED, status.getRunState()); + assertNull("Faulty job should not be resubmitted", status); jip = jobtracker.getJob(rJob2.getID()); assertFalse("Job should be running", jip.isComplete()); status = jobtracker.getJobStatus(rJob3.getID()); - assertNull("Job should be missing", status); - - mr.shutdown(); + assertNull("Job should be missing because of ACL changed", status); } /** @@ -278,113 +334,76 @@ public class TestRecoveryManager extends * jobtracker should crash. 
*/ public void testRestartCount() throws Exception { - LOG.info("Testing restart-count"); + LOG.info("Testing Job Restart Count"); String signalFile = new Path(TEST_DIR, "signal").toString(); - - // clean up - FileSystem fs = FileSystem.get(new Configuration()); - fs.delete(TEST_DIR, true); - - JobConf conf = new JobConf(); - conf.set("mapred.jobtracker.job.history.block.size", "1024"); - conf.set("mapred.jobtracker.job.history.buffer.size", "1024"); - conf.setBoolean("mapred.jobtracker.restart.recover", true); - // since there is no need for initing - conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class, - TaskScheduler.class); - - MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); - JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); - JobClient jc = new JobClient(mr.createJobConf()); + // make sure that the jobtracker is in recovery mode + mr.getJobTrackerConf() + .setBoolean("mapred.jobtracker.restart.recover", true); - // check if the jobtracker info file exists - Path infoFile = jobtracker.recoveryManager.getRestartCountFile(); - assertTrue("Jobtracker infomation is missing", fs.exists(infoFile)); + JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); - // check if garbling the system files disables the recovery process - LOG.info("Stopping jobtracker for testing with system files deleted"); - mr.stopJobTracker(); - - // delete the info file - Path rFile = jobtracker.recoveryManager.getRestartCountFile(); - fs.delete(rFile,false); - - // start the jobtracker - LOG.info("Starting jobtracker with system files deleted"); - mr.startJobTracker(); - - UtilsForTests.waitForJobTracker(jc); - jobtracker = mr.getJobTrackerRunner().getJobTracker(); + JobConf job1 = mr.createJobConf(); + // set the high priority + job1.setJobPriority(JobPriority.HIGH); - // check if the recovey is disabled - assertFalse("Recovery is not disabled upon missing system files", - jobtracker.recoveryManager.shouldRecover()); - - // check if the system dir is sane - assertTrue("Recovery file is missing upon restart", fs.exists(rFile)); - Path tFile = jobtracker.recoveryManager.getTempRestartCountFile(); - assertFalse("Temp recovery file exists upon restart", fs.exists(tFile)); + UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), + new Path(TEST_DIR, "output3"), 30, 0, "test-restart", signalFile, + signalFile); - // submit a job - JobConf job = mr.createJobConf(); - - UtilsForTests.configureWaitingJobConf(job, - new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, - "test-recovery-manager", signalFile, signalFile); - // submit the faulty job - RunningJob rJob = jc.submitJob(job); - LOG.info("Submitted first job " + rJob.getID()); + JobClient jc = new JobClient(job1); + RunningJob rJob1 = jc.submitJob(job1); + LOG.info("Submitted first job " + rJob1.getID()); + + JobInProgress jip = jobtracker.getJob(rJob1.getID()); - // wait for 1 min - UtilsForTests.waitFor(60000); + while (!jip.inited()) { + LOG.info("Waiting for job " + jip.getJobID() + " to be inited"); + UtilsForTests.waitFor(100); + } - // kill the jobtracker multiple times and check if the count is correct - for (int i = 1; i <= 5; ++i) { + for (int i = 1; i <= 2; ++i) { LOG.info("Stopping jobtracker for " + i + " time"); mr.stopJobTracker(); - + // start the jobtracker LOG.info("Starting jobtracker for " + i + " time"); mr.startJobTracker(); - + UtilsForTests.waitForJobTracker(jc); - - // check if the system dir is sane - assertTrue("Recovery file is missing upon 
restart", fs.exists(rFile)); - assertFalse("Temp recovery file exists upon restart", fs.exists(tFile)); - + jobtracker = mr.getJobTrackerRunner().getJobTracker(); - JobInProgress jip = jobtracker.getJob(rJob.getID()); - + // assert if restart count is correct - assertEquals("Recovery manager failed to recover restart count", - i, jip.getNumRestarts()); + // It should always be 0 now as its resubmit everytime then restart. + assertEquals("Recovery manager failed to recover restart count", 0, jip + .getNumRestarts()); } - + // kill the old job - rJob.killJob(); + rJob1.killJob(); // II. Submit a new job and check if the restart count is 0 - JobConf job1 = mr.createJobConf(); - - UtilsForTests.configureWaitingJobConf(job1, - new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, - "test-recovery-manager", signalFile, signalFile); + JobConf job2 = mr.createJobConf(); + + UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), + new Path(TEST_DIR, "output7"), 50, 0, "test-restart-manager", + signalFile, signalFile); // submit a new job - rJob = jc.submitJob(job1); - LOG.info("Submitted first job after restart" + rJob.getID()); + RunningJob rJob2 = jc.submitJob(job2); + LOG.info("Submitted first job after restart" + rJob2.getID()); // assert if restart count is correct - JobInProgress jip = jobtracker.getJob(rJob.getID()); - assertEquals("Restart count for new job is incorrect", - 0, jip.getNumRestarts()); + jip = jobtracker.getJob(rJob2.getID()); + assertEquals("Restart count for new job is incorrect", 0, jip + .getNumRestarts()); LOG.info("Stopping jobtracker for testing the fs errors"); mr.stopJobTracker(); // check if system.dir problems in recovery kills the jobtracker + Path rFile = jobtracker.recoveryManager.getRestartCountFile(); fs.delete(rFile, false); FSDataOutputStream out = fs.create(rFile); out.writeBoolean(true); @@ -396,8 +415,7 @@ public class TestRecoveryManager extends JobTrackerRunner runner = mr.getJobTrackerRunner(); assertFalse("JobTracker is still alive", runner.isActive()); - mr.shutdown(); - } + } /** * Test if the jobtracker waits for the info file to be created before