Subject: svn commit: r803231 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/java/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/ Date: Tue, 11 Aug 2009 17:43:14 -0000 To: mapreduce-commits@hadoop.apache.org From: sharad@apache.org Reply-To: mapreduce-dev@hadoop.apache.org Author: sharad Date: Tue Aug 11 17:43:14 2009 New Revision: 803231 URL: http://svn.apache.org/viewvc?rev=803231&view=rev 
Log: MAPREDUCE-817. Add a cache for retired jobs with minimal job info and provide a way to access history file url. Removed: hadoop/mapreduce/trunk/src/webapps/job/loadhistory.jsp Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java hadoop/mapreduce/trunk/src/java/mapred-default.xml hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java hadoop/mapreduce/trunk/src/webapps/job/analysejobhistory.jsp hadoop/mapreduce/trunk/src/webapps/job/jobconf_history.jsp hadoop/mapreduce/trunk/src/webapps/job/jobdetailshistory.jsp hadoop/mapreduce/trunk/src/webapps/job/jobtaskshistory.jsp hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp hadoop/mapreduce/trunk/src/webapps/job/taskdetailshistory.jsp Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=803231&r1=803230&r2=803231&view=diff 
============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 11 17:43:14 2009 @@ -17,6 +17,9 @@ MAPREDUCE-766. Enhanced list-blacklisted-trackers to display reasons for blacklisting a node. (Sreekanth Ramakrishnan via yhemanth) + MAPREDUCE-817. Add a cache for retired jobs with minimal job info and + provide a way to access history file url. (sharad) + NEW FEATURES MAPREDUCE-546. Provide sample fair scheduler config file in conf/ and use Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Aug 11 17:43:14 2009 @@ -172,7 +172,9 @@ super(jId, jobConf, null); this.taskTrackerManager = taskTrackerManager; this.startTime = System.currentTimeMillis(); - this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP); + this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP, + jobConf.getUser(), + jobConf.getJobName(), "", ""); this.status.setJobPriority(JobPriority.NORMAL); this.status.setStartTime(startTime); if (null == jobConf.getQueueName()) { Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original) +++ 
hadoop/mapreduce/trunk/src/java/mapred-default.xml Tue Aug 11 17:43:14 2009 @@ -348,6 +348,21 @@ <property> + <name>mapred.job.tracker.retiredjobs.cache.size</name> + <value>1000</value> + <description>The number of retired job status to keep in the cache. + </description> + </property> + + <property> + <name>mapred.job.tracker.jobhistory.lru.cache.size</name> + <value>5</value> + <description>The number of job history files loaded in memory. The jobs are + loaded when they are first accessed. The cache is cleared based on LRU. + </description> + </property> + + <property> <name>mapred.jobtracker.instrumentation</name> <value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value> <description>Expert: The instrumentation class to associate with each JobTracker. Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java Tue Aug 11 17:43:14 2009 @@ -18,21 +18,37 @@ package org.apache.hadoop.mapred; import java.io.IOException; -import java.util.Iterator; +import java.net.URLEncoder; import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobHistory.JobInfo; import org.apache.hadoop.util.ServletUtil; +import org.apache.hadoop.util.StringUtils; class JSPUtil { private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions"; public static final Configuration conf = new Configuration(); + //LRU based cache + private static final Map jobHistoryCache = 
+ new LinkedHashMap(); + + private static final int CACHE_SIZE = + conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5); + + private static final Log LOG = LogFactory.getLog(JSPUtil.class); /** * Method used to process the request from the job page based on the * request which it has received. For example like changing priority. @@ -178,7 +194,94 @@ return sb.toString(); } + @SuppressWarnings("unchecked") + public static String generateRetiredJobTable(JobTracker tracker, int rowId) + throws IOException { + + StringBuffer sb = new StringBuffer(); + sb.append("\n"); + + Iterator iterator = + tracker.retireJobs.getAll().descendingIterator(); + if (!iterator.hasNext()) { + sb.append("\n"); + } else { + sb.append(""); + + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append(""); + sb.append("\n"); + for (int i = 0; i < 100 && iterator.hasNext(); i++) { + JobStatus status = iterator.next(); + sb.append(""); + sb.append( + "" + + + "" + + "" + + "" + + "" + + "" + + "" + + + "" + + + "" + + + "" + + + "\n"); + rowId++; + } + } + sb.append("
none" + + "
JobidPriorityUserNameStateStart TimeFinish TimeMap % CompleteReduce % CompleteJob Scheduling Information
" + + "" + + status.getJobId() + "" + + status.getJobPriority().toString() + "" + status.getUsername() + "" + status.getJobName() + "" + JobStatus.getJobRunState(status.getRunState()) + "" + new Date(status.getStartTime()) + "" + new Date(status.getFinishTime()) + "" + StringUtils.formatPercent(status.mapProgress(), 2) + + ServletUtil.percentageGraph(status.mapProgress() * 100, 80) + + "" + StringUtils.formatPercent(status.reduceProgress(), 2) + + ServletUtil.percentageGraph(status.reduceProgress() * 100, 80) + + "" + status.getSchedulingInfo() + "
\n"); + return sb.toString(); + } + static final boolean privateActionsAllowed() { return conf.getBoolean(PRIVATE_ACTIONS_KEY, false); } + + static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs) + throws IOException { + String jobid = request.getParameter("jobid"); + String logFile = request.getParameter("logFile"); + synchronized(jobHistoryCache) { + JobInfo jobInfo = jobHistoryCache.remove(jobid); + if (jobInfo == null) { + jobInfo = new JobHistory.JobInfo(jobid); + LOG.info("Loading Job History file "+jobid + ". Cache size is " + + jobHistoryCache.size()); + DefaultJobHistoryParser.parseJobTasks( logFile, jobInfo, fs) ; + } + jobHistoryCache.put(jobid, jobInfo); + if (jobHistoryCache.size() > CACHE_SIZE) { + Iterator<Map.Entry<String, JobInfo>> it = + jobHistoryCache.entrySet().iterator(); + String removeJobId = it.next().getKey(); + it.remove(); + LOG.info("Job History file removed form cache "+removeJobId); + } + return jobInfo; + } + } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Aug 11 17:43:14 2009 @@ -173,7 +173,6 @@ * remote service to provide certain functionality. 
*/ class NetworkedJob implements RunningJob { - JobProfile profile; JobStatus status; long statustime; @@ -186,7 +185,6 @@ */ public NetworkedJob(JobStatus job) throws IOException { this.status = job; - this.profile = jobSubmitClient.getJobProfile(job.getJobID()); this.statustime = System.currentTimeMillis(); } @@ -205,7 +203,10 @@ * @throws IOException */ synchronized void updateStatus() throws IOException { - this.status = jobSubmitClient.getJobStatus(profile.getJobID()); + this.status = jobSubmitClient.getJobStatus(status.getJobID()); + if (this.status == null) { + throw new IOException("Job status not available "); + } this.statustime = System.currentTimeMillis(); } @@ -213,35 +214,35 @@ * An identifier for the job */ public JobID getID() { - return profile.getJobID(); + return status.getJobID(); } /** @deprecated This method is deprecated and will be removed. Applications should * rather use {@link #getID()}.*/ @Deprecated public String getJobID() { - return profile.getJobID().toString(); + return status.getJobID().toString(); } /** * The user-specified job name */ public String getJobName() { - return profile.getJobName(); + return status.getJobName(); } /** * The name of the job file */ public String getJobFile() { - return profile.getJobFile(); + return status.getJobFile(); } /** * A URL where the job's status can be seen */ public String getTrackingURL() { - return profile.getURL().toString(); + return status.getTrackingUrl(); } /** @@ -368,11 +369,14 @@ updateStatus(); } catch (IOException e) { } - return "Job: " + profile.getJobID() + "\n" + - "file: " + profile.getJobFile() + "\n" + - "tracking URL: " + profile.getURL() + "\n" + + return "Job: " + status.getJobID() + "\n" + + "status: " + JobStatus.getJobRunState(status.getRunState()) + "\n" + + "file: " + status.getJobFile() + "\n" + + "tracking URL: " + status.getTrackingUrl() + "\n" + "map() completion: " + status.mapProgress() + "\n" + - "reduce() completion: " + status.reduceProgress(); + "reduce() 
completion: " + status.reduceProgress() + "\n" + + "history URL: " + status.getHistoryFile() + "\n" + + "retired: " + status.isRetired(); } /** @@ -386,6 +390,16 @@ public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException { return jobSubmitClient.getTaskDiagnostics(id); } + + public String getHistoryUrl() throws IOException { + updateStatus(); + return status.getHistoryFile(); + } + + public boolean isRetired() throws IOException { + updateStatus(); + return status.isRetired(); + } } private JobSubmissionProtocol jobSubmitClient; @@ -1354,7 +1368,10 @@ } } LOG.info("Job complete: " + jobId); - job.getCounters().log(LOG); + Counters counters = job.getCounters(); + if (counters != null) { + counters.log(LOG); + } return job.isSuccessful(); } @@ -1695,7 +1712,12 @@ } else { System.out.println(); System.out.println(job); - System.out.println(job.getCounters()); + Counters counters = job.getCounters(); + if (counters != null) { + System.out.println(counters); + } else { + System.out.println("Counters not available. 
Job is retired."); + } exitCode = 0; } } else if (getCounter) { @@ -1704,10 +1726,16 @@ System.out.println("Could not find job " + jobid); } else { Counters counters = job.getCounters(); - Group group = counters.getGroup(counterGroupName); - Counter counter = group.getCounterForName(counterName); - System.out.println(counter.getCounter()); - exitCode = 0; + if (counters == null) { + System.out.println("Counters not available for retired job " + + jobid); + exitCode = -1; + } else { + Group group = counters.getGroup(counterGroupName); + Counter counter = group.getCounterForName(counterName); + System.out.println(counter.getCounter()); + exitCode = 0; + } } } else if (killJob) { RunningJob job = getJob(JobID.forName(jobid)); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Tue Aug 11 17:43:14 2009 @@ -127,13 +127,16 @@ private ThreadPoolExecutor executor = null; private final Configuration conf; + private final JobTracker jobTracker; // cache from job-key to files associated with it. 
private Map fileCache = new ConcurrentHashMap(); - JobHistoryFilesManager(Configuration conf) throws IOException { + JobHistoryFilesManager(Configuration conf, JobTracker jobTracker) + throws IOException { this.conf = conf; + this.jobTracker = jobTracker; } void start() { @@ -184,7 +187,22 @@ fileCache.remove(id); } - void moveToDone(final JobID id, final List paths) { + void moveToDone(final JobID id) { + final List paths = new ArrayList(); + final Path historyFile = fileManager.getHistoryFile(id); + if (historyFile == null) { + LOG.info("No file for job-history with " + id + " found in cache!"); + } else { + paths.add(historyFile); + } + + final Path confPath = fileManager.getConfFileWriters(id); + if (confPath == null) { + LOG.info("No file for jobconf with " + id + " found in cache!"); + } else { + paths.add(confPath); + } + executor.execute(new Runnable() { public void run() { @@ -201,6 +219,13 @@ } } + String historyFileDonePath = null; + if (historyFile != null) { + historyFileDonePath = new Path(DONE, + historyFile.getName()).toString(); + } + jobTracker.historyFileCopied(id, historyFileDonePath); + //purge the job from the cache fileManager.purgeJob(id); } catch (Throwable e) { @@ -249,12 +274,13 @@ * @param jobTrackerStartTime jobtracker's start time * @return true if intialized properly * false otherwise - * @deprecated Use {@link #init(JobConf, String, long, FileSystem)} instead. + * @deprecated Use {@link #init(JobTracker, JobConf, String, long, + * FileSystem)} instead. 
*/ @Deprecated - public static boolean init(JobConf conf, String hostname, - long jobTrackerStartTime){ - return init(conf, hostname, jobTrackerStartTime, null); + public static boolean init(JobTracker jobTracker, JobConf conf, + String hostname, long jobTrackerStartTime){ + return init(jobTracker, conf, hostname, jobTrackerStartTime, null); } /** @@ -266,9 +292,8 @@ * @return true if intialized properly * false otherwise */ - public static boolean init(JobConf conf, String hostname, - long jobTrackerStartTime, - FileSystem fs){ + public static boolean init(JobTracker jobTracker, JobConf conf, + String hostname, long jobTrackerStartTime, FileSystem fs){ try { LOG_DIR = conf.get("hadoop.job.history.location" , "file:///" + new File( @@ -297,7 +322,7 @@ 3 * 1024 * 1024); // initialize the file manager - fileManager = new JobHistoryFilesManager(conf); + fileManager = new JobHistoryFilesManager(conf, jobTracker); } catch(IOException e) { LOG.error("Failed to initialize JobHistory log file", e); disableHistory = true; @@ -1040,25 +1065,7 @@ * This *should* be the last call to jobhistory for a given job. 
*/ static void markCompleted(JobID id) throws IOException { - List paths = new ArrayList(); - Path path = fileManager.getHistoryFile(id); - if (path == null) { - LOG.info("No file for job-history with " + id + " found in cache!"); - } else { - paths.add(path); - } - - Path confPath = fileManager.getConfFileWriters(id); - if (confPath == null) { - LOG.info("No file for jobconf with " + id + " found in cache!"); - } else { - paths.add(confPath); - } - - //move the job files to done folder and purge the job - if (paths.size() > 0) { - fileManager.moveToDone(id, paths); - } + fileManager.moveToDone(id); } /** Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 11 17:43:14 2009 @@ -300,7 +300,9 @@ this.nonRunningReduces = new LinkedList(); this.runningReduces = new LinkedHashSet(); this.resourceEstimator = new ResourceEstimator(this); - this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP); + this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, + this.profile.getUser(), this.profile.getJobName(), + this.profile.getJobFile(), ""); this.taskCompletionEvents = new ArrayList (numMapTasks + numReduceTasks + 10); @@ -330,9 +332,9 @@ String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid; this.jobtracker = jobtracker; - this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP); + this.startTime = System.currentTimeMillis(); - status.setStartTime(startTime); + this.localFs = jobtracker.getLocalFileSystem(); JobConf default_job_conf = new 
JobConf(default_conf); @@ -346,10 +348,15 @@ fs.copyToLocalFile(jobFile, localJobFile); conf = new JobConf(localJobFile); this.priority = conf.getJobPriority(); - this.status.setJobPriority(this.priority); this.profile = new JobProfile(conf.getUser(), jobid, jobFile.toString(), url, conf.getJobName(), conf.getQueueName()); + this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, + profile.getUser(), profile.getJobName(), profile.getJobFile(), + profile.getURL().toString()); + status.setStartTime(startTime); + this.status.setJobPriority(this.priority); + String jarFile = conf.getJar(); if (jarFile != null) { fs.copyToLocalFile(new Path(jarFile), localJarFile); @@ -2630,6 +2637,7 @@ this.status.setReduceProgress(1.0f); } this.finishTime = JobTracker.getClock().getTime(); + this.status.setFinishTime(this.finishTime); LOG.info("Job " + this.status.getJobID() + " has completed successfully."); @@ -2653,11 +2661,15 @@ private synchronized void terminateJob(int jobTerminationState) { if ((status.getRunState() == JobStatus.RUNNING) || (status.getRunState() == JobStatus.PREP)) { + + this.finishTime = JobTracker.getClock().getTime(); + this.status.setMapProgress(1.0f); + this.status.setReduceProgress(1.0f); + this.status.setCleanupProgress(1.0f); + this.status.setFinishTime(this.finishTime); + if (jobTerminationState == JobStatus.FAILED) { - this.status = new JobStatus(status.getJobID(), - 1.0f, 1.0f, 1.0f, JobStatus.FAILED, - status.getJobPriority()); - this.finishTime = JobTracker.getClock().getTime(); + this.status.setRunState(JobStatus.FAILED); // Log the job summary JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false)); @@ -2667,11 +2679,8 @@ this.finishedMapTasks, this.finishedReduceTasks); } else { - this.status = new JobStatus(status.getJobID(), - 1.0f, 1.0f, 1.0f, JobStatus.KILLED, - status.getJobPriority()); - this.finishTime = JobTracker.getClock().getTime(); - + this.status.setRunState(JobStatus.KILLED); + // Log the job summary 
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false)); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Tue Aug 11 17:43:14 2009 @@ -74,6 +74,14 @@ private String user; private JobPriority priority; private String schedulingInfo="NA"; + + private String jobName; + private String jobFile; + private long finishTime; + private boolean isRetired; + private String historyFile = ""; + private String trackingUrl =""; + /** */ @@ -87,11 +95,17 @@ * @param reduceProgress The progress made on the reduces * @param cleanupProgress The progress made on cleanup * @param runState The current state of the job + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. */ public JobStatus(JobID jobid, float mapProgress, float reduceProgress, - float cleanupProgress, int runState) { + float cleanupProgress, int runState, + String user, String jobName, + String jobFile, String trackingUrl) { this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, - JobPriority.NORMAL); + JobPriority.NORMAL, user, jobName, jobFile, trackingUrl); } /** @@ -100,10 +114,16 @@ * @param mapProgress The progress made on the maps * @param reduceProgress The progress made on the reduces * @param runState The current state of the job + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param jobFile job configuration file. 
+ * @param trackingUrl link to the web-ui for details of the job. */ public JobStatus(JobID jobid, float mapProgress, float reduceProgress, - int runState) { - this(jobid, mapProgress, reduceProgress, 0.0f, runState); + int runState, String user, String jobName, + String jobFile, String trackingUrl) { + this(jobid, mapProgress, reduceProgress, 0.0f, runState, user, jobName, + jobFile, trackingUrl); } /** @@ -113,11 +133,17 @@ * @param reduceProgress The progress made on the reduces * @param runState The current state of the job * @param jp Priority of the job. + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. */ public JobStatus(JobID jobid, float mapProgress, float reduceProgress, - float cleanupProgress, int runState, JobPriority jp) { + float cleanupProgress, int runState, JobPriority jp, + String user, String jobName, String jobFile, + String trackingUrl) { this(jobid, 0.0f, mapProgress, reduceProgress, - cleanupProgress, runState, jp); + cleanupProgress, runState, jp, user, jobName, jobFile, trackingUrl); } /** @@ -129,21 +155,29 @@ * @param cleanupProgress The progress made on the cleanup * @param runState The current state of the job * @param jp Priority of the job. + * @param user userid of the person who submitted the job. + * @param jobName user-specified job name. + * @param jobFile job configuration file. + * @param trackingUrl link to the web-ui for details of the job. 
*/ public JobStatus(JobID jobid, float setupProgress, float mapProgress, float reduceProgress, float cleanupProgress, - int runState, JobPriority jp) { + int runState, JobPriority jp, String user, String jobName, + String jobFile, String trackingUrl) { this.jobid = jobid; this.setupProgress = setupProgress; this.mapProgress = mapProgress; this.reduceProgress = reduceProgress; this.cleanupProgress = cleanupProgress; this.runState = runState; - this.user = "nobody"; + this.user = user; if (jp == null) { throw new IllegalArgumentException("Job Priority cannot be null."); } priority = jp; + this.jobName = jobName; + this.jobFile = jobFile; + this.trackingUrl = trackingUrl; } /** @@ -308,6 +342,12 @@ Text.writeString(out, user); WritableUtils.writeEnum(out, priority); Text.writeString(out, schedulingInfo); + out.writeLong(finishTime); + out.writeBoolean(isRetired); + Text.writeString(out, historyFile); + Text.writeString(out, jobName); + Text.writeString(out, trackingUrl); + Text.writeString(out, jobFile); } public synchronized void readFields(DataInput in) throws IOException { @@ -321,5 +361,98 @@ this.user = Text.readString(in); this.priority = WritableUtils.readEnum(in, JobPriority.class); this.schedulingInfo = Text.readString(in); + this.finishTime = in.readLong(); + this.isRetired = in.readBoolean(); + this.historyFile = Text.readString(in); + this.jobName = Text.readString(in); + this.trackingUrl = Text.readString(in); + this.jobFile = Text.readString(in); + } + + /** + * Get the user-specified job name. + */ + public String getJobName() { + return jobName; + } + + /** + * Get the configuration file for the job. + */ + public String getJobFile() { + return jobFile; + } + + /** + * Get the link to the web-ui for details of the job. + */ + public synchronized String getTrackingUrl() { + return trackingUrl; + } + + /** + * Get the finish time of the job. 
+ */ + public synchronized long getFinishTime() { + return finishTime; + } + + /** + * Check whether the job has retired. + */ + public synchronized boolean isRetired() { + return isRetired; + } + + /** + * @return the job history file name for a completed job. If job is not + * completed or history file not available then return null. + */ + public synchronized String getHistoryFile() { + return historyFile; + } + + /** + * Set the finish time of the job + * @param finishTime The finishTime of the job + */ + synchronized void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + /** + * Set the job history file url for a completed job + */ + synchronized void setHistoryFile(String historyFile) { + this.historyFile = historyFile; + } + + /** + * Set the link to the web-ui for details of the job. + */ + synchronized void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + } + + /** + * Set the job retire flag to true. + */ + synchronized void setRetired() { + this.isRetired = true; + } + + public String toString() { + StringBuffer buffer = new StringBuffer(); + buffer.append("job-id : " + jobid); + buffer.append("map-progress : " + mapProgress); + buffer.append("reduce-progress : " + reduceProgress); + buffer.append("cleanup-progress : " + cleanupProgress); + buffer.append("setup-progress : " + setupProgress); + buffer.append("runstate : " + runState); + buffer.append("start-time : " + startTime); + buffer.append("user-name : " + user); + buffer.append("priority : " + priority); + buffer.append("scheduling-info : " + schedulingInfo); + return buffer.toString(); } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- 
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Tue Aug 11 17:43:14 2009 @@ -65,8 +65,9 @@ * Part of HADOOP-5913. * Version 24: Modified ClusterStatus to include BlackListInfo class which * encapsulates reasons and report for blacklisted node. + * Version 25: Added fields to JobStatus for HADOOP-817. */ - public static final long versionID = 24L; + public static final long versionID = 25L; /** * Allocate a name for the job. Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Aug 11 17:43:14 2009 @@ -19,12 +19,15 @@ import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.net.BindException; import java.net.InetSocketAddress; +import java.net.URLEncoder; import java.net.UnknownHostException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -33,6 +36,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -417,14 +421,56 @@ } } + synchronized void historyFileCopied(JobID jobid, String historyFile) { + JobStatus status = getJobStatus(jobid); + if (status != null) { + String trackingUrl = ""; + if (historyFile != null) { + status.setHistoryFile(historyFile); + try { + trackingUrl = "http://" 
+ getJobTrackerMachine() + ":" + + getInfoPort() + "/jobdetailshistory.jsp?jobid=" + + jobid + "&logFile=" + URLEncoder.encode(historyFile, "UTF-8"); + } catch (UnsupportedEncodingException e) { + LOG.warn("Could not create trackingUrl", e); + } + } + status.setTrackingUrl(trackingUrl); + } + } + /////////////////////////////////////////////////////// // Used to remove old finished Jobs that have been around for too long /////////////////////////////////////////////////////// class RetireJobs implements Runnable { int runCount = 0; + private final Map jobIDStatusMap = + new HashMap(); + private final LinkedList jobStatusQ = + new LinkedList(); public RetireJobs() { } + synchronized void addToCache(JobStatus status) { + status.setRetired(); + jobStatusQ.add(status); + jobIDStatusMap.put(status.getJobID(), status); + if (jobStatusQ.size() > retiredJobsCacheSize) { + JobStatus removed = jobStatusQ.remove(); + jobIDStatusMap.remove(removed.getJobID()); + LOG.info("Retired job removed from cache " + removed.getJobID()); + } + } + + synchronized JobStatus get(JobID jobId) { + return jobIDStatusMap.get(jobId); + } + + @SuppressWarnings("unchecked") + synchronized LinkedList getAll() { + return (LinkedList) jobStatusQ.clone(); + } + /** * The run method lives for the life of the JobTracker, * and removes Jobs that are not still running, but which @@ -476,6 +522,7 @@ // clean up job files from the local disk JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID()); + addToCache(job.getStatus()); } } } @@ -1723,6 +1770,7 @@ Thread expireTrackersThread = null; RetireJobs retireJobs = new RetireJobs(); Thread retireJobsThread = null; + final int retiredJobsCacheSize; ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks, "expireLaunchingTasks"); @@ -1802,6 +1850,8 @@ conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000); retireJobInterval = 
conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000); retireJobCheckInterval = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000); + retiredJobsCacheSize = + conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000); // min time before retire MIN_TIME_BEFORE_RETIRE = conf.getInt("mapred.jobtracker.retirejob.interval.min", 60000); MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100); @@ -1872,7 +1922,7 @@ tmpInfoPort == 0, conf); infoServer.setAttribute("job.tracker", this); // initialize history parameters. - boolean historyInitialized = JobHistory.init(conf, this.localMachine, + boolean historyInitialized = JobHistory.init(this, conf, this.localMachine, this.startTime); infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class); @@ -2420,12 +2470,6 @@ while (userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) { JobInProgress rjob = userJobs.get(0); - - // Do not delete 'current' - // finished job just yet. - if (rjob == job) { - break; - } // do not retire jobs that finished in the very recent past. if (rjob.getFinishTime() + MIN_TIME_BEFORE_RETIRE > now) { @@ -2455,6 +2499,9 @@ LOG.info("Retired job with id: '" + rjob.getProfile().getJobID() + "' of user: '" + jobUser + "'"); + // clean up job files from the local disk + JobHistory.JobInfo.cleanupJob(rjob.getProfile().getJobID()); + retireJobs.addToCache(rjob.getStatus()); } else { // Do not remove jobs that aren't complete. 
// Stop here, and let the next pass take @@ -3657,7 +3704,13 @@ JobInProgress job = jobs.get(jobid); if (job != null) { return job.getStatus(); - } + } else { + + JobStatus status = retireJobs.get(jobid); + if (status != null) { + return status; + } + } } return completedJobStatusStore.readJobStatus(jobid); } @@ -3796,19 +3849,19 @@ */ public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException { - + List taskDiagnosticInfo = null; JobID jobId = taskId.getJobID(); TaskID tipId = taskId.getTaskID(); JobInProgress job = jobs.get(jobId); - if (job == null) { - throw new IllegalArgumentException("Job " + jobId + " not found."); - } - TaskInProgress tip = job.getTaskInProgress(tipId); - if (tip == null) { - throw new IllegalArgumentException("TIP " + tipId + " not found."); + if (job != null) { + TaskInProgress tip = job.getTaskInProgress(tipId); + if (tip != null) { + taskDiagnosticInfo = tip.getDiagnosticInfo(taskId); + } + } - List taskDiagnosticInfo = tip.getDiagnosticInfo(taskId); - return ((taskDiagnosticInfo == null) ? null + + return ((taskDiagnosticInfo == null) ? 
new String[0] : taskDiagnosticInfo.toArray(new String[0])); } @@ -3877,7 +3930,10 @@ } public JobStatus[] getAllJobs() { - return getJobStatus(jobs.values(),false); + List list = new ArrayList(); + list.addAll(Arrays.asList(getJobStatus(jobs.values(),false))); + list.addAll(retireJobs.getAll()); + return list.toArray(new JobStatus[list.size()]); } /** Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Aug 11 17:43:14 2009 @@ -120,7 +120,9 @@ this.job = new JobConf(localFile); profile = new JobProfile(job.getUser(), id, file.toString(), "http://localhost:8080/", job.getJobName()); - status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING); + status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, + profile.getUser(), profile.getJobName(), profile.getJobFile(), + profile.getURL().toString()); jobs.put(id, this); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Tue Aug 11 17:43:14 2009 @@ -189,4 +189,22 @@ * @throws IOException */ public String[] getTaskDiagnostics(TaskAttemptID taskid) throws IOException; + + /** + * Get the url where history file is archived. 
Returns empty string if + * history file is not available yet. + * + * @return the url where history file is archived + * @throws IOException + */ + public String getHistoryUrl() throws IOException; + + /** + * Check whether the job has been removed from JobTracker memory and retired. + * On retire, the job history file is copied to a location known by + * {@link #getHistoryUrl()} + * @return true if the job retired, else false. + * @throws IOException + */ + public boolean isRetired() throws IOException; } Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java Tue Aug 11 17:43:14 2009 @@ -59,7 +59,7 @@ JobConf conf = new JobConf(); conf.set("hadoop.job.history.location", historyDir.toString()); FileSystem fs = FileSystem.getLocal(new JobConf()); - JobHistory.init(conf, "localhost", 1234, fs); + JobHistory.init(null, conf, "localhost", 1234, fs); Path historyLog = new Path(historyDir, "testlog"); PrintWriter out = new PrintWriter(fs.create(historyLog)); historyWriter.add(out); Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original) +++ 
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Aug 11 17:43:14 2009 @@ -50,7 +50,9 @@ super(new JobID("test", ++jobCounter), jobConf, null); this.taskTrackerManager = taskTrackerManager; this.startTime = System.currentTimeMillis(); - this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP); + this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP, + jobConf.getUser(), + jobConf.getJobName(), "", ""); this.status.setJobPriority(JobPriority.NORMAL); this.status.setStartTime(startTime); } Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Tue Aug 11 17:43:14 2009 @@ -18,9 +18,10 @@ package org.apache.hadoop.mapred; +import java.io.File; +import java.io.IOException; + import junit.framework.TestCase; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.UtilsForTests.FakeClock; @@ -36,26 +37,31 @@ "job-expiry-testing"); private static final Log LOG = LogFactory.getLog(TestJobRetire.class); + private void testJobConfFile(JobID id, boolean exists) { + // get the job conf filename + String name = JobHistory.JobInfo.getLocalJobFilePath(id); + File file = new File(name); + + assertEquals("JobConf file check failed", exists, file.exists()); + } + /** Test if the job after completion waits for atleast * mapred.jobtracker.retirejob.interval.min amount of time. 
*/ public void testMinIntervalBeforeRetire() throws Exception { - MiniDFSCluster dfs = null; MiniMRCluster mr = null; int min = 10000; int max = 5000; try { FakeClock clock = new FakeClock(); JobConf conf = new JobConf(); - dfs = new MiniDFSCluster(conf, 1, true, null); - FileSystem fileSys = dfs.getFileSystem(); - String namenode = fileSys.getUri().toString(); conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec conf.setInt("mapred.jobtracker.retirejob.interval.min", min); //10 secs conf.setInt("mapred.jobtracker.retirejob.interval", max); // 5 secs conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false); - mr = new MiniMRCluster(0, 0, 1, namenode, 1, null, null, null, conf, 0, + conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0); + mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, clock); JobConf jobConf = mr.createJobConf(); JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); @@ -82,6 +88,9 @@ assertNotNull(jc.getJob(id)); + //check that the job is not retired + assertFalse(jc.getJob(id).isRetired()); + // snapshot expiry thread count snapshot = jobtracker.retireJobs.runCount; clock.advance(min - max); // adv to expiry min time @@ -93,9 +102,11 @@ // check if the job is missing assertNull(jc.getJob(id)); + assertNull(jobtracker.getJob(id)); + + testJobConfFile(id, false); } finally { if (mr != null) { mr.shutdown();} - if (dfs != null) { dfs.shutdown();} } } @@ -103,22 +114,19 @@ * mapred.jobtracker.retirejob.interval amount after the time. 
*/ public void testJobRetire() throws Exception { - MiniDFSCluster dfs = null; MiniMRCluster mr = null; int min = 10000; int max = 20000; try { FakeClock clock = new FakeClock(); JobConf conf = new JobConf(); - dfs = new MiniDFSCluster(conf, 1, true, null); - FileSystem fileSys = dfs.getFileSystem(); - String namenode = fileSys.getUri().toString(); conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false); - mr = new MiniMRCluster(0, 0, 1, namenode, 1, null, null, null, conf, 0, + conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0); + mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, clock); JobConf jobConf = mr.createJobConf(); JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); @@ -141,12 +149,169 @@ // wait for the thread to run UtilsForTests.waitFor(1000); } - + // check if the job is missing assertNull(jc.getJob(id)); + assertNull(jobtracker.getJob(id)); + + testJobConfFile(id, false); } finally { if (mr != null) { mr.shutdown();} - if (dfs != null) { dfs.shutdown();} } } + + /** Test if the job after gets expired after + * mapred.jobtracker.completeuserjobs.maximum jobs. 
+ */ + public void testMaxJobRetire() throws Exception { + MiniMRCluster mr = null; + int min = 10000; + int max = 20000; + try { + FakeClock clock = new FakeClock(); + JobConf conf = new JobConf(); + + conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec + conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs + conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs + conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false); + conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 1); + conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0); + mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, + clock); + JobConf jobConf = mr.createJobConf(); + JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); + + Path inDir = new Path(testDir, "input2.1"); + Path outDir = new Path(testDir, "output2.1"); + RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0); + rj.waitForCompletion(); + JobID id = rj.getID(); + JobClient jc = new JobClient(jobConf); + + // check if the job is successful + assertTrue(rj.isSuccessful()); + + clock.advance(min + 1); // adv to expiry min time + + inDir = new Path(testDir, "input2.2"); + outDir = new Path(testDir, "output2.2"); + RunningJob rj2 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0); + rj2.waitForCompletion(); + JobID id2 = rj2.getID(); + + // check if the job#1 is missing + assertNull(jc.getJob(id)); + assertNull("Job still not missing from jobtracker", jobtracker.getJob(id)); + + // check if the job#2 exists + assertNotNull(jc.getJob(id2)); + assertNotNull("Job " + id2 + " missing at the jobtracker before expiry", + jobtracker.getJob(id2)); + + testJobConfFile(id, false); + testJobConfFile(id2, true); + } finally { + if (mr != null) {mr.shutdown();} + } + } + + /** Test if the job after gets expired but basic info is cached with jobtracker + */ + public void testRetiredJobCache() throws Exception { + MiniMRCluster mr = 
null; + int min = 10000; + int max = 20000; + try { + FakeClock clock = new FakeClock(); + JobConf conf = new JobConf(); + + conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec + conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs + conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs + conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false); + conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 1); + conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 1); + mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, + clock); + JobConf jobConf = mr.createJobConf(); + JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker(); + + Path inDir = new Path(testDir, "input3.1"); + Path outDir = new Path(testDir, "output3.1"); + RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0); + rj.waitForCompletion(); + JobID id = rj.getID(); + JobClient jc = new JobClient(jobConf); + + // check if the job is successful + assertTrue(rj.isSuccessful()); + JobStatus status1 = jobtracker.getJobStatus(id); + + clock.advance(min + 1); // adv to expiry min time + + inDir = new Path(testDir, "input3.2"); + outDir = new Path(testDir, "output3.2"); + RunningJob rj2 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0); + rj2.waitForCompletion(); + JobID id2 = rj2.getID(); + JobStatus status2 = jobtracker.getJobStatus(id2); + + // check if the job#1 is missing in jt but cached status + assertNotNull("Job status missing from status cache", jc.getJob(id)); + // check the status at jobtracker + assertEquals("Status mismatch for job " + id, status1.toString(), + jobtracker.getJobStatus(id).toString()); + testRetiredCachedJobStatus(status1, rj); + assertNull("Job still not missing from jobtracker", jobtracker.getJob(id)); + + // check if the job#2 exists + assertNotNull(jc.getJob(id2)); + // check the status .. 
+ + assertNotNull("Job " + id2 + " missing at the jobtracker before expiry", + jobtracker.getJob(id2)); + + testJobConfFile(id, false); + testJobConfFile(id2, true); + + clock.advance(min + 1); // adv to expiry min time + + inDir = new Path(testDir, "input3.3"); + outDir = new Path(testDir, "output3.3"); + RunningJob rj3 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0); + rj3.waitForCompletion(); + JobID id3 = rj3.getID(); + + // check if the job#1 is missing in all the caches + assertNull("Job status still in status cache", jc.getJob(id)); + // check if the job#2 is missing in jt but cached status + assertNotNull(jc.getJob(id2)); + assertEquals("Status mismatch for job " + id2, status2.toString(), + jobtracker.getJobStatus(id2).toString()); + testRetiredCachedJobStatus(status2, rj2); + assertNull("Job " + id2 + " missing at the jobtracker before expiry", + jobtracker.getJob(id2)); + // check if the job#3 exists + assertNotNull(jc.getJob(id3)); + assertNotNull("Job " + id3 + " missing at the jobtracker before expiry", + jobtracker.getJob(id3)); + } finally { + if (mr != null) {mr.shutdown();} + } + } + + private static void testRetiredCachedJobStatus(JobStatus status, + RunningJob rj) + throws IOException { + assertEquals(status.getJobID(), rj.getID()); + assertEquals(status.mapProgress(), rj.mapProgress()); + assertEquals(status.reduceProgress(), rj.reduceProgress()); + assertEquals(status.setupProgress(), rj.setupProgress()); + assertEquals(status.cleanupProgress(), rj.cleanupProgress()); + assertEquals(status.getRunState(), rj.getJobState()); + assertEquals(status.getJobName(), rj.getJobName()); + assertEquals(status.getTrackingUrl(), rj.getTrackingURL()); + assertEquals(status.isRetired(), true); + } } Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Tue Aug 11 17:43:14 2009 @@ -46,7 +46,9 @@ FakeTaskTrackerManager taskTrackerManager) throws IOException { super(new JobID("test", ++jobCounter), jobConf, null); this.startTime = System.currentTimeMillis(); - this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP); + this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP, + jobConf.getUser(), + jobConf.getJobName(), "", ""); this.status.setJobPriority(JobPriority.NORMAL); this.status.setStartTime(startTime); } Modified: hadoop/mapreduce/trunk/src/webapps/job/analysejobhistory.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/analysejobhistory.jsp?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/webapps/job/analysejobhistory.jsp (original) +++ hadoop/mapreduce/trunk/src/webapps/job/analysejobhistory.jsp Tue Aug 11 17:43:14 2009 @@ -23,14 +23,12 @@ import="java.io.*" import="java.util.*" import="org.apache.hadoop.mapred.*" + import="org.apache.hadoop.fs.*" import="org.apache.hadoop.util.*" import="java.text.SimpleDateFormat" import="org.apache.hadoop.mapred.JobHistory.*" %> - - "/> - "/> - + <%! 
private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ; %> @@ -46,7 +44,8 @@ if (numTasks != null) { showTasks = Integer.parseInt(numTasks); } - JobInfo job = (JobInfo)request.getSession().getAttribute("job"); + FileSystem fs = (FileSystem) application.getAttribute("fileSys"); + JobInfo job = JSPUtil.getJobInfo(request, fs); %>

Hadoop Job <%=jobid %>

User : <%=job.get(Keys.USER) %>
Modified: hadoop/mapreduce/trunk/src/webapps/job/jobconf_history.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobconf_history.jsp?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/webapps/job/jobconf_history.jsp (original) +++ hadoop/mapreduce/trunk/src/webapps/job/jobconf_history.jsp Tue Aug 11 17:43:14 2009 @@ -51,7 +51,7 @@ Path logDir = new Path(request.getParameter("jobLogDir")); Path jobFilePath = new Path(logDir, request.getParameter("jobUniqueString") + "_conf.xml"); - FileSystem fs = (FileSystem)request.getSession().getAttribute("fs"); + FileSystem fs = (FileSystem) application.getAttribute("fileSys"); FSDataInputStream jobFile = null; try { jobFile = fs.open(jobFilePath); Modified: hadoop/mapreduce/trunk/src/webapps/job/jobdetailshistory.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobdetailshistory.jsp?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/webapps/job/jobdetailshistory.jsp (original) +++ hadoop/mapreduce/trunk/src/webapps/job/jobdetailshistory.jsp Tue Aug 11 17:43:14 2009 @@ -30,10 +30,7 @@ %> <%! private static final long serialVersionUID = 1L; %> - - "/> - "/> - + <%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %> <% String jobid = request.getParameter("jobid"); @@ -44,8 +41,8 @@ String[] jobDetails = jobFile.getName().split("_"); String jobUniqueString = jobDetails[0] + "_" +jobDetails[1] + "_" + jobid ; - JobInfo job = (JobInfo)request.getSession().getAttribute("job"); - FileSystem fs = (FileSystem)request.getSession().getAttribute("fs"); + FileSystem fs = (FileSystem) application.getAttribute("fileSys"); + JobInfo job = JSPUtil.getJobInfo(request, fs); %>

Hadoop Job <%=jobid %> on History Viewer
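
Note on the logFile parameter: jobdetailshistory.jsp receives the history file path through the tracking URL that JobTracker.historyFileCopied builds earlier in this commit. The following is a minimal standalone sketch of that URL construction, assuming only the JDK. The class name, method name, host, port, job ID and file path are hypothetical placeholders, not values from the commit.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Sketch only: mirrors the string concatenation in historyFileCopied,
// with the JobTracker accessors replaced by plain parameters (assumption).
class TrackingUrlSketch {

  // Returns "" when no history file is known or encoding fails,
  // matching the empty-string default used in the commit.
  static String historyTrackingUrl(String host, int port, String jobId,
                                   String historyFile) {
    if (historyFile == null) {
      return "";
    }
    try {
      return "http://" + host + ":" + port
          + "/jobdetailshistory.jsp?jobid=" + jobId
          + "&logFile=" + URLEncoder.encode(historyFile, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // UTF-8 is always available on a conforming JVM; kept for parity
      // with the committed code, which logs a warning here.
      return "";
    }
  }

  public static void main(String[] args) {
    // Hypothetical values, for illustration only.
    System.out.println(historyTrackingUrl("jt.example.com", 50030,
        "job_200908111743_0001", "file:/tmp/history/job 1.log"));
  }
}
```

URLEncoder applies form encoding, so ':' and '/' in the path become %3A and %2F and a space becomes '+'. The JSP side reads the value back with request.getParameter("logFile"), which is expected to undo that encoding.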

Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtaskshistory.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtaskshistory.jsp?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/webapps/job/jobtaskshistory.jsp (original) +++ hadoop/mapreduce/trunk/src/webapps/job/jobtaskshistory.jsp Tue Aug 11 17:43:14 2009 @@ -23,14 +23,12 @@ import="java.io.*" import="java.util.*" import="org.apache.hadoop.mapred.*" + import="org.apache.hadoop.fs.*" import="org.apache.hadoop.util.*" import="java.text.SimpleDateFormat" import="org.apache.hadoop.mapred.JobHistory.*" %> - - "/> - "/> - + <%! private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ; @@ -45,8 +43,8 @@ String taskStatus = request.getParameter("status"); String taskType = request.getParameter("taskType"); - JobHistory.JobInfo job = (JobHistory.JobInfo)request. - getSession().getAttribute("job"); + FileSystem fs = (FileSystem) application.getAttribute("fileSys"); + JobInfo job = JSPUtil.getJobInfo(request, fs); Map tasks = job.getAllTasks(); %> Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp (original) +++ hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp Tue Aug 11 17:43:14 2009 @@ -100,8 +100,7 @@ @@ -160,13 +159,27 @@ <%=JSPUtil.generateJobTable("Running", runningJobs, 30, 0)%>
-

Completed Jobs

-<%=JSPUtil.generateJobTable("Completed", completedJobs, 0, runningJobs.size())%> -
+<% +if (completedJobs.size() > 0) { + out.print("

Completed Jobs

"); + out.print(JSPUtil.generateJobTable("Completed", completedJobs, 0, + runningJobs.size())); + out.print("
"); +} +%> + +<% +if (failedJobs.size() > 0) { + out.print("

Failed Jobs

"); + out.print(JSPUtil.generateJobTable("Failed", failedJobs, 0, + (runningJobs.size()+completedJobs.size()))); + out.print("
"); +} +%> -

Failed Jobs

-<%=JSPUtil.generateJobTable("Failed", failedJobs, 0, - (runningJobs.size()+completedJobs.size()))%> +

Retired Jobs

+<%=JSPUtil.generateRetiredJobTable(tracker, + (runningJobs.size()+completedJobs.size()+failedJobs.size()))%>

Local Logs

Modified: hadoop/mapreduce/trunk/src/webapps/job/taskdetailshistory.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/taskdetailshistory.jsp?rev=803231&r1=803230&r2=803231&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/webapps/job/taskdetailshistory.jsp (original) +++ hadoop/mapreduce/trunk/src/webapps/job/taskdetailshistory.jsp Tue Aug 11 17:43:14 2009 @@ -23,14 +23,12 @@ import="java.io.*" import="java.util.*" import="org.apache.hadoop.mapred.*" + import="org.apache.hadoop.fs.*" import="org.apache.hadoop.util.*" import="java.text.SimpleDateFormat" import="org.apache.hadoop.mapred.JobHistory.*" %> - - "/> - "/> - + <%! private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ; %> <%! private static final long serialVersionUID = 1L; %> @@ -40,8 +38,8 @@ String logFile = request.getParameter("logFile"); String encodedLogFileName = JobHistory.JobInfo.encodeJobHistoryFilePath(logFile); String taskid = request.getParameter("taskid"); - JobHistory.JobInfo job = (JobHistory.JobInfo) - request.getSession().getAttribute("job"); + FileSystem fs = (FileSystem) application.getAttribute("fileSys"); + JobInfo job = JSPUtil.getJobInfo(request, fs); JobHistory.Task task = job.getAllTasks().get(taskid); String type = task.get(Keys.TASK_TYPE); %>
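
Note on the retired-jobs cache: the RetireJobs changes in JobTracker.java above keep a FIFO queue of retired JobStatus objects plus a map keyed by job ID, and evict the oldest entry once the queue grows past mapred.job.tracker.retiredjobs.cache.size. The sketch below reproduces that shape with JDK classes only, as an illustration rather than the committed code. StatusStub is a hypothetical stand-in for JobStatus, and plain String IDs stand in for JobID.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JobStatus: an ID and a retired flag.
class StatusStub {
  final String jobId;
  boolean retired;

  StatusStub(String jobId) {
    this.jobId = jobId;
  }
}

// Sketch of a bounded FIFO cache in the style of RetireJobs.addToCache.
class RetiredJobCacheSketch {
  private final int maxSize;
  private final Map<String, StatusStub> byId =
      new HashMap<String, StatusStub>();
  private final LinkedList<StatusStub> queue =
      new LinkedList<StatusStub>();

  RetiredJobCacheSketch(int maxSize) {
    this.maxSize = maxSize;
  }

  // Mark the status retired, append it, then evict the oldest entry
  // if the queue has grown past maxSize.
  synchronized void addToCache(StatusStub status) {
    status.retired = true;
    queue.add(status);
    byId.put(status.jobId, status);
    if (queue.size() > maxSize) {
      StatusStub removed = queue.remove(); // head of the queue = oldest
      byId.remove(removed.jobId);
    }
  }

  synchronized StatusStub get(String jobId) {
    return byId.get(jobId);
  }

  // Return a copy so callers can iterate without holding the lock.
  synchronized List<StatusStub> getAll() {
    return new LinkedList<StatusStub>(queue);
  }

  public static void main(String[] args) {
    RetiredJobCacheSketch cache = new RetiredJobCacheSketch(1);
    cache.addToCache(new StatusStub("job_1"));
    cache.addToCache(new StatusStub("job_2")); // pushes job_1 out
    System.out.println("job_1 cached: " + (cache.get("job_1") != null));
    System.out.println("job_2 cached: " + (cache.get("job_2") != null));
  }
}
```

With a maximum size of 0, every entry is evicted in the same call that adds it, so the cache stays empty. This is consistent with the tests in TestJobRetire.java that set the cache size to 0 and then assert that a retired job is no longer visible through JobClient.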