From: omalley@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Subject: svn commit: r1077531 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/JobHistory.java mapred/org/apache/hadoop/mapred/JobTracker.java test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
Date: Fri, 04 Mar 2011 04:24:58 -0000
Message-Id: <20110304042458.29570238890D@eris.apache.org>

Author: omalley
Date: Fri Mar  4 04:24:57 2011
New Revision: 1077531

URL: http://svn.apache.org/viewvc?rev=1077531&view=rev
Log:
commit f5a5744b89b73591fcdaec839ab1d7e7ea2a3ccf
Author: Arun C Murthy
Date:   Tue Jul 6 10:43:43 2010 -0700

    MAPREDUCE-1699. Ensure JobHistory isn't disabled for any reason.
    Contributed by Krishna Ramachandran.

+++ b/YAHOO-CHANGES.txt
+    MAPREDUCE-1699. Ensure JobHistory isn't disabled for any reason. (Krishna
+    Ramachandran via acmurthy)
+
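The change is easiest to read as a contract change. Before this patch, JobHistory.init() caught its own IOException, flipped a private static disableHistory flag, and returned a boolean that callers were free to ignore, so job history could end up silently disabled. After the patch, init() and initDone() declare throws IOException and let the failure propagate, so a broken history configuration aborts JobTracker startup instead. A minimal sketch of the two contracts, using hypothetical names (HistoryLog, initOld, initNew) rather than the Hadoop classes:

    import java.io.File;
    import java.io.IOException;

    public class HistoryLog {
      private static boolean disableHistory = true; // old pattern: a silent flag

      // Old contract: swallow the failure, flip a flag, return a status the
      // caller may ignore -- history can stay silently disabled forever.
      static boolean initOld(File logDir) {
        try {
          if (!logDir.exists() && !logDir.mkdirs()) {
            throw new IOException("Mkdirs failed to create " + logDir);
          }
          disableHistory = false;
        } catch (IOException e) {
          disableHistory = true; // every later log call becomes a no-op
        }
        return !disableHistory;
      }

      // New contract: propagate the failure so the daemon fails fast instead
      // of running with history quietly turned off.
      static void initNew(File logDir) throws IOException {
        if (!logDir.exists() && !logDir.mkdirs()) {
          throw new IOException("Mkdirs failed to create " + logDir);
        }
      }

      public static void main(String[] args) throws IOException {
        File dir = new File(System.getProperty("java.io.tmpdir"), "history-demo");
        System.out.println("old init ok? " + initOld(dir));
        initNew(dir); // throws instead of degrading silently
      }
    }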
Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1077531&r1=1077530&r2=1077531&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar  4 04:24:57 2011
@@ -102,7 +102,6 @@ public class JobHistory {
   public static final int JOB_NAME_TRIM_LENGTH = 50;
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
-  private static boolean disableHistory = true;
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;
   private static String jobtrackerHostname;
@@ -227,9 +226,6 @@ public class JobHistory {
     }

     void moveToDone(final JobID id) {
-      if (disableHistory) {
-        return;
-      }
       final List<Path> paths = new ArrayList<Path>();
       final Path historyFile = fileManager.getHistoryFile(id);
       if (historyFile == null) {
@@ -279,6 +275,10 @@ public class JobHistory {
         }
       });
     }
+
+    void removeWriter(JobID jobId, PrintWriter writer) {
+      fileManager.getWriters(jobId).remove(writer);
+    }
   }

   /**
    * Record types are identifiers for each line of log in history files.
@@ -320,72 +320,58 @@ public class JobHistory {
    * @return true if initialized properly
    *         false otherwise
    */
-  public static boolean init(JobTracker jobTracker, JobConf conf,
-             String hostname, long jobTrackerStartTime){
-    try {
-      LOG_DIR = conf.get("hadoop.job.history.location" ,
-        "file:///" + new File(
-        System.getProperty("hadoop.log.dir")).getAbsolutePath()
-        + File.separator + "history");
-      JOBTRACKER_UNIQUE_STRING = hostname + "_" +
-                                 String.valueOf(jobTrackerStartTime) + "_";
-      jobtrackerHostname = hostname;
-      Path logDir = new Path(LOG_DIR);
-      LOGDIR_FS = logDir.getFileSystem(conf);
-      if (!LOGDIR_FS.exists(logDir)){
-        if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
-          throw new IOException("Mkdirs failed to create " + logDir.toString());
-        }
-      }
-      conf.set("hadoop.job.history.location", LOG_DIR);
-      disableHistory = false;
-      // set the job history block size (default is 3MB)
-      jobHistoryBlockSize =
-        conf.getLong("mapred.jobtracker.job.history.block.size",
-                     3 * 1024 * 1024);
-      jtConf = conf;
-
-      // queue and job level security is enabled on the mapreduce cluster or not
-      aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
-
-      // initialize the file manager
-      fileManager = new JobHistoryFilesManager(conf, jobTracker);
-    } catch(IOException e) {
-      LOG.error("Failed to initialize JobHistory log file", e);
-      disableHistory = true;
-    }
-    return !(disableHistory);
-  }
-
-  static boolean initDone(JobConf conf, FileSystem fs){
-    try {
-      //if completed job history location is set, use that
-      String doneLocation = conf.
-                       get("mapred.job.tracker.history.completed.location");
-      if (doneLocation != null) {
-        DONE = fs.makeQualified(new Path(doneLocation));
-        DONEDIR_FS = fs;
-      } else {
-        DONE = new Path(LOG_DIR, "done");
-        DONEDIR_FS = LOGDIR_FS;
+  public static void init(JobTracker jobTracker, JobConf conf,
+             String hostname, long jobTrackerStartTime) throws IOException {
+    LOG_DIR = conf.get("hadoop.job.history.location" ,
+      "file:///" + new File(
+      System.getProperty("hadoop.log.dir")).getAbsolutePath()
+      + File.separator + "history");
+    JOBTRACKER_UNIQUE_STRING = hostname + "_" +
+                               String.valueOf(jobTrackerStartTime) + "_";
+    jobtrackerHostname = hostname;
+    Path logDir = new Path(LOG_DIR);
+    LOGDIR_FS = logDir.getFileSystem(conf);
+    if (!LOGDIR_FS.exists(logDir)){
+      if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
+        throw new IOException("Mkdirs failed to create " + logDir.toString());
+      }
+    }
+    conf.set("hadoop.job.history.location", LOG_DIR);
+    // set the job history block size (default is 3MB)
+    jobHistoryBlockSize =
+      conf.getLong("mapred.jobtracker.job.history.block.size",
+                   3 * 1024 * 1024);
+    jtConf = conf;
+
+    // queue and job level security is enabled on the mapreduce cluster or not
+    aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
+
+    // initialize the file manager
+    fileManager = new JobHistoryFilesManager(conf, jobTracker);
+  }
+
+  static void initDone(JobConf conf, FileSystem fs) throws IOException {
+    //if completed job history location is set, use that
+    String doneLocation = conf.
+                     get("mapred.job.tracker.history.completed.location");
+    if (doneLocation != null) {
+      DONE = fs.makeQualified(new Path(doneLocation));
+      DONEDIR_FS = fs;
+    } else {
+      DONE = new Path(LOG_DIR, "done");
+      DONEDIR_FS = LOGDIR_FS;
+    }
+
+    //If not already present create the done folder with appropriate
+    //permission
+    if (!DONEDIR_FS.exists(DONE)) {
+      LOG.info("Creating DONE folder at "+ DONE);
+      if (! DONEDIR_FS.mkdirs(DONE,
+          new FsPermission(HISTORY_DIR_PERMISSION))) {
+        throw new IOException("Mkdirs failed to create " + DONE.toString());
       }
-
-      //If not already present create the done folder with appropriate
-      //permission
-      if (!DONEDIR_FS.exists(DONE)) {
-        LOG.info("Creating DONE folder at "+ DONE);
-        if (! DONEDIR_FS.mkdirs(DONE,
-            new FsPermission(HISTORY_DIR_PERMISSION))) {
-          throw new IOException("Mkdirs failed to create " + DONE.toString());
-        }
-      }
-
-      fileManager.start();
-    } catch(IOException e) {
-      LOG.error("Failed to initialize JobHistory log file", e);
-      disableHistory = true;
     }
-    return !(disableHistory);
+    fileManager.start();
   }

@@ -436,12 +422,10 @@ public class JobHistory {
    * @param jobId job id, assigned by jobtracker.
    */
   static void logMetaInfo(ArrayList<PrintWriter> writers){
-    if (!disableHistory){
-      if (null != writers){
-        JobHistory.log(writers, RecordTypes.Meta,
-            new Keys[] {Keys.VERSION},
-            new String[] {String.valueOf(VERSION)});
-      }
+    if (null != writers){
+      JobHistory.log(writers, RecordTypes.Meta,
+          new Keys[] {Keys.VERSION},
+          new String[] {String.valueOf(VERSION)});
     }
   }
 }
@@ -557,8 +541,30 @@ public class JobHistory {
    * @param values type of log event
    */

+  /**
+   * Log a number of keys and values with the record. The array lengths of
+   * keys and values should be the same.
+   * @param recordType type of log event
+   * @param keys type of log event
+   * @param values type of log event
+   */
+  static void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
+                  Keys[] keys, String[] values) {
+    log(writers, recordType, keys, values, null);
+  }
+
+  /**
+   * Log a number of keys and values with the record. The array lengths of
+   * keys and values should be the same.
+   * @param recordType type of log event
+   * @param keys type of log event
+   * @param values type of log event
+   * @param id jobid of the job
+   */
+
+  static void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
+                  Keys[] keys, String[] values, JobID id) {
     // First up calculate the length of buffer, so that we are performant
     // enough.
@@ -583,28 +589,14 @@ public class JobHistory {

     for (PrintWriter out : writers) {
       out.println(builder.toString());
+      if (out.checkError() && id != null) {
+        LOG.info("Logging failed for job " + id
+                 + ": removing PrintWriter from FileManager");
+        fileManager.removeWriter(id, out);
+      }
     }
   }

   /**
-   * Returns history disable status. by default history is enabled so this
-   * method returns false.
-   * @return true if history logging is disabled, false otherwise.
-   */
-  public static boolean isDisableHistory() {
-    return disableHistory;
-  }
-
-  /**
-   * Enable/disable history logging. Default value is false, so history
-   * is enabled by default.
-   * @param disableHistory true if history should be disabled, false otherwise.
-   */
-  public static void setDisableHistory(boolean disableHistory) {
-    JobHistory.disableHistory = disableHistory;
-  }
-
-  /**
    * Get the history location
    */
   static Path getJobHistoryLocation() {
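The new five-argument log() overload exists because PrintWriter never throws: it swallows IOExceptions and only records an internal error flag, so the one way to notice a dead history stream is checkError(), which flushes and reports whether any write has failed. Threading the JobID down lets log() evict just that job's failed writer via fileManager.removeWriter() while the other writers keep logging. A self-contained sketch of that prune-on-error idea (the plain writer list here stands in for JobHistoryFilesManager; it is not the Hadoop code):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class CheckErrorSketch {
      public static void main(String[] args) {
        List<PrintWriter> writers = new ArrayList<PrintWriter>();
        writers.add(new PrintWriter(new StringWriter()));  // healthy writer
        writers.add(new PrintWriter(new OutputStream() {   // writer over a dead stream
          @Override
          public void write(int b) throws IOException {
            throw new IOException("history stream is gone");
          }
        }, true)); // autoflush, so the failure is observed on println()

        // Iterate with an explicit Iterator so a failed writer can be dropped
        // in place; this mirrors what fileManager.removeWriter() does per job.
        for (Iterator<PrintWriter> it = writers.iterator(); it.hasNext();) {
          PrintWriter out = it.next();
          out.println("Job JOBID=\"job_201103040424_0001\" JOB_STATUS=\"RUNNING\"");
          if (out.checkError()) { // flushes, then reports any prior failure
            it.remove();
          }
        }
        System.out.println("writers still live: " + writers.size()); // prints 1
      }
    }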
@@ -1185,104 +1177,101 @@ public class JobHistory {
       String userLogDir = null;
       String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;

-      if (!disableHistory){
-        // Get the username and job name to be used in the actual log filename;
-        // sanity check them too
-        String jobName = getJobName(jobConf);
-
-        String user = getUserName(jobConf);
-
-        // get the history filename
-        String logFileName = null;
-        if (restarted) {
-          logFileName = getJobHistoryFileName(jobConf, jobId);
-          if (logFileName == null) {
-            logFileName =
-              encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
-          } else {
-            String parts[] = logFileName.split("_");
-            //TODO this is a hack :(
-            // jobtracker-hostname_jobtracker-identifier_
-            String jtUniqueString = parts[0] + "_" + parts[1] + "_";
-            jobUniqueString = jtUniqueString + jobId.toString();
-          }
-        } else {
-          logFileName =
+      // Get the username and job name to be used in the actual log filename;
+      // sanity check them too
+      String jobName = getJobName(jobConf);
+      String user = getUserName(jobConf);
+
+      // get the history filename
+      String logFileName = null;
+      if (restarted) {
+        logFileName = getJobHistoryFileName(jobConf, jobId);
+        if (logFileName == null) {
+          logFileName =
             encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+        } else {
+          String parts[] = logFileName.split("_");
+          //TODO this is a hack :(
+          // jobtracker-hostname_jobtracker-identifier_
+          String jtUniqueString = parts[0] + "_" + parts[1] + "_";
+          jobUniqueString = jtUniqueString + jobId.toString();
         }
+      } else {
+        logFileName =
+          encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+      }

-        // setup the history log file for this job
-        Path logFile = getJobHistoryLogLocation(logFileName);
-
-        // find user log directory
-        Path userLogFile =
-          getJobHistoryLogLocationForUser(logFileName, jobConf);
-
-        try{
-          FSDataOutputStream out = null;
-          PrintWriter writer = null;
-
-          if (LOG_DIR != null) {
-            // create output stream for logging in hadoop.job.history.location
-            if (restarted) {
-              logFile = recoverJobHistoryFile(jobConf, logFile);
-              logFileName = logFile.getName();
-            }
-
-            int defaultBufferSize =
-              LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
-            out = LOGDIR_FS.create(logFile,
-                            new FsPermission(HISTORY_FILE_PERMISSION),
-                            true,
-                            defaultBufferSize,
-                            LOGDIR_FS.getDefaultReplication(),
-                            jobHistoryBlockSize, null);
-            writer = new PrintWriter(out);
-            fileManager.addWriter(jobId, writer);
-
-            // cache it ...
-            fileManager.setHistoryFile(jobId, logFile);
-          }
-          if (userLogFile != null) {
-            // Get the actual filename as recoverJobHistoryFile() might return
-            // a different filename
-            userLogDir = userLogFile.getParent().toString();
-            userLogFile = new Path(userLogDir, logFileName);
-
-            // create output stream for logging
-            // in hadoop.job.history.user.location
-            fs = userLogFile.getFileSystem(jobConf);
-
-            out = fs.create(userLogFile, true, 4096);
-            writer = new PrintWriter(out);
-            fileManager.addWriter(jobId, writer);
+      // setup the history log file for this job
+      Path logFile = getJobHistoryLogLocation(logFileName);
+
+      // find user log directory
+      Path userLogFile =
+        getJobHistoryLogLocationForUser(logFileName, jobConf);
+      PrintWriter writer = null;
+      try{
+        FSDataOutputStream out = null;
+        if (LOG_DIR != null) {
+          // create output stream for logging in hadoop.job.history.location
+          if (restarted) {
+            logFile = recoverJobHistoryFile(jobConf, logFile);
+            logFileName = logFile.getName();
           }
-          ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
-          // Log the history meta info
-          JobHistory.MetaInfoManager.logMetaInfo(writers);
-
-          String viewJobACL = "*";
-          String modifyJobACL = "*";
-          if (aclsEnabled) {
-            viewJobACL = jobConf.get(JobACL.VIEW_JOB.getAclName(), " ");
-            modifyJobACL = jobConf.get(JobACL.MODIFY_JOB.getAclName(), " ");
-          }
-          //add to writer as well
-          JobHistory.log(writers, RecordTypes.Job,
-                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
-                                    Keys.SUBMIT_TIME, Keys.JOBCONF,
-                                    Keys.VIEW_JOB, Keys.MODIFY_JOB,
-                                    Keys.JOB_QUEUE},
-                         new String[]{jobId.toString(), jobName, user,
-                                      String.valueOf(submitTime) , jobConfPath,
-                                      viewJobACL, modifyJobACL,
-                                      jobConf.getQueueName()}
-                        );
-
-        }catch(IOException e){
-          LOG.error("Failed creating job history log file, disabling history", e);
-          disableHistory = true;
+        int defaultBufferSize =
+          LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+        out = LOGDIR_FS.create(logFile,
+                        new FsPermission(HISTORY_FILE_PERMISSION),
+                        true,
+                        defaultBufferSize,
+                        LOGDIR_FS.getDefaultReplication(),
+                        jobHistoryBlockSize, null);
+        writer = new PrintWriter(out);
+        fileManager.addWriter(jobId, writer);
+
+        // cache it ...
+        fileManager.setHistoryFile(jobId, logFile);
+        }
+        if (userLogFile != null) {
+          // Get the actual filename as recoverJobHistoryFile() might return
+          // a different filename
+          userLogDir = userLogFile.getParent().toString();
+          userLogFile = new Path(userLogDir, logFileName);
+
+          // create output stream for logging
+          // in hadoop.job.history.user.location
+          fs = userLogFile.getFileSystem(jobConf);
+
+          out = fs.create(userLogFile, true, 4096);
+          writer = new PrintWriter(out);
+          fileManager.addWriter(jobId, writer);
+        }
+
+        ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
+        // Log the history meta info
+        JobHistory.MetaInfoManager.logMetaInfo(writers);
+
+        String viewJobACL = "*";
+        String modifyJobACL = "*";
+        if (aclsEnabled) {
+          viewJobACL = jobConf.get(JobACL.VIEW_JOB.getAclName(), " ");
+          modifyJobACL = jobConf.get(JobACL.MODIFY_JOB.getAclName(), " ");
+        }
+        //add to writer as well
+        JobHistory.log(writers, RecordTypes.Job,
+                       new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
+                                  Keys.SUBMIT_TIME, Keys.JOBCONF,
+                                  Keys.VIEW_JOB, Keys.MODIFY_JOB,
+                                  Keys.JOB_QUEUE},
+                       new String[]{jobId.toString(), jobName, user,
+                                    String.valueOf(submitTime) , jobConfPath,
+                                    viewJobACL, modifyJobACL,
+                                    jobConf.getQueueName()}, jobId
+                      );
+
+      }catch(IOException e){
+        LOG.error("Failed creating job history log file for job " + jobId, e);
+        if (writer != null) {
+          fileManager.removeWriter(jobId, writer);
         }
       }
       // Always store job conf on local file system
@@ -1369,18 +1358,16 @@ public class JobHistory {
    */
   public static void logInited(JobID jobId, long startTime,
                                int totalMaps, int totalReduces) {
-    if (!disableHistory){
-      ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
+    ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Job,
-            new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS,
-                        Keys.TOTAL_REDUCES, Keys.JOB_STATUS},
-            new String[] {jobId.toString(), String.valueOf(startTime),
-                          String.valueOf(totalMaps),
-                          String.valueOf(totalReduces),
-                          Values.PREP.name()});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Job,
+          new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS,
+                      Keys.TOTAL_REDUCES, Keys.JOB_STATUS},
+          new String[] {jobId.toString(), String.valueOf(startTime),
+                        String.valueOf(totalMaps),
+                        String.valueOf(totalReduces),
+                        Values.PREP.name()}, jobId);
     }
   }
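The remaining JobHistory.java hunks below all apply the same two-step rewrite to each logXxx() entry point: drop the enclosing if (!disableHistory) guard, and thread the job's JobID into JobHistory.log(...) so a failing writer can be evicted per job rather than history being disabled globally. A condensed sketch of that before-and-after shape (the names here are illustrative stand-ins, not the Hadoop signatures):

    import java.io.PrintWriter;
    import java.util.ArrayList;

    public class GuardRemovalSketch {
      // Stand-in for JobHistoryFilesManager.getWriters(JobID).
      static ArrayList<PrintWriter> getWriters(String jobId) {
        return new ArrayList<PrintWriter>();
      }

      // Stand-in for JobHistory.log(writers, recordType, keys, values, id).
      static void log(ArrayList<PrintWriter> writers, String record, String jobId) {
        // ... format the record, println() it, checkError(), evict on failure
      }

      // Before the patch (shape only):
      //   if (!disableHistory) {
      //     ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
      //     if (null != writer) {
      //       JobHistory.log(writer, ...);   // no JobID argument
      //     }
      //   }
      //
      // After the patch: the guard is gone and the id rides along.
      public static void logStarted(String jobId) {
        ArrayList<PrintWriter> writer = getWriters(jobId);
        if (null != writer) {
          log(writer, "Job JOB_STATUS=\"RUNNING\"", jobId);
        }
      }

      public static void main(String[] args) {
        logStarted("job_201103040424_0001"); // a no-op with the stub manager
      }
    }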
@@ -1405,15 +1392,13 @@
    * @param jobId job id, assigned by jobtracker.
    */
   public static void logStarted(JobID jobId){
-    if (!disableHistory){
-      ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
+    ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Job,
-                       new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
-                       new String[] {jobId.toString(),
-                                     Values.RUNNING.name()});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Job,
+                     new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
+                     new String[] {jobId.toString(),
+                                   Values.RUNNING.name()}, jobId);
     }
   }

@@ -1433,34 +1418,32 @@
                                  Counters mapCounters,
                                  Counters reduceCounters,
                                  Counters counters) {
-    if (!disableHistory){
       // close job file for this job
-      ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
+    ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Job,
-                       new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
-                                   Keys.JOB_STATUS, Keys.FINISHED_MAPS,
-                                   Keys.FINISHED_REDUCES,
-                                   Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
-                                   Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS,
-                                   Keys.COUNTERS},
-                       new String[] {jobId.toString(), Long.toString(finishTime),
-                                     Values.SUCCESS.name(),
-                                     String.valueOf(finishedMaps),
-                                     String.valueOf(finishedReduces),
-                                     String.valueOf(failedMaps),
-                                     String.valueOf(failedReduces),
-                                     mapCounters.makeEscapedCompactString(),
-                                     reduceCounters.makeEscapedCompactString(),
-                                     counters.makeEscapedCompactString()});
-        for (PrintWriter out : writer) {
-          out.close();
-        }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Job,
+                     new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
+                                 Keys.JOB_STATUS, Keys.FINISHED_MAPS,
+                                 Keys.FINISHED_REDUCES,
+                                 Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
+                                 Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS,
+                                 Keys.COUNTERS},
+                     new String[] {jobId.toString(), Long.toString(finishTime),
+                                   Values.SUCCESS.name(),
+                                   String.valueOf(finishedMaps),
+                                   String.valueOf(finishedReduces),
+                                   String.valueOf(failedMaps),
+                                   String.valueOf(failedReduces),
+                                   mapCounters.makeEscapedCompactString(),
+                                   reduceCounters.makeEscapedCompactString(),
+                                   counters.makeEscapedCompactString()}, jobId);
+      for (PrintWriter out : writer) {
+        out.close();
       }
-      Thread historyCleaner = new Thread(new HistoryCleaner());
-      historyCleaner.start();
     }
+    Thread historyCleaner = new Thread(new HistoryCleaner());
+    historyCleaner.start();
   }

   /**
    * Logs job failed event. Closes the job history log file.
    * @param finishedMaps no of finished map tasks.
    * @param finishedReduces no of finished reduce tasks.
@@ -1470,17 +1453,15 @@
    */
   public static void logFailed(JobID jobid, long timestamp,
                                int finishedMaps, int finishedReduces){
-    if (!disableHistory){
-      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
+    ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Job,
-                       new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
-                       new String[] {jobid.toString(), String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
-                                     String.valueOf(finishedReduces)});
-        for (PrintWriter out : writer) {
-          out.close();
-        }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Job,
+                     new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
+                     new String[] {jobid.toString(), String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
+                                   String.valueOf(finishedReduces)}, jobid);
+      for (PrintWriter out : writer) {
+        out.close();
       }
     }
   }
@@ -1498,18 +1479,16 @@
    */
   public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
       int finishedReduces) {
-    if (!disableHistory) {
-      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
+    ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);

-      if (null != writer) {
-        JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
-            Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
-            Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
-            String.valueOf(timestamp), Values.KILLED.name(),
-            String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
-        for (PrintWriter out : writer) {
-          out.close();
-        }
+    if (null != writer) {
+      JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
+          Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
+          Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
+          String.valueOf(timestamp), Values.KILLED.name(),
+          String.valueOf(finishedMaps), String.valueOf(finishedReduces) }, jobid);
+      for (PrintWriter out : writer) {
+        out.close();
       }
     }
   }
@@ -1519,14 +1498,12 @@
    * @param priority Jobs priority
    */
   public static void logJobPriority(JobID jobid, JobPriority priority){
-    if (!disableHistory){
-      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
+    ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Job,
-                       new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY},
-                       new String[] {jobid.toString(), priority.toString()});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Job,
+                     new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY},
+                     new String[] {jobid.toString(), priority.toString()}, jobid);
     }
   }
   /**
@@ -1545,17 +1522,15 @@
   public static void logJobInfo(JobID jobid, long submitTime,
                                 long launchTime) {
-    if (!disableHistory){
-      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
+    ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Job,
-                       new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME,
-                                   Keys.LAUNCH_TIME},
-                       new String[] {jobid.toString(),
-                                     String.valueOf(submitTime),
-                                     String.valueOf(launchTime)});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Job,
+                     new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME,
+                                 Keys.LAUNCH_TIME},
+                     new String[] {jobid.toString(),
+                                   String.valueOf(submitTime),
+                                   String.valueOf(launchTime)}, jobid);
     }
   }
 }
@@ -1576,18 +1551,16 @@
    */
   public static void logStarted(TaskID taskId, String taskType,
                                 long startTime, String splitLocations) {
-    if (!disableHistory){
-      JobID id = taskId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Task,
-                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
-                                  Keys.START_TIME, Keys.SPLITS},
-                       new String[]{taskId.toString(), taskType,
-                                    String.valueOf(startTime),
-                                    splitLocations});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Task,
+                     new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
+                                Keys.START_TIME, Keys.SPLITS},
+                     new String[]{taskId.toString(), taskType,
+                                  String.valueOf(startTime),
+                                  splitLocations}, id);
     }
   }
   /**
@@ -1598,19 +1571,17 @@
    */
   public static void logFinished(TaskID taskId, String taskType,
                                  long finishTime, Counters counters){
-    if (!disableHistory){
-      JobID id = taskId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Task,
-                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
-                                  Keys.TASK_STATUS, Keys.FINISH_TIME,
-                                  Keys.COUNTERS},
-                       new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(),
-                                     String.valueOf(finishTime),
-                                     counters.makeEscapedCompactString()});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Task,
+                     new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
+                                Keys.TASK_STATUS, Keys.FINISH_TIME,
+                                Keys.COUNTERS},
+                     new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(),
+                                   String.valueOf(finishTime),
+                                   counters.makeEscapedCompactString()}, id);
     }
   }

@@ -1620,16 +1591,14 @@
    * @param finishTime finish time of task in ms
    */
   public static void logUpdates(TaskID taskId, long finishTime){
-    if (!disableHistory){
-      JobID id = taskId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.Task,
-                       new Keys[]{Keys.TASKID, Keys.FINISH_TIME},
-                       new String[]{ taskId.toString(),
-                                     String.valueOf(finishTime)});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.Task,
+                     new Keys[]{Keys.TASKID, Keys.FINISH_TIME},
+                     new String[]{ taskId.toString(),
+                                   String.valueOf(finishTime)}, id);
     }
   }

@@ -1650,23 +1619,21 @@
   public static void logFailed(TaskID taskId, String taskType, long time,
                                String error,
                                TaskAttemptID failedDueToAttempt){
-    if (!disableHistory){
-      JobID id = taskId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        String failedAttempt = failedDueToAttempt == null
-                               ? ""
-                               : failedDueToAttempt.toString();
-        JobHistory.log(writer, RecordTypes.Task,
-                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
-                                  Keys.TASK_STATUS, Keys.FINISH_TIME,
-                                  Keys.ERROR, Keys.TASK_ATTEMPT_ID},
-                       new String[]{ taskId.toString(), taskType,
-                                     Values.FAILED.name(),
-                                     String.valueOf(time) , error,
-                                     failedAttempt});
-      }
+    if (null != writer){
+      String failedAttempt = failedDueToAttempt == null
+                             ? ""
+                             : failedDueToAttempt.toString();
+      JobHistory.log(writer, RecordTypes.Task,
+                     new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
+                                Keys.TASK_STATUS, Keys.FINISH_TIME,
+                                Keys.ERROR, Keys.TASK_ATTEMPT_ID},
+                     new String[]{ taskId.toString(), taskType,
+                                   Values.FAILED.name(),
+                                   String.valueOf(time) , error,
+                                   failedAttempt}, id);
     }
   }
   /**
@@ -1712,22 +1679,20 @@
   public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                 String trackerName, int httpPort,
                                 String taskType) {
-    if (!disableHistory){
-      JobID id = taskAttemptId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskAttemptId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.MapAttempt,
-                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
-                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
-                                   Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                       new String[]{taskType,
-                                    taskAttemptId.getTaskID().toString(),
-                                    taskAttemptId.toString(),
-                                    String.valueOf(startTime), trackerName,
-                                    httpPort == -1 ? "" :
-                                      String.valueOf(httpPort)});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.MapAttempt,
+                     new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
+                                 Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+                                 Keys.TRACKER_NAME, Keys.HTTP_PORT},
+                     new String[]{taskType,
+                                  taskAttemptId.getTaskID().toString(),
+                                  taskAttemptId.toString(),
+                                  String.valueOf(startTime), trackerName,
+                                  httpPort == -1 ? "" :
+                                    String.valueOf(httpPort)}, id);
     }
   }

@@ -1762,24 +1727,22 @@
                                  String taskType,
                                  String stateString,
                                  Counters counter) {
-    if (!disableHistory){
-      JobID id = taskAttemptId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskAttemptId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.MapAttempt,
-                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
-                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
-                                   Keys.FINISH_TIME, Keys.HOSTNAME,
-                                   Keys.STATE_STRING, Keys.COUNTERS},
-                       new String[]{taskType,
-                                    taskAttemptId.getTaskID().toString(),
-                                    taskAttemptId.toString(),
-                                    Values.SUCCESS.name(),
-                                    String.valueOf(finishTime), hostName,
-                                    stateString,
-                                    counter.makeEscapedCompactString()});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.MapAttempt,
+                     new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
+                                 Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                                 Keys.FINISH_TIME, Keys.HOSTNAME,
+                                 Keys.STATE_STRING, Keys.COUNTERS},
+                     new String[]{taskType,
+                                  taskAttemptId.getTaskID().toString(),
+                                  taskAttemptId.toString(),
+                                  Values.SUCCESS.name(),
+                                  String.valueOf(finishTime), hostName,
+                                  stateString,
+                                  counter.makeEscapedCompactString()}, id);
     }
   }

@@ -1811,22 +1774,20 @@
   public static void logFailed(TaskAttemptID taskAttemptId,
                                long timestamp, String hostName,
                                String error, String taskType) {
-    if (!disableHistory){
-      JobID id = taskAttemptId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskAttemptId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.MapAttempt,
-                       new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
-                                  Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
-                                  Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                       new String[]{ taskType,
-                                     taskAttemptId.getTaskID().toString(),
-                                     taskAttemptId.toString(),
-                                     Values.FAILED.name(),
-                                     String.valueOf(timestamp),
-                                     hostName, error});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.MapAttempt,
+                     new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
+                                Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                                Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
+                     new String[]{ taskType,
+                                   taskAttemptId.getTaskID().toString(),
+                                   taskAttemptId.toString(),
+                                   Values.FAILED.name(),
+                                   String.valueOf(timestamp),
+                                   hostName, error}, id);
     }
   }

@@ -1857,23 +1818,21 @@
   public static void logKilled(TaskAttemptID taskAttemptId,
                                long timestamp, String hostName,
                                String error, String taskType) {
-    if (!disableHistory){
-      JobID id = taskAttemptId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskAttemptId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.MapAttempt,
-                       new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
-                                  Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
-                                  Keys.FINISH_TIME, Keys.HOSTNAME,
-                                  Keys.ERROR},
-                       new String[]{ taskType,
-                                     taskAttemptId.getTaskID().toString(),
-                                     taskAttemptId.toString(),
-                                     Values.KILLED.name(),
-                                     String.valueOf(timestamp),
-                                     hostName, error});
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.MapAttempt,
+                     new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
+                                Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                                Keys.FINISH_TIME, Keys.HOSTNAME,
+                                Keys.ERROR},
+                     new String[]{ taskType,
+                                   taskAttemptId.getTaskID().toString(),
+                                   taskAttemptId.toString(),
+                                   Values.KILLED.name(),
+                                   String.valueOf(timestamp),
+                                   hostName, error}, id);
     }
   }
 }
@@ -1909,62 +1868,59 @@
                                 long startTime, String trackerName,
                                 int httpPort,
                                 String taskType) {
-    if (!disableHistory){
-      JobID id = taskAttemptId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskAttemptId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.ReduceAttempt,
-                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
-                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
-                                   Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                       new String[]{taskType,
-                                    taskAttemptId.getTaskID().toString(),
-                                    taskAttemptId.toString(),
-                                    String.valueOf(startTime), trackerName,
-                                    httpPort == -1 ? "" :
-                                      String.valueOf(httpPort)});
-      }
-    }
-  }
-
-  /**
-   * Log finished event of this task.
-   * @param taskAttemptId task attempt id
-   * @param shuffleFinished shuffle finish time
-   * @param sortFinished sort finish time
-   * @param finishTime finish time of task
-   * @param hostName host name where task attempt executed
-   * @deprecated Use
-   * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
-   */
-  @Deprecated
-  public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished,
-                                 long sortFinished, long finishTime,
-                                 String hostName){
-    logFinished(taskAttemptId, shuffleFinished, sortFinished,
-                finishTime, hostName, Values.REDUCE.name(),
-                "", new Counters());
-  }
-
-  /**
-   * Log finished event of this task.
-   *
-   * @param taskAttemptId task attempt id
-   * @param shuffleFinished shuffle finish time
-   * @param sortFinished sort finish time
-   * @param finishTime finish time of task
-   * @param hostName host name where task attempt executed
-   * @param taskType Whether the attempt is cleanup or setup or reduce
-   * @param stateString the state string of the attempt
-   * @param counter counters of the attempt
-   */
-  public static void logFinished(TaskAttemptID taskAttemptId,
-                                 long shuffleFinished,
-                                 long sortFinished, long finishTime,
-                                 String hostName, String taskType,
-                                 String stateString, Counters counter) {
-    if (!disableHistory){
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.ReduceAttempt,
+                     new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
+                                 Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+                                 Keys.TRACKER_NAME, Keys.HTTP_PORT},
+                     new String[]{taskType,
+                                  taskAttemptId.getTaskID().toString(),
+                                  taskAttemptId.toString(),
+                                  String.valueOf(startTime), trackerName,
+                                  httpPort == -1 ? "" :
+                                    String.valueOf(httpPort)}, id);
+    }
+  }
+
+  /**
+   * Log finished event of this task.
+   * @param taskAttemptId task attempt id
+   * @param shuffleFinished shuffle finish time
+   * @param sortFinished sort finish time
+   * @param finishTime finish time of task
+   * @param hostName host name where task attempt executed
+   * @deprecated Use
+   * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
+   */
+  @Deprecated
+  public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished,
+                                 long sortFinished, long finishTime,
+                                 String hostName){
+    logFinished(taskAttemptId, shuffleFinished, sortFinished,
+                finishTime, hostName, Values.REDUCE.name(),
+                "", new Counters());
+  }
+
+  /**
+   * Log finished event of this task.
+   *
+   * @param taskAttemptId task attempt id
+   * @param shuffleFinished shuffle finish time
+   * @param sortFinished sort finish time
+   * @param finishTime finish time of task
+   * @param hostName host name where task attempt executed
+   * @param taskType Whether the attempt is cleanup or setup or reduce
+   * @param stateString the state string of the attempt
+   * @param counter counters of the attempt
+   */
+  public static void logFinished(TaskAttemptID taskAttemptId,
+                                 long shuffleFinished,
+                                 long sortFinished, long finishTime,
+                                 String hostName, String taskType,
+                                 String stateString, Counters counter) {
     JobID id = taskAttemptId.getJobID();
     ArrayList<PrintWriter> writer = fileManager.getWriters(id);
@@ -1983,9 +1939,8 @@ public class JobHistory {
                                    String.valueOf(sortFinished),
                                    String.valueOf(finishTime),
                                    hostName, stateString,
-                                   counter.makeEscapedCompactString()});
+                                   counter.makeEscapedCompactString()}, id);
     }
-    }
   }

   /**
@@ -2015,22 +1970,20 @@
   public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
                                String hostName, String error,
                                String taskType) {
-    if (!disableHistory){
-      JobID id = taskAttemptId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskAttemptId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.ReduceAttempt,
-                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
-                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
-                                   Keys.FINISH_TIME, Keys.HOSTNAME,
-                                   Keys.ERROR },
-                       new String[]{ taskType,
-                                     taskAttemptId.getTaskID().toString(),
-                                     taskAttemptId.toString(),
-                                     Values.FAILED.name(),
-                                     String.valueOf(timestamp), hostName, error });
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.ReduceAttempt,
+                     new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
+                                 Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                                 Keys.FINISH_TIME, Keys.HOSTNAME,
+                                 Keys.ERROR },
+                     new String[]{ taskType,
+                                   taskAttemptId.getTaskID().toString(),
+                                   taskAttemptId.toString(),
+                                   Values.FAILED.name(),
+                                   String.valueOf(timestamp), hostName, error }, id);
     }
   }

@@ -2061,23 +2014,21 @@
   public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
                                String hostName, String error,
                                String taskType) {
-    if (!disableHistory){
-      JobID id = taskAttemptId.getJobID();
-      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+    JobID id = taskAttemptId.getJobID();
+    ArrayList<PrintWriter> writer = fileManager.getWriters(id);

-      if (null != writer){
-        JobHistory.log(writer, RecordTypes.ReduceAttempt,
-                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
-                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
-                                   Keys.FINISH_TIME, Keys.HOSTNAME,
-                                   Keys.ERROR },
-                       new String[]{ taskType,
-                                     taskAttemptId.getTaskID().toString(),
-                                     taskAttemptId.toString(),
-                                     Values.KILLED.name(),
-                                     String.valueOf(timestamp),
-                                     hostName, error });
-      }
+    if (null != writer){
+      JobHistory.log(writer, RecordTypes.ReduceAttempt,
+                     new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
+                                 Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                                 Keys.FINISH_TIME, Keys.HOSTNAME,
+                                 Keys.ERROR },
+                     new String[]{ taskType,
+                                   taskAttemptId.getTaskID().toString(),
+                                   taskAttemptId.toString(),
+                                   Values.KILLED.name(),
+                                   String.valueOf(timestamp),
+                                   hostName, error }, id);
     }
   }
 }
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077531&r1=1077530&r2=1077531&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 04:24:57 2011
@@ -2126,14 +2126,14 @@ public class JobTracker implements MRCon
     infoServer.setAttribute("job.tracker", this);
     // initialize history parameters.
     final JobTracker jtFinal = this;
-    boolean historyInitialized =
-      getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
-        @Override
-        public Boolean run() throws Exception {
-          return JobHistory.init(jtFinal, conf, jtFinal.localMachine,
-              jtFinal.startTime);
-        }
-      });
+    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        JobHistory.init(jtFinal, conf, jtFinal.localMachine,
+            jtFinal.startTime);
+        return true;
+      }
+    });

     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2202,7 +2202,6 @@ public class JobTracker implements MRCon
     // Check if the history is enabled .. as we can't have persistence with
     // history disabled
     if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
-        && !JobHistory.isDisableHistory()
        && systemDirData != null) {
       for (FileStatus status : systemDirData) {
         try {
@@ -2249,20 +2248,18 @@ public class JobTracker implements MRCon
     }

     // Initialize history DONE folder
-    if (historyInitialized) {
-      FileSystem historyFS = getMROwner().doAs(
-          new PrivilegedExceptionAction<FileSystem>() {
-            public FileSystem run() throws IOException {
-              JobHistory.initDone(conf, fs);
-              final String historyLogDir =
-                JobHistory.getCompletedJobHistoryLocation().toString();
-              infoServer.setAttribute("historyLogDir", historyLogDir);
-
-              return new Path(historyLogDir).getFileSystem(conf);
-            }
-          });
-      infoServer.setAttribute("fileSys", historyFS);
-    }
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
+          public FileSystem run() throws IOException {
+            JobHistory.initDone(conf, fs);
+            final String historyLogDir =
+              JobHistory.getCompletedJobHistoryLocation().toString();
+            infoServer.setAttribute("historyLogDir", historyLogDir);
+
+            return new Path(historyLogDir).getFileSystem(conf);
+          }
+        });
+    infoServer.setAttribute("fileSys", historyFS);

     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
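In JobTracker, the same contract change shows up inside the doAs() block: the action used to return JobHistory.init()'s boolean, which the caller cached as historyInitialized; now the action lets init()'s IOException propagate out of PrivilegedExceptionAction.run(), so startup fails fast and the DONE-folder initialization no longer needs to be conditional. A sketch of that exception-propagation pattern, with runAs() as a hypothetical stand-in for UserGroupInformation.doAs():

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    public class DoAsSketch {
      // Hypothetical stand-in: runs the action and rethrows the wrapped
      // checked exception, which is the behavior the JobTracker change
      // relies on.
      static <T> T runAs(PrivilegedExceptionAction<T> action) throws IOException {
        try {
          return action.run();
        } catch (IOException e) {
          throw e;
        } catch (Exception e) {
          throw new IOException(e);
        }
      }

      public static void main(String[] args) throws IOException {
        // Before the patch, the action returned init()'s boolean and the
        // caller kept a historyInitialized flag. After the patch, the action
        // lets init()'s IOException escape, aborting startup on failure.
        Boolean ok = runAs(new PrivilegedExceptionAction<Boolean>() {
          @Override
          public Boolean run() throws Exception {
            // JobHistory.init(...) would go here; it now throws on failure.
            return true;
          }
        });
        System.out.println("history initialized: " + ok);
      }
    }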
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java?rev=1077531&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java Fri Mar  4 04:24:57 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.taglibs.standard.extra.spath.Predicate;
+
+import org.mortbay.log.Log;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test {@link JobTracker} w.r.t config parameters.
+ */
+public class TestJobHistoryConfig extends TestCase {
+  // private MiniMRCluster mr = null;
+  private MiniDFSCluster mdfs = null;
+  private String namenode = null;
+  FileSystem fileSys = null;
+  final Path inDir = new Path("./input");
+  final Path outDir = new Path("./output");
+
+  private void setUpCluster(JobConf conf) throws IOException,
+      InterruptedException {
+    mdfs = new MiniDFSCluster(conf, 1, true, null);
+    fileSys = mdfs.getFileSystem();
+    namenode = fileSys.getUri().toString();
+  }
+
+  /**
+   * Test case to make sure that the JobTracker starts with JobHistory
+   * enabled:
+   * <ol>
+   *   <li>Run a job with a valid jobhistory configuration</li>
+   *   <li>Check if the JobTracker can start</li>
+   * </ol>
+   *
+   * @throws Exception
+   */
+  public void testJobHistoryWithValidConfiguration() throws Exception {
+    try {
+      JobConf conf = new JobConf();
+      setUpCluster(conf);
+      conf.set("hadoop.job.history.location", "/hadoop/history");
+      conf = MiniMRCluster.configureJobConf(conf, namenode, 0, 0, null);
+      boolean started = canStartJobTracker(conf);
+      assertTrue(started);
+    } finally {
+      if (mdfs != null) {
+        try {
+          mdfs.shutdown();
+        } catch (Exception e) {
+        }
+      }
+    }
+  }
+
+  public static class MapperClass extends MapReduceBase implements
+      Mapper<LongWritable, Text, Text, IntWritable> {
+    public void configure(JobConf job) {
+    }
+
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      throw new IOException();
+    }
+  }
+
+  public void testJobHistoryLogging() throws Exception {
+    JobConf conf = new JobConf();
+    setUpCluster(conf);
+    conf.setMapperClass(MapperClass.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setNumReduceTasks(0);
+    JobClient jc = new JobClient(conf);
+    conf.set("hadoop.job.history.location", "/hadoop/history");
+    conf = MiniMRCluster.configureJobConf(conf, namenode, 0, 0, null);
+    FileSystem inFs = inDir.getFileSystem(conf);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setSpeculativeExecution(false);
+    conf.setJobName("test");
+    conf.setUser("testuser");
+    conf.setQueueName("testQueue");
+    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+        "/tmp")).toString().replace(' ', '+');
+    JobTracker jt = JobTracker.startTracker(conf);
+    assertTrue(jt != null);
+    JobInProgress jip = new JobInProgress(new JobID("jt", 1),
+        new JobConf(conf), jt);
+    assertTrue(jip != null);
+    jip.jobFile = "testfile";
+    String historyFile = JobHistory.getHistoryFilePath(jip.getJobID());
+    JobHistory.JobInfo.logSubmitted(jip.getJobID(), jip.getJobConf(),
+        jip.jobFile, jip.startTime);
+  }
+
+  /**
+   * Check whether the JobTracker can be started.
+   *
+   * @throws IOException
+   */
+  private boolean canStartJobTracker(JobConf conf) throws InterruptedException,
+      IOException {
+    JobTracker jt = null;
+    try {
+      jt = JobTracker.startTracker(conf);
+      Log.info("Started JobTracker");
+    } catch (IOException e) {
+      Log.info("Can not Start JobTracker", e.getLocalizedMessage());
+      return false;
+    }
+    if (jt != null) {
+      jt.fs.close();
+      jt.stopTracker();
+    }
+    return true;
+  }
+}