Author: omalley Date: Tue Mar 8 05:53:05 2011 New Revision: 1079184 URL: http://svn.apache.org/viewvc?rev=1079184&view=rev Log: commit ef1a2a3dcc014ffe026c7be3c2e06a44da14de3a Author: Richard King Date: Mon Nov 15 22:43:39 2010 +0000 increase the flexibility of searching the job history in jobhistory.jsp . Also, stores job history files in multiple directories, and establishes a rudimentary database index, to make searches more performant. +++ b/YAHOO-CHANGES.txt + increase the flexibility of searching the job history in + jobhistory.jsp . From + By dking + + + stores job history files in multiple directories, and establishes + a rudimentry database index, to make searches more performant. + From + By dking + Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- 
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Mar 8 05:53:05 2011 @@ -247,7 +247,7 @@ public class JobTracker implements MRCon * Return the JT's job history handle. * @return the jobhistory handle */ - JobHistory getJobHistory() { return jobHistory; } + public JobHistory getJobHistory() { return jobHistory; } /** * Start the JobTracker with given configuration. * @@ -1617,6 +1617,9 @@ public class JobTracker implements MRCon } }); infoServer.setAttribute("fileSys", historyFS); + infoServer.setAttribute("jobHistoryHistory", jobHistory); + infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString()); + infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString()); this.dnsToSwitchMapping = ReflectionUtils.newInstance( conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class, @@ -4622,6 +4625,9 @@ public class JobTracker implements MRCon } infoServer.setAttribute("fileSys", historyFS); + infoServer.setAttribute("jobHistoryHistory", jobHistory); + infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString()); + infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString()); infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class); infoServer.start(); this.infoPort = this.infoServer.getPort(); Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java Tue Mar 8 05:53:05 
2011 @@ -259,8 +259,7 @@ public class Cluster { if (jobHistoryDir == null) { jobHistoryDir = new Path(client.getJobHistoryDir()); } - return JobHistory.getJobHistoryFile(jobHistoryDir, jobId, - ugi.getShortUserName()).toString(); + return JobHistory.getJobHistoryFile(jobHistoryDir, jobId).toString(); } /** Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Tue Mar 8 05:53:05 2011 @@ -18,20 +18,38 @@ package org.apache.hadoop.mapreduce.jobhistory; +import java.io.BufferedReader; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintStream; + import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; import 
java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -43,7 +61,9 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobTracker; @@ -64,7 +84,7 @@ public class JobHistory { final Log LOG = LogFactory.getLog(JobHistory.class); private long jobHistoryBlockSize; - private final Map fileMap = + private static final Map fileMap = Collections.synchronizedMap(new HashMap()); private ThreadPoolExecutor executor = null; static final FsPermission HISTORY_DIR_PERMISSION = @@ -82,23 +102,69 @@ public class JobHistory { private Path logDir = null; private Path done = null; // folder for completed jobs + private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail(); + private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*"; + + static final String CONF_FILE_NAME_SUFFIX = "_conf.xml"; + + // XXXXX debug mode -- set this to false for production + private static final boolean DEBUG_MODE = false; + + private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6; + private static final int SERIAL_NUMBER_LOW_DIGITS = DEBUG_MODE ? 1 : 3; + + private static final String SERIAL_NUMBER_FORMAT + = ("%0" + + (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS) + + "d"); + + private static final Set existingDoneSubdirs = new HashSet(); + + private static final SortedMap idToDateString + = new TreeMap(); + + private static Pattern historyCleanerParseDirectory + = Pattern.compile(".+/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)/?"); + // .+ / YYYY / MM / DD / HH /? 
+ + public static final String OLD_SUFFIX = ".old"; + public static final String OLD_FULL_SUFFIX_REGEX_STRING + = "(?:\\.[0-9]+" + Pattern.quote(OLD_SUFFIX) + ")"; // Version string that will prefix all History Files public static final String HISTORY_VERSION = "1.0"; private HistoryCleaner historyCleanerThread = null; - private Map jobHistoryFileMap = + private static final int version = 3; + private static final String LOG_VERSION_STRING = "version-" + version; + + private long jobTrackerStartTime; + private String jobTrackerHostName; + private String jobTrackerUniqueName; + + private static final Map jobHistoryFileMap = Collections.synchronizedMap( new LinkedHashMap()); + // The invariant is that UnindexedElementsState tracks the identity + // of the currently-filling done directory subdirectory, and what + // needs to indexed. + // Has to be locked for each file disposition decision + private final UnindexedElementsState ueState = new UnindexedElementsState(); + // JobHistory filename regex public static final Pattern JOBHISTORY_FILENAME_REGEX = - Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+"); + Pattern.compile("(" + JobID.JOBID_REGEX + ")" + + OLD_FULL_SUFFIX_REGEX_STRING + "?"); // JobHistory conf-filename regex public static final Pattern CONF_FILENAME_REGEX = - Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?"); + Pattern.compile("(" + JobID.JOBID_REGEX + ")" + + CONF_FILE_NAME_SUFFIX + + OLD_FULL_SUFFIX_REGEX_STRING + "?"); + + private static final int MAXIMUM_DATESTRING_COUNT = 200000; private static class MovedFileInfo { private final String historyFile; @@ -119,6 +185,11 @@ public class JobHistory { public void init(JobTracker jt, JobConf conf, String hostname, long jobTrackerStartTime) throws IOException { + jobTrackerHostName = hostname; + this.jobTrackerStartTime = jobTrackerStartTime; + + this.jobTrackerUniqueName = jobTrackerHostName + "-" + jobTrackerStartTime; + // Get and create the log folder final String logDirLoc 
= conf.get(JTConfig.JT_JOBHISTORY_LOCATION , "file:///" + @@ -145,6 +216,20 @@ public class JobHistory { jobTracker = jt; } + + public static String leafGlobString() { + return "/" + LOG_VERSION_STRING + + "/*" // job tracker ID + + "/YYYY/MM/DD/HH" // time segment + ; + } + + public static String globString() { + return "/" + LOG_VERSION_STRING + + "/*" // job tracker ID + + "/YYYY/MM/DD" // time segment + ; + } /** Initialize the done directory and start the history cleaner thread */ public void initDone(JobConf conf, FileSystem fs) throws IOException { @@ -163,8 +248,8 @@ public class JobHistory { //permission if (!doneDirFs.exists(done)) { LOG.info("Creating DONE folder at "+ done); - if (! doneDirFs.mkdirs(done, - new FsPermission(HISTORY_DIR_PERMISSION))) { + if (!doneDirFs.mkdirs(done, + new FsPermission(HISTORY_DIR_PERMISSION))) { throw new IOException("Mkdirs failed to create " + done.toString()); } } @@ -218,11 +303,20 @@ public class JobHistory { /** * Get the job history file path */ - public static Path getJobHistoryFile(Path dir, JobID jobId, - String user) { - return new Path(dir, jobId.toString() + "_" + user); + public static Path getJobHistoryFile + (Path dir, JobID jobId) { + MetaInfo info = fileMap.get(jobId); + + if (info == null) { + fileMap.put(jobId, new MetaInfo(null, null, null, System.currentTimeMillis(), null, null)); + return getJobHistoryFile(dir, jobId); + } + + return new Path(dir, jobId.toString()); } + + /** * Get the JobID from the history file's name. See it's companion method * {@link #getJobHistoryFile(Path, JobID, String)} for how history file's name @@ -238,19 +332,37 @@ public class JobHistory { return JobID.forName(jobId); } - /** - * Get the user name of the job-submitter from the history file's name. See - * it's companion method {@link #getJobHistoryFile(Path, JobID, String)} for - * how history file's name is constructed from a given JobID and username. 
- * - * @param jobHistoryFilePath - * @return the user-name - */ - public static String getUserFromHistoryFilePath(Path jobHistoryFilePath) { - String[] jobDetails = jobHistoryFilePath.getName().split("_"); - return jobDetails[3]; + static String nonOccursString(String logFileName) { + int adHocIndex = 0; + + String unfoundString = "q" + adHocIndex; + + while (logFileName.contains(unfoundString)) { + unfoundString = "q" + ++adHocIndex; + } + + return unfoundString + "q"; } + // I tolerate this code because I expect a low number of + // occurrences in a relatively short string + static String replaceStringInstances + (String logFileName, String old, String replacement) { + int index = logFileName.indexOf(old); + + while (index > 0) { + logFileName = (logFileName.substring(0, index) + + replacement + + replaceStringInstances + (logFileName.substring(index + old.length()), + old, replacement)); + + index = logFileName.indexOf(old); + } + + return logFileName; + } + /** * Given the job id, return the history file path from the cache */ @@ -261,6 +373,31 @@ public class JobHistory { } return info.historyFile; } + + /** + * Given the job id, return the conf.xml file path from the cache + */ + public String getConfFilePath(JobID jobId) { + MovedFileInfo info = jobHistoryFileMap.get(jobId); + if (info == null) { + return null; + } + final Path historyFileDir + = (new Path(getHistoryFilePath(jobId))).getParent(); + return getConfFile(historyFileDir, jobId).toString(); + } + + /** + * Get the job name from the job conf + */ + static String getJobName(JobConf jobConf) { + String jobName = jobConf.getJobName(); + if (jobName == null || jobName.length() == 0) { + jobName = "NA"; + } + return jobName; + } + /** * Create an event writer for the Job represented by the jobID. 
* This should be the first call to history for a job @@ -270,12 +407,19 @@ public class JobHistory { */ public void setupEventWriter(JobID jobId, JobConf jobConf) throws IOException { - Path logFile = getJobHistoryFile(logDir, jobId, getUserName(jobConf)); - if (logDir == null) { LOG.info("Log Directory is null, returning"); throw new IOException("Missing Log Directory for History"); } + + MetaInfo oldFi = fileMap.get(jobId); + + long submitTime = (oldFi == null ? System.currentTimeMillis() : oldFi.submitTime); + + String user = getUserName(jobConf); + String jobName = getJobName(jobConf); + + Path logFile = getJobHistoryFile(logDir, jobId); int defaultBufferSize = logDirFs.getConf().getInt("io.file.buffer.size", 4096); @@ -315,7 +459,7 @@ public class JobHistory { + StringUtils.stringifyException(e)); } - MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer); + MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName); fileMap.put(jobId, fi); } @@ -360,7 +504,7 @@ public class JobHistory { } private void startFileMoverThreads() { - executor = new ThreadPoolExecutor(1, 3, 1, + executor = new ThreadPoolExecutor(3, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue()); } @@ -375,14 +519,14 @@ public class JobHistory { Path jobFilePath = null; if (logDir != null) { jobFilePath = new Path(logDir + File.separator + - jobId.toString() + "_conf.xml"); + jobId.toString() + CONF_FILE_NAME_SUFFIX); } return jobFilePath; } /** * Generates a suffix for old/stale jobhistory files - * Pattern : . + identifier + .old + * Pattern : . + identifier + JobHistory.OLD_SUFFIX */ public static String getOldFileSuffix(String identifier) { return "." + identifier + JobHistory.OLD_SUFFIX; @@ -394,13 +538,18 @@ public class JobHistory { //files with same job id don't get over written in case of recovery. 
FileStatus[] files = logDirFs.listStatus(logDir); String fileSuffix = getOldFileSuffix(jobTracker.getTrackerIdentifier()); + // We use the same millisecond time for all files so the config file + // and job history file flow to the same subdirectory + long millisecondTime = ueState.monotonicTime(); for (FileStatus fileStatus : files) { Path fromPath = fileStatus.getPath(); if (fromPath.equals(done)) { //DONE can be a subfolder of log dir continue; } LOG.info("Moving log file from last run: " + fromPath); - Path toPath = new Path(done, fromPath.getName() + fileSuffix); + Path resultDir + = canonicalHistoryLogDir(null, millisecondTime); + Path toPath = new Path(resultDir, fromPath.getName() + fileSuffix); try { moveToDoneNow(fromPath, toPath); } catch (ChecksumException e) { @@ -424,15 +573,16 @@ public class JobHistory { } } } + private void moveToDone(final JobID id) { final List paths = new ArrayList(); final MetaInfo metaInfo = fileMap.get(id); - if (metaInfo == null) { + if (metaInfo == null || metaInfo.getHistoryFile() == null) { LOG.info("No file for job-history with " + id + " found in cache!"); return; } - + final Path historyFile = metaInfo.getHistoryFile(); if (historyFile == null) { LOG.info("No file for job-history with " + id + " found in cache!"); @@ -448,32 +598,442 @@ public class JobHistory { } executor.execute(new Runnable() { + static final int SPONTANEOUSLY_CLOSE_INDEX_INTERVAL = 30 * 1000; + + static final int SPONTANEOUS_INTERIM_INDEX_INTERVAL = 300 * 1000; public void run() { - //move the files to DONE folder - try { - for (Path path : paths) { - moveToDoneNow(path, new Path(done, path.getName())); + boolean iShouldMonitor = false; + + Path resultDir = null; + + String historyFileDonePath = null; + + Path failedJobHistoryIndexBuildPath = null; + Throwable failedHistoryMoveException = null; + + synchronized (ueState) { + // needed because it's possible for system time to go backward + long millisecondTime = ueState.monotonicTime(); + + 
resultDir = canonicalHistoryLogDir(id, millisecondTime); + + if (!resultDir.equals(ueState.currentDoneSubdirectory)) { + if (ueState.currentDoneSubdirectory != null) { + try { + ueState.closeCurrentDirectory(); + } catch (IOException e) { + failedJobHistoryIndexBuildPath = ueState.currentDoneSubdirectory; + } + } + + iShouldMonitor = true; + + ueState.indexableElements = new LinkedList(); + ueState.currentDoneSubdirectory = resultDir; + + ueState.monitoredDirectory = resultDir; } - } catch (Throwable e) { - LOG.error("Unable to move history file to DONE folder.", e); + + // We need to make the JobHistoryIndexElement here, because after + // we've copied the file the info might disappear, but before we've + // closed the previous subdirectory [if we do that] it would go into + // the wrong subdirectory index. + ueState.indexableElements. + add(new JobHistoryIndexElement(millisecondTime, id, metaInfo)); + + //move the files to a DONE canonical subfolder + try { + for (Path path : paths) { + moveToDoneNow(path, new Path(resultDir, path.getName())); + } + } catch (Throwable e) { + failedHistoryMoveException = e; + } + if (historyFile != null) { + historyFileDonePath = new Path(resultDir, + historyFile.getName()).toString(); + } + jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath, + millisecondTime)); } - String historyFileDonePath = null; - if (historyFile != null) { - historyFileDonePath = new Path(done, - historyFile.getName()).toString(); + + if (failedJobHistoryIndexBuildPath != null) { + LOG.warn("Couldn't build a Job History index for " + + failedJobHistoryIndexBuildPath); + } + if (failedHistoryMoveException != null) { + LOG.error("Can't move history file to DONE canonical subfolder.", + failedHistoryMoveException); } - jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath, - System.currentTimeMillis())); + + jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id), - historyFileDonePath); + historyFileDonePath); //purge the job 
from the cache fileMap.remove(id); - } + // Except ephemerally, only one task will be in this code at a + // time, because iShouldMonitor is only set true when + // ueState.monitoredDirectory changes, which will force the + // current incumbent to abend at the earliest opportunity. + while (iShouldMonitor) { + int roundCounter = 0; + + int interruptionsToAbort = 2; + + try { + Thread.sleep(SPONTANEOUSLY_CLOSE_INDEX_INTERVAL); + } catch (InterruptedException e) { + if (--interruptionsToAbort == 0) { + return; + } + } + + Path unbuildableJobHistoryIndex = null; + Path unbuildableInterimJobHistoryIndex = null; + + synchronized (ueState) { + if (ueState.monitoredDirectory != resultDir) { + // someone else closed out the directory I was monitoring + iShouldMonitor = false; + } else { + interruptionsToAbort = 2; + + long millisecondTime = ueState.monotonicTime(); + + Path newResultDir = canonicalHistoryLogDir(id, millisecondTime); + + if (!newResultDir.equals(resultDir)) { + try { + ueState.closeCurrentDirectory(); + } catch (IOException e) { + unbuildableJobHistoryIndex = ueState.currentDoneSubdirectory; + } + iShouldMonitor = false; + } + } + + if (iShouldMonitor + && (++roundCounter + % (SPONTANEOUS_INTERIM_INDEX_INTERVAL + / SPONTANEOUSLY_CLOSE_INDEX_INTERVAL) + == 0)) { + // called for side effect -- a 5 minute checkpoint to + // reduce possible unindexed jobs on a JT crash + try { + ueState.getACurrentIndex(ueState.currentDoneSubdirectory); + } catch (IOException e) { + unbuildableInterimJobHistoryIndex + = ueState.currentDoneSubdirectory; + } + } + } + + if (unbuildableJobHistoryIndex != null) { + LOG.warn("Couldn't build a Job History index for " + + unbuildableJobHistoryIndex); + } + + if (unbuildableInterimJobHistoryIndex != null) { + LOG.warn("Couldn't build an interim Job History index for " + + unbuildableInterimJobHistoryIndex); + } + } + } }); } + + public String[] currentIndex(Path theDoneSubdirectory) + throws IOException { + return 
ueState.currentIndex(theDoneSubdirectory); + } + + // we only create one instance per JobHistory + class UnindexedElementsState { + long monotonicTime = Long.MIN_VALUE; + Path currentDoneSubdirectory = null; + private List indexableElements = null; + Path monitoredDirectory = null; + int indexIndex = 0; + int indexedElementCount = 0; + + private void buildIndex(String indexName) throws IOException { + Path tempPath = new Path(currentDoneSubdirectory, "nascent-index"); + Path indexPath = new Path(currentDoneSubdirectory, indexName); + + OutputStream newIndexOStream = null; + PrintStream newIndexPStream = null; + + indexedElementCount = indexableElements.size(); + + try { + newIndexOStream + = FileSystem.create(doneDirFs, tempPath, HISTORY_FILE_PERMISSION); + + newIndexPStream = new PrintStream(newIndexOStream); + + for (JobHistoryIndexElement elt : indexableElements) { + newIndexPStream.println(elt.toString()); + } + } finally { + if (newIndexPStream != null) { + newIndexPStream.close(); + + if (doneDirFs.exists(tempPath)) { + doneDirFs.rename(tempPath, indexPath); + } + } else if (newIndexOStream != null) { + newIndexOStream.close(); + doneDirFs.delete(tempPath, false); + } + } + } + + synchronized String[] currentIndex(Path theDoneSubdirectory) + throws IOException { + Path subdirIndex = getACurrentIndex(theDoneSubdirectory); + + List indexAsRead = new ArrayList(); + + InputStream iStream = null; + InputStreamReader isReader = null; + BufferedReader breader = null; + + try { + iStream = doneDirFs.open(subdirIndex); + isReader = new InputStreamReader(iStream); + breader = new BufferedReader(isReader); + + String thisRecord = breader.readLine(); + + while (thisRecord != null) { + indexAsRead.add(thisRecord); + thisRecord = breader.readLine(); + } + + String[] result = indexAsRead.toArray(new String[0]); + + Arrays.sort(result); + + return result; + } finally { + if (breader != null) { + breader.close(); + } else if (isReader != null) { + isReader.close(); + } else 
if (iStream != null) { + iStream.close(); + } + } + } + + // If this is the block that's now being built, we build a new + // index and return that. This shouldn't be called on an empty + // subdirectory. + // + // getACurrentIndex must be called within a synchronized(this) block. + // Currently there are two calls, both of which qualify. + Path getACurrentIndex(Path theDoneSubdirectory) throws IOException { + if (!theDoneSubdirectory.equals(currentDoneSubdirectory)) { + return new Path(theDoneSubdirectory, "index"); + } + + if (indexedElementCount == indexableElements.size()) { + return new Path(theDoneSubdirectory, "index-" + indexIndex); + } + + String indexName = "index-" + ++indexIndex; + + buildIndex(indexName); + + return new Path(theDoneSubdirectory, indexName); + } + + // not synchronized, because calls must be in a larger synchronized context + private void closeCurrentDirectory() throws IOException { + if (currentDoneSubdirectory == null) { + return; + } + + buildIndex("index"); + } + + synchronized long monotonicTime() { + monotonicTime = Math.max(monotonicTime, System.currentTimeMillis()); + return monotonicTime; + } + } + + static class JobHistoryIndexElement { + // id and millisecondTime are currently unused. 
+ final JobID id; + final long millisecondTime; + final MetaInfo metaInfo; + + JobHistoryIndexElement(long millisecondTime, JobID id, MetaInfo metaInfo) { + this.id = id; + this.millisecondTime = millisecondTime; + this.metaInfo = metaInfo; + } + + public String toString() { + String user = metaInfo.user; + String jobName = metaInfo.jobName; + + if (jobName.length() > 50) { + jobName = jobName.substring(0, 50); + } + + String adHocBarEscape = ""; + + if (user.indexOf('|') >= 0 || jobName.indexOf('|') >= 0) { + adHocBarEscape = nonOccursString(user + jobName); + + user = replaceStringInstances(user, "|", adHocBarEscape); + jobName = replaceStringInstances(jobName, "|", adHocBarEscape); + } + + return (metaInfo.getHistoryFile().getName() + + "|" + millisecondTime + + "|" + adHocBarEscape + + "|" + user + + "|" + jobName); + } + } + + // several methods for manipulating the subdirectories of the DONE + // directory + + // directory components may contain internal slashes, but do NOT + // contain slashes at either end. + + // In this nest of code, id can be null. In that case it is an error to call + // more than once to get a single filename. This can happen when we're moving + // files from an old run into the new context. See moveOldFiles() . + private static String timestampDirectoryComponent(JobID id, long millisecondTime) { + Integer boxedSerialNumber = null; + + if (id != null) { + boxedSerialNumber = id.getId(); + } + + // don't want to do this inside the lock + Calendar timestamp = Calendar.getInstance(); + timestamp.setTimeInMillis(millisecondTime); + + synchronized (idToDateString) { + String dateString + = (boxedSerialNumber == null ? null : idToDateString.get(boxedSerialNumber)); + + if (dateString == null) { + + dateString = String.format + ("%04d/%02d/%02d/%02d", + timestamp.get(Calendar.YEAR), + timestamp.get(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH) + 1, + timestamp.get(DEBUG_MODE ? 
Calendar.MINUTE : Calendar.DAY_OF_MONTH), + timestamp.get(DEBUG_MODE ? Calendar.SECOND : Calendar.HOUR)); + + dateString = dateString.intern(); + + if (boxedSerialNumber != null) { + idToDateString.put(boxedSerialNumber, dateString); + + if (idToDateString.size() > MAXIMUM_DATESTRING_COUNT) { + idToDateString.remove(idToDateString.firstKey()); + } + } + } + + return dateString; + } + } + + // returns false iff the directory already existed + private boolean maybeMakeSubdirectory(JobID id, long millisecondTime) + throws IOException { + Path dir = canonicalHistoryLogDir(id, millisecondTime); + + String deferredErrorPrintout = null; + String deferredInfoLogging = null; + + try { + synchronized (existingDoneSubdirs) { + if (existingDoneSubdirs.contains(dir)) { + if (DEBUG_MODE && !doneDirFs.exists(dir)) { + deferredErrorPrintout + = ("JobHistory.maybeMakeSubdirectory -- We believed " + + dir + " already existed, but it didn't."); + } + + return true; + } + + if (!doneDirFs.exists(dir)) { + deferredInfoLogging = "Creating DONE subfolder at " + dir; + + if (!doneDirFs.mkdirs(dir, + new FsPermission(HISTORY_DIR_PERMISSION))) { + throw new IOException("Mkdirs failed to create " + dir.toString()); + } + + existingDoneSubdirs.add(dir); + + return false; + } else { + if (DEBUG_MODE) { + deferredErrorPrintout + = ("JobHistory.maybeMakeSubdirectory -- We believed " + + dir + " didn't already exist, but it did."); + } + + return false; + } + } + } finally { + if (deferredErrorPrintout != null) { + System.err.println(deferredErrorPrintout); + } + + if (deferredInfoLogging != null) { + LOG.info(deferredInfoLogging); + } + } + } + + // Previous versions of this code used the id argument, when the + // directory structure was a bit different. + // I'm leaving the currently unused id argument in place, in case we + // decide to start using it again in the future. 
+ private Path canonicalHistoryLogDir(JobID id, long millisecondTime) { + return new Path(done, historyLogSubdirectory(id, millisecondTime)); + } + + private String historyLogSubdirectory(JobID id, long millisecondTime) { + String result = LOG_VERSION_STRING + + "/" + jobtrackerDirectoryComponent(id); + + result = (result + + "/" + timestampDirectoryComponent(id, millisecondTime) + + "/") + .intern(); + + return result; + } + + private String jobtrackerDirectoryComponent(JobID id) { + return jobTrackerUniqueName; + } + + private static String doneSubdirsBeforeSerialTail() { + // job tracker ID + date + String result = "/*/*/*/*/*"; // job tracker instance ID/YYYY/MM/DD/HH + + return result; + } + private String getUserName(JobConf jobConf) { String user = jobConf.getUser(); @@ -483,15 +1043,141 @@ public class JobHistory { return user; } + + // hasMismatches is just used to return a second value if you want + // one. I would have used MutableBoxedBoolean if such had been provided. + static Path[] filteredStat2Paths + (FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches) { + int resultCount = 0; + + if (hasMismatches == null) { + hasMismatches = new AtomicBoolean(false); + } + + for (int i = 0; i < stats.length; ++i) { + if (stats[i].isDir() == dirs) { + stats[resultCount++] = stats[i]; + } else { + hasMismatches.set(true); + } + } + + Path[] paddedResult = FileUtil.stat2Paths(stats); + + Path[] result = new Path[resultCount]; + + System.arraycopy(paddedResult, 0, result, 0, resultCount); + + return result; + } + + public FileStatus[] getAllHistoryConfFiles() throws IOException { + return localGlobber + (doneDirFs, done, "/" + LOG_VERSION_STRING + "/*/*/*/*/*"); + } + + public static FileStatus[] localGlobber + (FileSystem fs, Path root, String tail) + throws IOException { + return localGlobber(fs, root, tail, null); + } + + public static FileStatus[] localGlobber + (FileSystem fs, Path root, String tail, PathFilter filter) + throws IOException { + return 
localGlobber(fs, root, tail, filter, null); + } + + private static FileStatus[] nullToEmpty(FileStatus[] result) { + return result == null ? new FileStatus[0] : result; + } + + private static FileStatus[] listFilteredStatus + (FileSystem fs, Path root, PathFilter filter) + throws IOException { + return filter == null ? fs.listStatus(root) : fs.listStatus(root, filter); + } + + // hasMismatches is just used to return a second value if you want + // one. I would have used MutableBoxedBoolean if such had been provided. + static FileStatus[] localGlobber + (FileSystem fs, Path root, String tail, PathFilter filter, + AtomicBoolean hasFlatFiles) + throws IOException { + if (tail.equals("")) { + return nullToEmpty(listFilteredStatus(fs, root, filter)); + } + + if (tail.startsWith("/*")) { + Path[] subdirs = filteredStat2Paths(nullToEmpty(fs.listStatus(root)), + true, hasFlatFiles); + + FileStatus[][] subsubdirs = new FileStatus[subdirs.length][]; + + int subsubdirCount = 0; + + if (subsubdirs.length == 0) { + return new FileStatus[0]; + } + + String newTail = tail.substring(2); + + for (int i = 0; i < subdirs.length; ++i) { + subsubdirs[i] = localGlobber(fs, subdirs[i], newTail, filter, null); + subsubdirCount += subsubdirs[i].length; + } + + FileStatus[] result = new FileStatus[subsubdirCount]; + + int segmentStart = 0; + + for (int i = 0; i < subsubdirs.length; ++i) { + System.arraycopy(subsubdirs[i], 0, result, segmentStart, subsubdirs[i].length); + segmentStart += subsubdirs[i].length; + } + + return result; + } + + if (tail.startsWith("/")) { + int split = tail.indexOf('/', 1); + + try { + if (split < 0) { + return nullToEmpty + (listFilteredStatus(fs, new Path(root, tail.substring(1)), filter)); + } else { + String thisSegment = tail.substring(1, split); + String newTail = tail.substring(split); + return localGlobber + (fs, new Path(root, thisSegment), newTail, filter, hasFlatFiles); + } + } catch (FileNotFoundException ignored) { + return new FileStatus[0]; + } + } 
+ + IOException e = new IOException("localGlobber: bad tail"); + + throw e; + } + private static class MetaInfo { private Path historyFile; private Path confFile; private EventWriter writer; + long submitTime; + String user; + String jobName; - MetaInfo(Path historyFile, Path conf, EventWriter writer) { + MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime, + String user, String jobName) { this.historyFile = historyFile; this.confFile = conf; this.writer = writer; + this.submitTime = submitTime; + this.user = user; + this.jobName = jobName; } Path getHistoryFile() { return historyFile; } @@ -512,18 +1198,315 @@ public class JobHistory { } /** + * Returns a job history log Path generator + * We return all Path's that exist in the history at time of call, subject to + * four conjunctive criteria, one for each of parameter. Each parameter is + * null or the empty string if caller doesn't want that filtering. + * It's perfectly alright to call this with no restrictions. + * + * @param user the desired username + * [or null or the empty string, if no specific user] + * @param jobnameSubstring a substring of the job names + * @param dateStrings an array of date strings, format MM/DD/YYYY . This + * criterion accepts logs with ANY of the dates + * @param soughtJobid the String naming the jobID we want, if any. Note + * that this criterion names a unique job; you may not want + * to specify any other criteria if you specify this one. 
+ * @throws IOException + */ + public JobHistoryRecordRetriever getMatchingJobs + (String user, String jobnameSubstring, + String[] dateStrings, String soughtJobid) + throws IOException { + return new JobHistoryRecordRetriever + (user, jobnameSubstring, dateStrings, soughtJobid); + } + + public static class JobHistoryJobRecord { + private Path basePath; + private String recordText; + private String[] recordSplits = null; + + JobHistoryJobRecord(Path basePath, String recordText) { + this.basePath = basePath; + this.recordText = recordText; + } + + private String[] getSplits(boolean noCache) { + if (recordSplits != null) { + return recordSplits; + } + + String[] result = recordText.split("\\|"); + + if (!noCache) { + recordSplits = result; + } + + return result; + } + + public Path getPath() { + return getPath(false); + } + + public Path getPath(boolean noCache) { + return new Path(basePath, getSplits(noCache)[0]); + } + + public String getJobIDString() { + return getJobIDString(false); + } + + public String getJobIDString(boolean noCache) { + return getSplits(noCache)[0]; + } + + public long getSubmitTime() { + return getSubmitTime(false); + } + + public long getSubmitTime(boolean noCache) { + return Long.parseLong(getSplits(noCache)[1]); + } + + public String getUserName() { + return getUserName(false); + } + + public String getUserName(boolean noCache) { + String[] splits = getSplits(noCache); + + String result = splits[3]; + + if (splits[2].length() != 0) { + result = replaceStringInstances(result, splits[2], "|"); + } + + return result; + } + + public String getJobName() { + return getJobName(false); + } + + public String getJobName(boolean noCache) { + String[] splits = getSplits(noCache); + + String result = splits[4]; + + if (splits[2].length() != 0) { + result = replaceStringInstances(result, splits[2], "|"); + } + + return result; + } + } + + public class JobHistoryRecordRetriever implements Iterator { + private final Pattern DATE_PATTERN + = 
Pattern.compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])"); + + + private class BaseElements { + Path basePath; + List records = new LinkedList(); + } + + // Internal contract -- elts contains no empty BaseElements's + private List elts = new LinkedList(); + + private Iterator eltsCursor; + private Iterator currentEltCursor; + + private BaseElements currentBE = null; + + private int numberMatches; + + @Override + public boolean hasNext() { + return currentEltCursor.hasNext() || eltsCursor.hasNext(); + } + + @Override + public JobHistoryJobRecord next() throws NoSuchElementException { + if (currentEltCursor.hasNext()) { + return new JobHistoryJobRecord(currentBE.basePath, currentEltCursor.next()); + } + + currentBE = eltsCursor.next(); + currentEltCursor = currentBE.records.iterator(); + return next(); + } + + @Override + public void remove() throws UnsupportedOperationException { + throw new UnsupportedOperationException("no remove() operation"); + } + + public int numberMatches() { + return numberMatches; + } + + public JobHistoryRecordRetriever + (String soughtUser, String soughtJobName, String[] dateStrings, String soughtJobid) + throws IOException { + numberMatches = 0; + + soughtUser = soughtUser == null ? "" : soughtUser; + soughtJobName = soughtJobName == null ? "" : soughtJobName; + soughtJobid = soughtJobid == null ? 
"" : soughtJobid; + + if (dateStrings == null || dateStrings.length == 0) { + dateStrings = new String[1]; + dateStrings[0] = ""; + } + + for (int i = 0; i < dateStrings.length; ++i) { + String soughtDate = dateStrings[i]; + String globString = globString(); + + + String yyyyGlobPart = "*"; + String mmGlobPart = "*"; + String ddGlobPart = "*"; + String hhGlobPart = "*"; + + if (soughtDate.length() != 0) { + Matcher m = DATE_PATTERN.matcher(soughtDate); + if (m.matches()) { + yyyyGlobPart = m.group(3); + mmGlobPart = m.group(1); + ddGlobPart = m.group(2); + + if (yyyyGlobPart.length() == 2) { + yyyyGlobPart = "20" + yyyyGlobPart; + } + + if (mmGlobPart.length() == 1) { + mmGlobPart = "0" + mmGlobPart; + } + + if (ddGlobPart.length() == 1) { + ddGlobPart = "0" + ddGlobPart; + } + } + } + + globString = globString.replace("YYYY", yyyyGlobPart); + globString = globString.replace("MM", mmGlobPart); + globString = globString.replace("DD", ddGlobPart); + globString = globString.replace("HH", hhGlobPart); + + if (doneDirFs == null) { + if (DEBUG_MODE) { + System.out.println("Null file system. May be namenode is in safemode!"); + } + return; + } + + Path[] jobDirectories + = FileUtil.stat2Paths + (JobHistory.localGlobber + (doneDirFs, done, globString)); + + for (int jd = 0; jd < jobDirectories.length; jd++) { + Path doneSubdirectory = jobDirectories[jd]; + + String[] subdirectoryIndex = new String[0]; + + BaseElements be = new BaseElements(); + + be.basePath = doneSubdirectory; + + try { + subdirectoryIndex = currentIndex(doneSubdirectory); + } catch (FileNotFoundException e) { + // no code -- should we log here? 
+ } + + for (int j = 0; j < subdirectoryIndex.length; ++j) { + String[] segments = subdirectoryIndex[j].split("\\|"); + + // segments are [0] jobid [also file name] + // [1] submit time [milliseconds] + // [2] ad hoc '|' substitution + // [3] user name + // [4] trimmed job jame + // + + String user = segments[3]; + String jobName = segments[4]; + + if (segments[2].length() > 0) { + user = replaceStringInstances(user, segments[2], "|"); + jobName = replaceStringInstances(jobName, segments[2], "|"); + } + + if ((soughtJobid.equals("") || segments[0].equalsIgnoreCase(soughtJobid)) + && (soughtUser.equals("") || user.equalsIgnoreCase(soughtUser)) + && (soughtJobName.equals("") || jobName.contains(soughtJobName))) { + be.records.add(subdirectoryIndex[j]); + } + } + + if (be.records.size() != 0) { + elts.add(be); + + numberMatches += be.records.size(); + } + } + } + + eltsCursor = elts.iterator(); + + currentEltCursor = new LinkedList().iterator(); + } + } + + static long directoryTime(String year, String seg2, String seg3, String seg4) { + // set to current time. In debug mode, this is where the month + // and day get set. + Calendar result = Calendar.getInstance(); + // canonicalize by filling in unset fields + result.setTimeInMillis(System.currentTimeMillis()); + + result.set(Calendar.YEAR, Integer.parseInt(year)); + + int seg2int = Integer.parseInt(seg2); + if (!DEBUG_MODE) { + --seg2int; + } + + result.set(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH, + seg2int); + result.set(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH, + Integer.parseInt(seg3)); + result.set(DEBUG_MODE ? Calendar.SECOND : Calendar.HOUR, + Integer.parseInt(seg4)); + + return result.getTimeInMillis(); + } + + /** * Delete history files older than a specified time duration. */ class HistoryCleaner extends Thread { static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L; + + static final long DIRECTORY_LIFE_IN_MS + = DEBUG_MODE ? 
20 * 60 * 1000L : 30 * ONE_DAY_IN_MS; + static final long RUN_INTERVAL + = DEBUG_MODE ? 10L * 60L * 1000L : ONE_DAY_IN_MS; + private long cleanupFrequency; private long maxAgeOfHistoryFiles; - + public HistoryCleaner(long maxAge) { setName("Thread for cleaning up History files"); setDaemon(true); this.maxAgeOfHistoryFiles = maxAge; - cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles); + cleanupFrequency = Math.min(RUN_INTERVAL, maxAgeOfHistoryFiles); LOG.info("Job History Cleaner Thread started." + " MaxAge is " + maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," + @@ -551,29 +1534,93 @@ public class JobHistory { } private void doCleanup() { - long now = System.currentTimeMillis(); + long now = ueState.monotonicTime(); + + boolean printedOneDeletee = false; + + Set deletedPathnames = new HashSet(); + try { - FileStatus[] historyFiles = doneDirFs.listStatus(done); - if (historyFiles != null) { - for (FileStatus f : historyFiles) { - if (now - f.getModificationTime() > maxAgeOfHistoryFiles) { - doneDirFs.delete(f.getPath(), true); - LOG.info("Deleting old history file : " + f.getPath()); + Path[] datedDirectories + = FileUtil.stat2Paths(localGlobber(doneDirFs, done, + DONE_BEFORE_SERIAL_TAIL, null)); + + // fild old directories + for (int i = 0; i < datedDirectories.length; ++i) { + String thisDir = datedDirectories[i].toString(); + Matcher pathMatcher = historyCleanerParseDirectory.matcher(thisDir); + + if (pathMatcher.matches()) { + long dirTime = directoryTime(pathMatcher.group(1), + pathMatcher.group(2), + pathMatcher.group(3), + pathMatcher.group(4)); + + if (DEBUG_MODE) { + System.err.println("HistoryCleaner.run just parsed " + thisDir + + " as year/month/day/hour = " + + pathMatcher.group(1) + + "/" + pathMatcher.group(2) + + "/" + pathMatcher.group(3) + + "/" + pathMatcher.group(4)); + } + + if (dirTime < now - DIRECTORY_LIFE_IN_MS) { + if (DEBUG_MODE) { + Calendar then = Calendar.getInstance(); + then.setTimeInMillis(dirTime); + 
Calendar nnow = Calendar.getInstance(); + nnow.setTimeInMillis(now); + + System.err.println("HistoryCleaner.run directory: " + thisDir + + " because its time is " + then + + " but it's now " + nnow); + System.err.println("then = " + dirTime); + System.err.println("now = " + now); + } + + // remove every file in the directory and save the name + // so we can remove it from jobHistoryFileMap + Path[] deletees + = FileUtil.stat2Paths(localGlobber(doneDirFs, + datedDirectories[i], + "/*", // individual files + null)); + + for (int j = 0; j < deletees.length; ++j) { + + if (DEBUG_MODE && !printedOneDeletee) { + System.err.println("HistoryCleaner.run deletee: " + deletees[j].toString()); + printedOneDeletee = true; + } + + LOG.info("Deleting old history file : " + deletees[j]); + + doneDirFs.delete(deletees[j]); + deletedPathnames.add(deletees[j].toString()); + } + + synchronized (existingDoneSubdirs) { + if (!existingDoneSubdirs.contains(datedDirectories[i])) + { + LOG.warn("JobHistory: existingDoneSubdirs doesn't contain " + + datedDirectories[i] + ", but should."); + } + doneDirFs.delete(datedDirectories[i], true); + existingDoneSubdirs.remove(datedDirectories[i]); + } } } } + //walking over the map to purge entries from jobHistoryFileMap synchronized (jobHistoryFileMap) { Iterator> it = jobHistoryFileMap.entrySet().iterator(); while (it.hasNext()) { MovedFileInfo info = it.next().getValue(); - if (now - info.timestamp > maxAgeOfHistoryFiles) { + if (deletedPathnames.contains(info.historyFile)) { it.remove(); - } else { - //since entries are in sorted timestamp order, no more entries - //are required to be checked - break; } } } Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=1079184&r1=1079183&r2=1079184&view=diff 
============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Tue Mar 8 05:53:05 2011 @@ -251,10 +251,15 @@ public class TestJobHistory extends Test */ private static Path getPathForConf(Path path, Path dir) { String parts[] = path.getName().split("_"); - //TODO this is a hack :( - // jobtracker-hostname_jobtracker-identifier_ + Path parent = path.getParent(); + Path ancestor = parent; + for (int i = 0; i < 4; ++i) { // serial #, 3 laysers of date + ancestor = ancestor.getParent(); + } + String jobtrackerID = ancestor.getName(); String id = parts[0] + "_" + parts[1] + "_" + parts[2]; - return new Path(dir, id + "_conf.xml"); + String jobUniqueString = jobtrackerID + id; + return new Path(parent, jobUniqueString + "_conf.xml"); } /** @@ -279,13 +284,14 @@ public class TestJobHistory extends Test * @param id job id * @param conf job conf */ - public static void validateJobHistoryFileFormat(JobHistory jobHistory, - JobID id, JobConf conf, - String status, boolean splitsCanBeEmpty) throws IOException { + public static void validateJobHistoryFileFormat + (JobTracker jt, JobHistory jobHistory, JobID id, JobConf conf, + String status, boolean splitsCanBeEmpty) + throws IOException { // Get the history file name Path dir = jobHistory.getCompletedJobHistoryLocation(); - String logFileName = getDoneFile(jobHistory, conf, id, dir); + String logFileName = getDoneFile(jt, conf, id, dir); // Framework history log file location Path logFile = new Path(dir, logFileName); @@ -565,11 +571,12 @@ public class TestJobHistory extends Test RunningJob job, JobConf conf) throws IOException { JobID id = job.getID(); - JobHistory jobHistory = - mr.getJobTrackerRunner().getJobTracker().getJobHistory(); + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + 
JobHistory jobHistory = jt.getJobHistory(); + Path doneDir = jobHistory.getCompletedJobHistoryLocation(); // Get the history file name - String logFileName = getDoneFile(jobHistory, conf, id, doneDir); + String logFileName = getDoneFile(jt, conf, id, doneDir); // Framework history log file location Path logFile = new Path(doneDir, logFileName); @@ -646,12 +653,14 @@ public class TestJobHistory extends Test assertEquals("Files in logDir did not move to DONE folder", 0, logDirFs.listStatus(logDirPath).length); - JobHistory jobHistory = - mr.getJobTrackerRunner().getJobTracker().getJobHistory(); + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + JobHistory jobHistory = jt.getJobHistory(); + Path doneDir = jobHistory.getCompletedJobHistoryLocation(); - assertEquals("Files in DONE dir not correct", - 2, doneDir.getFileSystem(conf).listStatus(doneDir).length); + FileStatus[] movedFiles = jobHistory.getAllHistoryConfFiles(); + + assertEquals("Files in DONE dir not correct", 2, movedFiles.length); // run the TCs conf = mr.createJobConf(); @@ -676,31 +685,28 @@ public class TestJobHistory extends Test assertEquals("History DONE folder not correct", doneFolder, doneDir.getName()); JobID id = job.getID(); - String logFileName = getDoneFile(jobHistory, conf, id, doneDir); + String logFileName = getDoneFile(jt, conf, id, doneDir); // Framework history log file location Path logFile = new Path(doneDir, logFileName); FileSystem fileSys = logFile.getFileSystem(conf); - - Cluster cluster = new Cluster(conf); - assertEquals("Client returned wrong history url", logFile.toString(), - cluster.getJobHistoryUrl(id)); // Check if the history file exists assertTrue("History file does not exist", fileSys.exists(logFile)); // check if the corresponding conf file exists - Path confFile = getPathForConf(logFile, doneDir); + String confname = jobHistory.getConfFilePath(id); + Path confFile = new Path(confname); assertTrue("Config for completed jobs doesnt exist", 
fileSys.exists(confFile)); - // check if the file exists in a done folder - assertTrue("Completed job config doesnt exist in the done folder", - doneDir.getName().equals(confFile.getParent().getName())); - - // check if the file exists in a done folder - assertTrue("Completed jobs doesnt exist in the done folder", - doneDir.getName().equals(logFile.getParent().getName())); + // check if the conf and log files are in the same directory + assertTrue("config file and log file aren't in the same directory", + confFile.getParent().equals(logFile.getParent())); + + // check if the file exists under the done folder + assertTrue("Completed job doesnt exist under done folder", + logFile.toString().startsWith(doneDir.toString())); // check if the job file is removed from the history location @@ -714,7 +720,7 @@ public class TestJobHistory extends Test assertFalse("Config for completed jobs not deleted from running folder", fileSys.exists(runningJobConfFilename)); - validateJobHistoryFileFormat(jobHistory, + validateJobHistoryFileFormat(jt, jobHistory, job.getID(), conf, "SUCCEEDED", false); validateJobHistoryFileContent(mr, job, conf); @@ -771,33 +777,39 @@ public class TestJobHistory extends Test // Run a job that will be succeeded and validate its history file RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); - JobHistory jobHistory = - mr.getJobTrackerRunner().getJobTracker().getJobHistory(); + + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + JobHistory jobHistory = jt.getJobHistory(); + Path doneDir = jobHistory.getCompletedJobHistoryLocation(); assertEquals("History DONE folder not correct", doneFolder, doneDir.toString()); JobID id = job.getID(); - String logFileName = getDoneFile(jobHistory, conf, id, doneDir); + String logFileName = getDoneFile(jt, conf, id, doneDir); // Framework history log file location Path logFile = new Path(doneDir, logFileName); + Path logDir = logFile.getParent(); FileSystem fileSys = logFile.getFileSystem(conf); 
// Check if the history file exists assertTrue("History file does not exist", fileSys.exists(logFile)); // check if the corresponding conf file exists - Path confFile = getPathForConf(logFile, doneDir); + Path confFile = new Path(jobHistory.getConfFilePath(id)); assertTrue("Config for completed jobs doesnt exist", fileSys.exists(confFile)); // check if the conf file exists in a done folder assertTrue("Completed job config doesnt exist in the done folder", - doneDir.getName().equals(confFile.getParent().getName())); + logDir.getName().equals(confFile.getParent().getName())); // check if the file exists in a done folder assertTrue("Completed jobs doesnt exist in the done folder", - doneDir.getName().equals(logFile.getParent().getName())); + logDir.getName().equals(logFile.getParent().getName())); + + assertTrue("The log file dir is not under the done dir", + logDir.toString().startsWith(doneDir.toString())); // check if the job file is removed from the history location Path runningJobsHistoryFolder = logFile.getParent().getParent(); @@ -810,12 +822,11 @@ public class TestJobHistory extends Test assertFalse("Config for completed jobs not deleted from running folder", fileSys.exists(runningJobConfFilename)); - validateJobHistoryFileFormat(jobHistory, job.getID(), conf, + validateJobHistoryFileFormat(jt, jobHistory, job.getID(), conf, "SUCCEEDED", false); validateJobHistoryFileContent(mr, job, conf); // get the job conf filename - JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); String name = jt.getLocalJobFilePath(job.getID()); File file = new File(name); @@ -834,16 +845,22 @@ public class TestJobHistory extends Test //Returns the file in the done folder //Waits for sometime to get the file moved to done - private static String getDoneFile(JobHistory jobHistory, - JobConf conf, JobID id, - Path doneDir) throws IOException { + private static String getDoneFile + (JobTracker jt, JobConf conf, JobID id, Path doneDir) + throws IOException { + JobHistory jobHistory 
= jt.getJobHistory(); + String name = null; String user = UserGroupInformation.getCurrentUser().getUserName(); + for (int i = 0; name == null && i < 20; i++) { - Path path = JobHistory.getJobHistoryFile( - jobHistory.getCompletedJobHistoryLocation(), id, user); - if (path.getFileSystem(conf).exists(path)) { - name = path.toString(); + String pathname = jobHistory.getHistoryFilePath(id); + + if (pathname != null) { + Path path = new Path(pathname); + if (path.getFileSystem(conf).exists(path)) { + name = path.toString(); + } } UtilsForTests.waitFor(1000); } @@ -870,12 +887,14 @@ public class TestJobHistory extends Test * @param id job id * @param conf job conf */ - private static void validateJobHistoryJobStatus(JobHistory jobHistory, - JobID id, JobConf conf, String status) throws IOException { + private static void validateJobHistoryJobStatus + (JobTracker jt, JobHistory jobHistory, + JobID id, JobConf conf, String status) + throws IOException { // Get the history file name Path doneDir = jobHistory.getCompletedJobHistoryLocation(); - String logFileName = getDoneFile(jobHistory, conf, id, doneDir); + String logFileName = getDoneFile(jt, conf, id, doneDir); // Framework history log file location Path logFile = new Path(doneDir, logFileName); @@ -891,8 +910,7 @@ public class TestJobHistory extends Test JobHistoryParser parser = new JobHistoryParser(fileSys, logFile.toUri().getPath()); - JobHistoryParser.JobInfo jobInfo = parser.parse(); - + JobHistoryParser.JobInfo jobInfo = parser.parse(); assertTrue("Job Status read from job history file is not the expected" + " status", status.equals(jobInfo.getJobStatus())); @@ -918,22 +936,24 @@ public class TestJobHistory extends Test // Run a job that will be succeeded and validate its job status // existing in history file RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); + + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); - JobHistory jobHistory = - 
mr.getJobTrackerRunner().getJobTracker().getJobHistory(); - validateJobHistoryJobStatus(jobHistory, job.getID(), conf, + JobHistory jobHistory = jt.getJobHistory(); + + validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf, JobStatus.getJobRunState(JobStatus.SUCCEEDED)); // Run a job that will be failed and validate its job status // existing in history file job = UtilsForTests.runJobFail(conf, inDir, outDir); - validateJobHistoryJobStatus(jobHistory, job.getID(), conf, + validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf, JobStatus.getJobRunState(JobStatus.FAILED)); // Run a job that will be killed and validate its job status // existing in history file job = UtilsForTests.runJobKill(conf, inDir, outDir); - validateJobHistoryJobStatus(jobHistory, job.getID(), conf, + validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf, JobStatus.getJobRunState(JobStatus.KILLED)); } finally { Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java Tue Mar 8 05:53:05 2011 @@ -109,7 +109,7 @@ public class TestJobHistoryParsing exte assertFalse("Writing an event after closing event writer is not handled", caughtException); - String historyFileName = jobId.toString() + "_" + username; + String historyFileName = jobId.toString(); Path historyFilePath = new Path (historyDir.toString(), historyFileName); Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Tue Mar 8 05:53:05 2011 @@ -295,11 +295,11 @@ public class TestSeveral extends TestCas public Void run() throws IOException { verifyOutput(outDir.getFileSystem(conf), outDir); + JobTracker jt = mrCluster.getJobTrackerRunner().getJobTracker(); //TestJobHistory - TestJobHistory.validateJobHistoryFileFormat( - mrCluster.getJobTrackerRunner().getJobTracker().getJobHistory(), - jobId, conf, "SUCCEEDED", false); + TestJobHistory.validateJobHistoryFileFormat + (jt, jt.getJobHistory(), jobId, conf, "SUCCEEDED", false); TestJobHistory.validateJobHistoryFileContent(mrCluster, job, conf); Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Mar 8 05:53:05 2011 @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobTracker; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobID; @@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.TaskT import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat; import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistory; +import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryJobRecord; +import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; @@ -266,7 +269,7 @@ public class TestRumenJobTraces { .makeQualified(lfs.getUri(), lfs.getWorkingDirectory()); // Check if jobhistory filename are detected properly - Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid, user); + Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid); JobID extractedJID = JobID.forName(TraceBuilder.extractJobID(jhFilename.getName())); assertEquals("TraceBuilder failed to parse the current JH filename", @@ -366,6 +369,9 @@ public class TestRumenJobTraces { conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1); MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null, new JobConf(conf)); + JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker(); + JobHistory history = tracker.getJobHistory(); + // run a job Path inDir = new Path(tempDir, "input"); @@ -395,16 +401,20 @@ public class TestRumenJobTraces { Path jhPath = new Path(mrCluster.getJobTrackerRunner().getJobTracker() .getJobHistoryDir()); - Path inputPath = JobHistory.getJobHistoryFile(jhPath, id, user); + Path inputPath = null; // wait for 10 secs for the jobhistory file to move into the done folder for (int i = 0; i < 100; ++i) { - if (lfs.exists(inputPath)) { + JobHistoryRecordRetriever retriever + = history.getMatchingJobs(null, "", null, id.toString()); + if 
(retriever.hasNext()) { + inputPath = retriever.next().getPath(); break; } - TimeUnit.MILLISECONDS.wait(100); + TimeUnit.MILLISECONDS.sleep(100); } - assertTrue("Missing job history file", lfs.exists(inputPath)); + assertTrue("Missing job history file", + inputPath != null && lfs.exists(inputPath)); ris = getRewindableInputStream(inputPath, conf); Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Tue Mar 8 05:53:05 2011 @@ -172,13 +172,11 @@ public class TraceBuilder extends Config * [especially for .crc files] we return null. */ static String extractJobID(String fileName) { + String pre21JobID + = applyParser(fileName, + Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX); String jobId = applyParser(fileName, JobHistory.JOBHISTORY_FILENAME_REGEX); - if (jobId == null) { - // check if its a pre21 jobhistory file - jobId = applyParser(fileName, - Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX); - } - return jobId; + return jobId == null ? 
pre21JobID : jobId; } static boolean isJobConfXml(String fileName, InputStream input) { Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp?rev=1079184&r1=1079183&r2=1079184&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp Tue Mar 8 05:53:05 2011 @@ -21,6 +21,8 @@ contentType="text/html; charset=UTF-8" import="java.io.*" import="java.util.*" + import="java.util.regex.Matcher" + import="java.util.regex.Pattern" import="java.net.URLEncoder" import="org.apache.hadoop.mapred.*" import="org.apache.hadoop.util.*" @@ -30,6 +32,8 @@ import="org.apache.hadoop.http.HtmlQuoting" import="org.apache.hadoop.mapred.*" import="org.apache.hadoop.mapreduce.jobhistory.*" + import="org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryJobRecord" + import="org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever" %> <%! private static final long serialVersionUID = 1L; @@ -70,55 +74,68 @@ window.location.href = url; History Viewer
<% + //{{ // this is here to make indentation work, and must be commented out final String search = (request.getParameter("search") == null) ? "" : request.getParameter("search"); - String parts[] = search.split(":"); + String soughtDate = ""; + String soughtJobName = ""; + String soughtJobid = ""; - final String user = (parts.length >= 1) - ? parts[0].toLowerCase() + // soughtUser : jobid ; jobname ! date + + String splitDate[] = search.split("!"); + + final String DATE_PATTERN = "([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])?[0-9][0-9])"; + + if (splitDate.length >= 2) { + soughtDate = splitDate[1]; + } + + String[] splitJobName = splitDate[0].split(";"); + + if (splitJobName.length >= 2) { + soughtJobName = splitJobName[1]; + } + + String[] splitJobid = splitJobName[0].split(":"); + + if (splitJobid.length >= 2) { + soughtJobid = splitJobid[1]; + } + + final String soughtUser = (splitJobid.length >= 1) + ? splitJobid[0].toLowerCase() : ""; - final String jobid = (parts.length >= 2) - ? parts[1].toLowerCase() - : ""; - final String rawUser = HtmlQuoting.unquoteHtmlChars(user); - final String rawJobid = HtmlQuoting.unquoteHtmlChars(jobid); - - PathFilter jobLogFileFilter = new PathFilter() { - private boolean matchUser(String fileName) { - // return true if - // - user is not specified - // - user matches - return "".equals(rawUser) || rawUser.equals(fileName.split("_")[3]); - } - private boolean matchJobId(String fileName) { - // return true if - // - jobid is not specified - // - jobid matches - String[] jobDetails = fileName.split("_"); - String actualId = jobDetails[0] + "_" +jobDetails[1] + "_" + jobDetails[2] ; - return "".equals(rawJobid) || jobid.equalsIgnoreCase(actualId); + if (soughtDate.length() != 0) { + Pattern p = Pattern.compile(DATE_PATTERN); + Matcher m = p.matcher(soughtDate); + if (!m.matches()) { + soughtDate = ""; } + } - public boolean accept(Path path) { - return (!(path.getName().endsWith(".xml") || - 
path.getName().endsWith(JobHistory.OLD_SUFFIX)) && - matchUser(path.getName()) && matchJobId(path.getName())); - } - }; + JobHistory jobHistory = (JobHistory) application.getAttribute("jobHistoryHistory"); + String soughtDates[] = new String[1]; + soughtDates[0] = soughtDate; + + JobHistoryRecordRetriever retriever + = jobHistory.getMatchingJobs + (soughtUser, soughtJobName, soughtDates, soughtJobid); - FileSystem fs = (FileSystem) application.getAttribute("fileSys"); - String historyLogDir = (String) application.getAttribute("historyLogDir"); - if (fs == null) { - out.println("Null file system. May be namenode is in safemode!"); - return; - } - Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(new Path(historyLogDir), - jobLogFileFilter)); - out.println(""); - if (null == jobFiles || jobFiles.length == 0) { + JobHistoryJobRecord[] records + = new JobHistoryJobRecord[retriever.numberMatches()]; + + int recordsIndex = 0; + + while (retriever.hasNext()) { + records[recordsIndex++] = retriever.next(); + } + + out.println(""); + if (records.length == 0) { out.println("No files found!"); return ; } @@ -132,15 +149,15 @@ window.location.href = url; int size = 100; // if show-all is requested or jobfiles < size(100) - if (pageno == -1 || size > jobFiles.length) { - size = jobFiles.length; + if (pageno == -1 || size > records.length) { + size = records.length; } if (pageno == -1) { // special case 'show all' pageno = 1; } - int maxPageNo = (int)Math.ceil((float)jobFiles.length / size); + int maxPageNo = (records.length + size - 1) / size; // check and fix pageno if (pageno < 1 || pageno > maxPageNo) { @@ -152,15 +169,15 @@ window.location.href = url; if (pageno == maxPageNo) { // find the number of files to be shown on the last page int startOnLast = ((pageno - 1) * size) + 1; - length = jobFiles.length - startOnLast + 1; + length = records.length - startOnLast + 1; } // Display the search box - out.println("
Filter (username:jobid) "); // heading + out.println(" Filter ([username][:jobid][;jobname-key][!MM/DD/YYYY]) "); // heading out.println(""); // search box out.println("
"); - out.println("Example: 'smith' will display jobs submitted by user 'smith'. "); - out.println("Job Ids need to be prefixed with a colon(:) For example, :job_200908311030_0001 will display the job with that id. "); // example + out.println("Example: smith will display jobs submitted by user 'smith'. "); + out.println("Job Ids need to be prefixed with a colon(:) For example, :job_200908311030_0001 will display the job with that id. You may search for parts of job names. Job name search keys need to be prefixed by a semicolon (;). A filter ;budget will find jobs named \"budget calculation\" or \"fussbudget job\". You may restrict results to jobs that finished on a specific day. Date criteria are MM/DD/YYYYY and are prefixed with an exclamation point (!). You may specify multiple criteria. We only display jobs that satisfy all criteria."); // example out.println("
"); //Show the status @@ -171,12 +188,15 @@ window.location.href = url; out.println("Available Jobs in History "); // display the number of jobs, start index, end index - out.println("( Displaying " + length + " jobs from " + start + " to " + (start + length - 1) + " out of " + jobFiles.length + " jobs"); - if (!"".equals(user)) { - out.println(" for user " + HtmlQuoting.quoteHtmlChars(user) + ""); // show the user if present + out.println("( Displaying " + length + " jobs from " + start + " to " + (start + length - 1) + " out of " + records.length + " jobs"); + if (!"".equals(soughtUser)) { + out.println(" for user " + soughtUser + ""); // show the user if present } - if (!"".equals(jobid)) { - out.println(" for jobid " + HtmlQuoting.quoteHtmlChars(jobid) + " in it."); // show the jobid keyword if present + if (!"".equals(soughtJobid)) { + out.println(" for jobid " + soughtJobid + " in it "); // show the jobid keyword if present + } + if (!"".equals(soughtDate)) { + out.println(" for date " + soughtDate + ""); // show the jobid keyword if present } out.print(")"); @@ -197,28 +217,27 @@ window.location.href = url; out.println("[last page]"); } - // sort the files on creation time. - Arrays.sort(jobFiles, new Comparator() { - public int compare(Path p1, Path p2) { - String dp1 = null; - String dp2 = null; - - dp1 = p1.getName(); - dp2 = p2.getName(); - - String[] split1 = dp1.split("_"); - String[] split2 = dp2.split("_"); + // REVERSE sort the files on creation time. 
+ Arrays.sort(records, new Comparator() { + public int compare(JobHistoryJobRecord rec1, JobHistoryJobRecord rec2) { + String id1 = rec1.getJobIDString(true); + String id2 = rec2.getJobIDString(true); + + String[] idsplit1 = id1.split("_"); + String[] idsplit2 = id2.split("_"); - // compare job tracker start time - int res = new Date(Long.parseLong(split1[1])).compareTo( - new Date(Long.parseLong(split2[1]))); - if (res == 0) { - Long l1 = Long.parseLong(split1[2]); - res = l1.compareTo(Long.parseLong(split2[2])); - } - return res; - } - }); + // compare job tracker start time + Long jtTime2 = Long.parseLong(idsplit2[1]); + // comparison sense is reversed + int res = jtTime2.compareTo(Long.parseLong(idsplit1[1])); + if (res == 0) { + // comparison sense is reversed + Long sn2 = Long.parseLong(idsplit2[2]); + res = sn2.compareTo(Long.parseLong(idsplit1[2])); + } + return res; + } + }); out.println("

"); @@ -227,15 +246,21 @@ window.location.href = url; out.print(""); out.print(""); - out.print( "") ; + out.print( ""); + out.print(""); + out.print("") ; + out.print("") ; out.print(""); Set displayedJobs = new HashSet(); for (int i = start - 1; i < start + length - 1; ++i) { - Path jobFile = jobFiles[i]; + JobHistoryJobRecord record = records[i]; - String jobId = JobHistory.getJobIDFromHistoryFilePath(jobFile).toString(); - String userName = JobHistory.getUserFromHistoryFilePath(jobFile); + String jobId = record.getJobIDString(); + String userName = record.getUserName(); + long submitTime = record.getSubmitTime(); + String jobName = record.getJobName(); + Path logPath = record.getPath(); // Check if the job is already displayed. There can be multiple job // history files for jobs that have restarted @@ -248,8 +273,7 @@ window.location.href = url; %>
<% - printJob(jobId, userName, new Path(jobFile.getParent(), jobFile), - out) ; + printJob(submitTime, jobId, userName, logPath, jobName, out) ; %>
<% @@ -260,17 +284,39 @@ window.location.href = url; printNavigation(pageno, size, maxPageNo, search, out); %> <%! - private void printJob(String jobId, - String user, Path logFile, JspWriter out) + private void printJob(long timestamp, String jobId, + String user, Path logFile, String jobName, JspWriter out) throws IOException { out.print("
"); + out.print(""); out.print(""); - out.print(""); + "\">" + jobId + ""); + out.print(""); + out.print(""); out.print(""); } + + // I tolerate this code because I expect a low number of + // occurrences in a relatively short string + private static String replaceStringInstances + (String replacee, String old, String replacement) { + int index = replacee.indexOf(old); + + while (index > 0) { + replacee = (replacee.substring(0, index) + + replacement + + replaceStringInstances + (replacee.substring(index + old.length()), + old, replacement)); + + index = replacee.indexOf(old); + } + + return replacee; + } + private void printNavigation(int pageno, int size, int max, String search, JspWriter out) throws IOException { int numIndexToShow = 5; // num indexes to show on either side
Job IdUserJob submit timeJob IdUserJob Name
" + new Date(timestamp) + "" + "" + HtmlQuoting.quoteHtmlChars(jobId) + "" + HtmlQuoting.quoteHtmlChars(user) + "" + user + "" + jobName + "