Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 90415 invoked from network); 4 Mar 2011 03:27:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:27:13 -0000 Received: (qmail 878 invoked by uid 500); 4 Mar 2011 03:27:12 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 842 invoked by uid 500); 4 Mar 2011 03:27:12 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 829 invoked by uid 99); 4 Mar 2011 03:27:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:27:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:27:10 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 44D4C2388901; Fri, 4 Mar 2011 03:26:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1076969 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/ Date: Fri, 04 Mar 2011 03:26:50 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304032650.44D4C2388901@eris.apache.org> Author: omalley Date: Fri Mar 4 03:26:49 2011 New Revision: 1076969 URL: http://svn.apache.org/viewvc?rev=1076969&view=rev Log: commit 74ce0696bde3778d2579e0bbeeed1012a9a4bf1b Author: Yahoo\! Date: Tue Aug 11 10:29:11 2009 -0700 Applying patch 2923008.mr814.patch Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076969&r1=1076968&r2=1076969&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:26:49 2011 @@ -26,6 +26,15 @@ + + mapred.job.tracker.history.completed.location + + The completed job history files are stored at this single well + known location. If nothing is specified, the files are stored at + ${hadoop.job.history.location}/done. + + + Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1076969&r1=1076968&r2=1076969&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar 4 03:26:49 2011 @@ -30,14 +30,19 @@ import java.net.URLDecoder; import java.net.URLEncoder; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -101,6 +106,8 @@ public class JobHistory { FsPermission.createImmutable((short) 0750); // rwxr-x--- final static FsPermission HISTORY_FILE_PERMISSION = FsPermission.createImmutable((short) 0740); // rwxr----- + private static FileSystem LOGDIR_FS; // log dir filesystem + private static FileSystem DONEDIR_FS; // Done dir filesystem private static JobConf jtConf; private static Path DONE = null; // folder for completed jobs /** @@ -125,11 +132,23 @@ public class JobHistory { Path historyFilename; // path of job history file Path confFilename; // path of job's conf } - + + private ThreadPoolExecutor executor = null; + private final Configuration conf; + // cache from job-key to files associated with it. private Map fileCache = new ConcurrentHashMap(); + JobHistoryFilesManager(Configuration conf) throws IOException { + this.conf = conf; + } + + void start() { + executor = new ThreadPoolExecutor(1, 3, 1, + TimeUnit.HOURS, new LinkedBlockingQueue()); + } + private FilesHolder getFileHolder(JobID id) { FilesHolder holder = fileCache.get(id); if (holder == null) { @@ -172,6 +191,33 @@ public class JobHistory { void purgeJob(JobID id) { fileCache.remove(id); } + + void moveToDone(final JobID id, final List paths) { + executor.execute(new Runnable() { + + public void run() { + //move the files to DONE folder + try { + for (Path path : paths) { + //check if path exists, in case of retries it may not exist + if (LOGDIR_FS.exists(path)) { + LOG.info("Moving " + path.toString() + " to " + + DONE.toString()); + DONEDIR_FS.moveFromLocalFile(path, DONE); + DONEDIR_FS.setPermission(new Path(DONE, path.getName()), + new FsPermission(HISTORY_FILE_PERMISSION)); + } + } + + //purge the job from the cache + fileManager.purgeJob(id); + } catch (Throwable e) { + LOG.error("Unable to move history file to DONE folder.", e); + } + } + + }); + } } /** * Record types are identifiers for each line of log in history files. @@ -222,14 +268,13 @@ public class JobHistory { "file:///" + new File( System.getProperty("hadoop.log.dir")).getAbsolutePath() + File.separator + "history"); - DONE = new Path(LOG_DIR, "done"); JOBTRACKER_UNIQUE_STRING = hostname + "_" + String.valueOf(jobTrackerStartTime) + "_"; jobtrackerHostname = hostname; Path logDir = new Path(LOG_DIR); - FileSystem fs = logDir.getFileSystem(conf); - if (!fs.exists(logDir)){ - if (!fs.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) { + LOGDIR_FS = logDir.getFileSystem(conf); + if (!LOGDIR_FS.exists(logDir)){ + if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) { throw new IOException("Mkdirs failed to create " + logDir.toString()); } } @@ -241,11 +286,8 @@ public class JobHistory { 3 * 1024 * 1024); jtConf = conf; - // create the done folder with appropriate permission - fs.mkdirs(DONE, HISTORY_DIR_PERMISSION); - // initialize the file manager - fileManager = new JobHistoryFilesManager(); + fileManager = new JobHistoryFilesManager(conf); } catch(IOException e) { LOG.error("Failed to initialize JobHistory log file", e); disableHistory = true; @@ -253,6 +295,38 @@ public class JobHistory { return !(disableHistory); } + static boolean initDone(JobConf conf, FileSystem fs){ + try { + //if completed job history location is set, use that + String doneLocation = conf. + get("mapred.job.tracker.history.completed.location"); + if (doneLocation != null) { + DONE = fs.makeQualified(new Path(doneLocation)); + DONEDIR_FS = fs; + } else { + DONE = new Path(LOG_DIR, "done"); + DONEDIR_FS = LOGDIR_FS; + } + + //If not already present create the done folder with appropriate + //permission + if (!DONEDIR_FS.exists(DONE)) { + LOG.info("Creating DONE folder at "+ DONE); + if (! DONEDIR_FS.mkdirs(DONE, + new FsPermission(HISTORY_DIR_PERMISSION))) { + throw new IOException("Mkdirs failed to create " + DONE.toString()); + } + } + + fileManager.start(); + } catch(IOException e) { + LOG.error("Failed to initialize JobHistory log file", e); + disableHistory = true; + } + return !(disableHistory); + } + + /** * Manages job-history's meta information such as version etc. * Helps in logging version information to the job-history and recover @@ -724,20 +798,26 @@ public class JobHistory { public static synchronized String getJobHistoryFileName(JobConf jobConf, JobID id) throws IOException { - return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR)); + return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS); + } + + static synchronized String getDoneJobHistoryFileName(JobConf jobConf, + JobID id) throws IOException { + if (DONE == null) { + return null; + } + return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS); } /** * @param dir The directory where to search. */ - static synchronized String getJobHistoryFileName(JobConf jobConf, - JobID id, - Path dir) + private static synchronized String getJobHistoryFileName(JobConf jobConf, + JobID id, Path dir, FileSystem fs) throws IOException { String user = getUserName(jobConf); String jobName = trimJobName(getJobName(jobConf)); - FileSystem fs = new Path(LOG_DIR).getFileSystem(jobConf); if (LOG_DIR == null) { return null; } @@ -805,9 +885,8 @@ public class JobHistory { throws IOException { Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName); if (logPath != null) { - FileSystem fs = logPath.getFileSystem(conf); LOG.info("Deleting job history file " + logPath.getName()); - fs.delete(logPath, false); + LOGDIR_FS.delete(logPath, false); } // do the same for the user file too logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, @@ -835,25 +914,24 @@ public class JobHistory { Path logFilePath) throws IOException { Path ret; - FileSystem fs = logFilePath.getFileSystem(conf); String logFileName = logFilePath.getName(); String tmpFilename = getSecondaryJobHistoryFile(logFileName); Path logDir = logFilePath.getParent(); Path tmpFilePath = new Path(logDir, tmpFilename); - if (fs.exists(logFilePath)) { + if (LOGDIR_FS.exists(logFilePath)) { LOG.info(logFileName + " exists!"); - if (fs.exists(tmpFilePath)) { + if (LOGDIR_FS.exists(tmpFilePath)) { LOG.info("Deleting " + tmpFilename + " and using " + logFileName + " for recovery."); - fs.delete(tmpFilePath, false); + LOGDIR_FS.delete(tmpFilePath, false); } ret = tmpFilePath; } else { LOG.info(logFileName + " doesnt exist! Using " + tmpFilename + " for recovery."); - if (fs.exists(tmpFilePath)) { + if (LOGDIR_FS.exists(tmpFilePath)) { LOG.info("Renaming " + tmpFilename + " to " + logFileName); - fs.rename(tmpFilePath, logFilePath); + LOGDIR_FS.rename(tmpFilePath, logFilePath); ret = tmpFilePath; } else { ret = logFilePath; @@ -863,7 +941,7 @@ public class JobHistory { // do the same for the user files too logFilePath = getJobHistoryLogLocationForUser(logFileName, conf); if (logFilePath != null) { - fs = logFilePath.getFileSystem(conf); + FileSystem fs = logFilePath.getFileSystem(conf); logDir = logFilePath.getParent(); tmpFilePath = new Path(logDir, tmpFilename); if (fs.exists(logFilePath)) { @@ -911,8 +989,7 @@ public class JobHistory { // rename the tmp file to the master file. Note that this should be // done only when the file is closed and handles are released. LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName); - FileSystem fs = tmpLogPath.getFileSystem(jtConf); - fs.rename(tmpLogPath, masterLogPath); + LOGDIR_FS.rename(tmpLogPath, masterLogPath); // update the cache fileManager.setHistoryFile(id, masterLogPath); @@ -924,7 +1001,7 @@ public class JobHistory { JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName, conf); if (masterLogPath != null) { - fs = masterLogPath.getFileSystem(conf); + FileSystem fs = masterLogPath.getFileSystem(conf); if (fs.exists(tmpLogPath)) { LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName + " in user directory"); @@ -966,29 +1043,27 @@ public class JobHistory { * This *should* be the last call to jobhistory for a given job. */ static void markCompleted(JobID id) throws IOException { + List paths = new ArrayList(); Path path = fileManager.getHistoryFile(id); if (path == null) { LOG.info("No file for job-history with " + id + " found in cache!"); return; + } else { + paths.add(path); } - Path newPath = new Path(DONE, path.getName()); - LOG.info("Moving completed job from " + path + " to " + newPath); - FileSystem fs = path.getFileSystem(jtConf); - fs.rename(path, newPath); Path confPath = fileManager.getConfFileWriters(id); if (confPath == null) { LOG.info("No file for jobconf with " + id + " found in cache!"); return; + } else { + paths.add(confPath); } - // move the conf too - newPath = new Path(DONE, confPath.getName()); - LOG.info("Moving configuration of completed job from " + confPath - + " to " + newPath); - fs.rename(confPath, newPath); - // purge the job from the cache - fileManager.purgeJob(id); + //move the job files to done folder and purge the job + if (paths.size() > 0) { + fileManager.moveToDone(id, paths); + } } /** @@ -1056,20 +1131,18 @@ public class JobHistory { if (LOG_DIR != null) { // create output stream for logging in hadoop.job.history.location - fs = new Path(LOG_DIR).getFileSystem(jobConf); - if (restarted) { logFile = recoverJobHistoryFile(jobConf, logFile); logFileName = logFile.getName(); } int defaultBufferSize = - fs.getConf().getInt("io.file.buffer.size", 4096); - out = fs.create(logFile, + LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096); + out = LOGDIR_FS.create(logFile, new FsPermission(HISTORY_FILE_PERMISSION), true, defaultBufferSize, - fs.getDefaultReplication(), + LOGDIR_FS.getDefaultReplication(), jobHistoryBlockSize, null); writer = new PrintWriter(out); fileManager.addWriter(jobId, writer); @@ -1147,16 +1220,15 @@ public class JobHistory { FSDataOutputStream jobFileOut = null; try { if (LOG_DIR != null) { - fs = new Path(LOG_DIR).getFileSystem(jobConf); int defaultBufferSize = - fs.getConf().getInt("io.file.buffer.size", 4096); - if (!fs.exists(jobFilePath)) { - jobFileOut = fs.create(jobFilePath, + LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096); + if (!LOGDIR_FS.exists(jobFilePath)) { + jobFileOut = LOGDIR_FS.create(jobFilePath, new FsPermission(HISTORY_FILE_PERMISSION), true, defaultBufferSize, - fs.getDefaultReplication(), - fs.getDefaultBlockSize(), null); + LOGDIR_FS.getDefaultReplication(), + LOGDIR_FS.getDefaultBlockSize(), null); jobConf.writeXml(jobFileOut); jobFileOut.close(); } @@ -1943,13 +2015,12 @@ public class JobHistory { lastRan = now; isRunning = true; try { - FileSystem fs = DONE.getFileSystem(jtConf); - FileStatus[] historyFiles = fs.listStatus(DONE); + FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE); // delete if older than 30 days if (historyFiles != null) { for (FileStatus f : historyFiles) { if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) { - fs.delete(f.getPath(), true); + DONEDIR_FS.delete(f.getPath(), true); LOG.info("Deleting old history file : " + f.getPath()); } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076969&r1=1076968&r2=1076969&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:26:49 2011 @@ -1866,14 +1866,7 @@ public class JobTracker implements MRCon // initialize history parameters. boolean historyInitialized = JobHistory.init(conf, this.localMachine, this.startTime); - String historyLogDir = null; - FileSystem historyFS = null; - if (historyInitialized) { - historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString(); - infoServer.setAttribute("historyLogDir", historyLogDir); - historyFS = new Path(historyLogDir).getFileSystem(conf); - infoServer.setAttribute("fileSys", historyFS); - } + infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class); infoServer.start(); @@ -1970,13 +1963,13 @@ public class JobTracker implements MRCon jobConf.deleteLocalFiles(SUBDIR); } - // Initialize history again if it is not initialized - // because history was on dfs and namenode was in safemode. - if (!historyInitialized) { - JobHistory.init(conf, this.localMachine, this.startTime); - historyLogDir = conf.get("hadoop.job.history.location"); + // Initialize history DONE folder + if (historyInitialized) { + JobHistory.initDone(conf, fs); + String historyLogDir = + JobHistory.getCompletedJobHistoryLocation().toString(); infoServer.setAttribute("historyLogDir", historyLogDir); - historyFS = new Path(historyLogDir).getFileSystem(conf); + FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf); infoServer.setAttribute("fileSys", historyFS); } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1076969&r1=1076968&r2=1076969&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar 4 03:26:49 2011 @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.JobHistory.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -464,8 +465,7 @@ public class TestJobHistory extends Test // Get the history file name Path dir = JobHistory.getCompletedJobHistoryLocation(); - String logFileName = - JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir); + String logFileName = getDoneFile(conf, id, dir); // Framework history log file location Path logFile = new Path(dir, logFileName); @@ -763,8 +763,7 @@ public class TestJobHistory extends Test JobID id = job.getID(); Path doneDir = JobHistory.getCompletedJobHistoryLocation(); // Get the history file name - String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, - doneDir); + String logFileName = getDoneFile(conf, id, doneDir); // Framework history log file location Path logFile = new Path(doneDir, logFileName); @@ -792,6 +791,92 @@ public class TestJobHistory extends Test validateTaskAttemptLevelKeyValues(mr, job, jobInfo); } + public void testDoneFolderOnHDFS() throws IOException { + MiniMRCluster mr = null; + try { + JobConf conf = new JobConf(); + // keep for less time + conf.setLong("mapred.jobtracker.retirejob.check", 1000); + conf.setLong("mapred.jobtracker.retirejob.interval", 1000); + + //set the done folder location + String doneFolder = "history_done"; + conf.set("mapred.job.tracker.history.completed.location", doneFolder); + + MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null); + mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), + 3, null, null, conf); + + // run the TCs + conf = mr.createJobConf(); + + FileSystem fs = FileSystem.get(conf); + // clean up + fs.delete(new Path("succeed"), true); + + Path inDir = new Path("succeed/input"); + Path outDir = new Path("succeed/output"); + + //Disable speculative execution + conf.setSpeculativeExecution(false); + + // Make sure that the job is not removed from memory until we do finish + // the validation of history file content + conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10); + + // Run a job that will be succeeded and validate its history file + RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); + + Path doneDir = JobHistory.getCompletedJobHistoryLocation(); + assertEquals("History DONE folder not correct", + doneFolder, doneDir.getName()); + JobID id = job.getID(); + String logFileName = getDoneFile(conf, id, doneDir); + + // Framework history log file location + Path logFile = new Path(doneDir, logFileName); + FileSystem fileSys = logFile.getFileSystem(conf); + + // Check if the history file exists + assertTrue("History file does not exist", fileSys.exists(logFile)); + + // check if the corresponding conf file exists + Path confFile = getPathForConf(logFile, doneDir); + assertTrue("Config for completed jobs doesnt exist", + fileSys.exists(confFile)); + + // check if the file exists in a done folder + assertTrue("Completed job config doesnt exist in the done folder", + doneDir.getName().equals(confFile.getParent().getName())); + + // check if the file exists in a done folder + assertTrue("Completed jobs doesnt exist in the done folder", + doneDir.getName().equals(logFile.getParent().getName())); + + + // check if the job file is removed from the history location + Path runningJobsHistoryFolder = logFile.getParent().getParent(); + Path runningJobHistoryFilename = + new Path(runningJobsHistoryFolder, logFile.getName()); + Path runningJobConfFilename = + new Path(runningJobsHistoryFolder, confFile.getName()); + assertFalse("History file not deleted from the running folder", + fileSys.exists(runningJobHistoryFilename)); + assertFalse("Config for completed jobs not deleted from running folder", + fileSys.exists(runningJobConfFilename)); + + validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false); + validateJobHistoryFileContent(mr, job, conf); + + // get the job conf filename + } finally { + if (mr != null) { + cleanupLocalFiles(mr); + mr.shutdown(); + } + } + } + /** Run a job that will be succeeded and validate its history file format * and its content. */ @@ -802,6 +887,11 @@ public class TestJobHistory extends Test // keep for less time conf.setLong("mapred.jobtracker.retirejob.check", 1000); conf.setLong("mapred.jobtracker.retirejob.interval", 1000); + + //set the done folder location + String doneFolder = TEST_ROOT_DIR + "history_done"; + conf.set("mapred.job.tracker.history.completed.location", doneFolder); + mr = new MiniMRCluster(2, "file:///", 3, null, null, conf); // run the TCs @@ -825,9 +915,11 @@ public class TestJobHistory extends Test RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); Path doneDir = JobHistory.getCompletedJobHistoryLocation(); + assertEquals("History DONE folder not correct", + doneFolder, doneDir.toString()); JobID id = job.getID(); - String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, - doneDir); + String logFileName = getDoneFile(conf, id, doneDir); + // Framework history log file location Path logFile = new Path(doneDir, logFileName); FileSystem fileSys = logFile.getFileSystem(conf); @@ -842,11 +934,11 @@ public class TestJobHistory extends Test // check if the file exists in a done folder assertTrue("Completed job config doesnt exist in the done folder", - "done".equals(confFile.getParent().getName())); + doneDir.getName().equals(confFile.getParent().getName())); // check if the file exists in a done folder assertTrue("Completed jobs doesnt exist in the done folder", - "done".equals(logFile.getParent().getName())); + doneDir.getName().equals(logFile.getParent().getName())); // check if the job file is removed from the history location @@ -880,6 +972,17 @@ public class TestJobHistory extends Test } } + //Returns the file in the done folder + //Waits for sometime to get the file moved to done + private static String getDoneFile(JobConf conf, JobID id, + Path doneDir) throws IOException { + String name = null; + for (int i = 0; name == null && i < 20; i++) { + name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id); + UtilsForTests.waitFor(1000); + } + return name; + } // Returns the output path where user history log file is written to with // default configuration setting for hadoop.job.history.user.location private static Path getLogLocationInOutputPath(String logFileName, @@ -900,8 +1003,7 @@ public class TestJobHistory extends Test throws IOException { // Get the history file name Path doneDir = JobHistory.getCompletedJobHistoryLocation(); - String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, - doneDir); + String logFileName = getDoneFile(conf, id, doneDir); // User history log file location Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser( @@ -1012,8 +1114,7 @@ public class TestJobHistory extends Test // Get the history file name Path doneDir = JobHistory.getCompletedJobHistoryLocation(); - String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, - doneDir); + String logFileName = getDoneFile(conf, id, doneDir); // Framework history log file location Path logFile = new Path(doneDir, logFileName); Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1076969&r1=1076968&r2=1076969&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar 4 03:26:49 2011 @@ -320,48 +320,7 @@ public class TestJobTrackerRestart exten assertTrue("Cluster status is insane", checkClusterStatusOnCompletion(status, prevStatus)); } - - /** - * Checks if the history files are as expected - * @param id job id - * @param conf job conf - */ - private void testJobHistoryFiles(JobID id, JobConf conf) - throws IOException { - // Get the history files for users - Path dir = JobHistory.getCompletedJobHistoryLocation(); - String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, - dir); - String tempLogFileName = - JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName); - - // I. User files - Path logFile = - JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf); - FileSystem fileSys = logFile.getFileSystem(conf); - - // Check if the history file exists - assertTrue("User log file does not exist", fileSys.exists(logFile)); - - // Check if the temporary file is deleted - Path tempLogFile = - JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName, - conf); - assertFalse("User temporary log file exists", fileSys.exists(tempLogFile)); - - // II. Framework files - // Get the history file - logFile = new Path(dir, logFileName); - fileSys = logFile.getFileSystem(conf); - - // Check if the history file exists - assertTrue("Log file does not exist", fileSys.exists(logFile)); - - // Check if the temporary file is deleted - tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName); - assertFalse("Temporary log file exists", fileSys.exists(tempLogFile)); - } - + /** * Matches specified number of task reports. * @param source the reports to be matched