Author: omalley
Date: Fri Mar 4 03:26:56 2011
New Revision: 1076970
URL: http://svn.apache.org/viewvc?rev=1076970&view=rev
Log:
commit 5a5b9ce1a4fa99c91e497d8ee6cd50f24b326393
Author: Yahoo\! <ltucker@yahoo-inc.com>
Date: Thu Aug 13 09:35:35 2009 -0700
Applying patch 2935902.mr817.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/analysejobhistory.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobconf_history.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetailshistory.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtaskshistory.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetailshistory.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:26:56 2011
@@ -323,6 +323,21 @@
</property>
<property>
+ <name>mapred.job.tracker.retiredjobs.cache.size</name>
+ <value>1000</value>
+ <description>The number of retired job status to keep in the cache.
+ </description>
+</property>
+
+<property>
+ <name>mapred.job.tracker.jobhistory.lru.cache.size</name>
+ <value>5</value>
+ <description>The number of job history files loaded in memory. The jobs are
+ loaded when they are first accessed. The cache is cleared based on LRU.
+ </description>
+</property>
+
+<property>
<name>mapred.jobtracker.instrumentation</name>
<value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>
<description>Expert: The instrumentation class to associate with each JobTracker.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java Fri Mar 4 03:26:56 2011
@@ -18,21 +18,39 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.util.Iterator;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
import org.apache.hadoop.util.ServletUtil;
+import org.apache.hadoop.util.StringUtils;
class JSPUtil {
private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";
public static final Configuration conf = new Configuration();
+ //LRU based cache
+ private static final Map<String, JobInfo> jobHistoryCache =
+ new LinkedHashMap<String, JobInfo>();
+
+ private static final int CACHE_SIZE =
+ conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
+
+ private static final Log LOG = LogFactory.getLog(JSPUtil.class);
/**
* Method used to process the request from the job page based on the
* request which it has received. For example like changing priority.
@@ -181,4 +199,104 @@ class JSPUtil {
return sb.toString();
}
+ @SuppressWarnings("unchecked")
+ public static String generateRetiredJobTable(JobTracker tracker, int rowId)
+ throws IOException {
+ // Renders up to 100 retired jobs, newest first, as an HTML table; rowId seeds the cell ids.
+ StringBuffer sb = new StringBuffer();
+ sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
+
+ Iterator<RetireJobInfo> iterator =
+ tracker.retireJobs.getAll().descendingIterator(); // getAll() returns a copy; its tail is the most recently retired job
+ if (!iterator.hasNext()) { // colspan must span the 10 columns of the header row below
+ sb.append("<tr><td align=\"center\" colspan=\"10\"><i>none</i>"
+
+ "</td></tr>\n");
+ } else {
+ sb.append("<tr>");
+
+ sb.append("<td><b>Jobid</b></td>");
+ sb.append("<td><b>Priority</b></td>");
+ sb.append("<td><b>User</b></td>");
+ sb.append("<td><b>Name</b></td>");
+ sb.append("<td><b>State</b></td>");
+ sb.append("<td><b>Start Time</b></td>");
+ sb.append("<td><b>Finish Time</b></td>");
+ sb.append("<td><b>Map % Complete</b></td>");
+ sb.append("<td><b>Reduce % Complete</b></td>");
+ sb.append("<td><b>Job Scheduling Information</b></td>");
+ sb.append("</tr>\n");
+ for (int i = 0; i < 100 && iterator.hasNext(); i++) {
+ RetireJobInfo info = iterator.next();
+ String historyFile = info.getHistoryFile(); // read once: JobTracker.historyFileCopied may update it concurrently
+ String historyFileUrl = null;
+ if (historyFile != null && !historyFile.equals("")) {
+ try {
+ historyFileUrl = URLEncoder.encode(historyFile, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ LOG.warn("Can't create history url ", e);
+ }
+ }
+ sb.append("<tr>");
+ sb.append(
+ "<td id=\"job_" + rowId + "\">" +
+ // link the job id to the history viewer only when a history file is known
+ (historyFileUrl == null ? "" :
+ "<a href=\"jobdetailshistory.jsp?jobid=" +
+ info.status.getJobId() + "&logFile=" + historyFileUrl + "\">") +
+
+ info.status.getJobId() + (historyFileUrl == null ? "" : "</a>") + "</td>" +
+ // NOTE(review): user and job name below are written without HTML escaping
+ "<td id=\"priority_" + rowId + "\">" +
+ info.status.getJobPriority().toString() + "</td>" +
+ "<td id=\"user_" + rowId + "\">" + info.profile.getUser()
+ + "</td>" +
+ "<td id=\"name_" + rowId + "\">" + info.profile.getJobName()
+ + "</td>" +
+ "<td>" + JobStatus.getJobRunState(info.status.getRunState())
+ + "</td>" +
+ "<td>" + new Date(info.status.getStartTime()) + "</td>" +
+ "<td>" + new Date(info.finishTime) + "</td>" +
+
+ "<td>" + StringUtils.formatPercent(info.status.mapProgress(), 2)
+ + ServletUtil.percentageGraph(info.status.mapProgress() * 100, 80) +
+ "</td>" +
+
+ "<td>" + StringUtils.formatPercent(info.status.reduceProgress(), 2)
+ + ServletUtil.percentageGraph(
+ info.status.reduceProgress() * 100, 80) +
+ "</td>" +
+
+ "<td>" + info.status.getSchedulingInfo() + "</td>" +
+
+ "</tr>\n");
+ rowId++;
+ }
+ }
+ sb.append("</table>\n");
+ return sb.toString();
+ }
+
+ static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs) // parsed history for the request's jobid, via a small LRU cache
+ throws IOException {
+ String jobid = request.getParameter("jobid");
+ String logFile = request.getParameter("logFile"); // NOTE(review): request-supplied path, not checked against the history dir
+ synchronized(jobHistoryCache) {
+ JobInfo jobInfo = jobHistoryCache.remove(jobid); // remove + re-put below moves the entry to the most-recent end
+ if (jobInfo == null) {
+ jobInfo = new JobHistory.JobInfo(jobid);
+ LOG.info("Loading Job History file "+jobid + ". Cache size is " +
+ jobHistoryCache.size());
+ DefaultJobHistoryParser.parseJobTasks( logFile, jobInfo, fs) ; // an exception here skips the put below
+ }
+ jobHistoryCache.put(jobid, jobInfo);
+ if (jobHistoryCache.size() > CACHE_SIZE) { // evict the least recently used entry: head of the insertion-ordered map
+ Iterator<Map.Entry<String, JobInfo>> it =
+ jobHistoryCache.entrySet().iterator();
+ String removeJobId = it.next().getKey();
+ it.remove();
+ LOG.info("Job History file removed from cache "+removeJobId);
+ }
+ return jobInfo;
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 03:26:56 2011
@@ -1341,7 +1341,10 @@ public class JobClient extends Configure
}
}
LOG.info("Job complete: " + jobId);
- job.getCounters().log(LOG);
+ Counters counters = job.getCounters();
+ if (counters != null) {
+ counters.log(LOG);
+ }
return job.isSuccessful();
}
@@ -1682,7 +1685,12 @@ public class JobClient extends Configure
} else {
System.out.println();
System.out.println(job);
- System.out.println(job.getCounters());
+ Counters counters = job.getCounters();
+ if (counters != null) {
+ System.out.println(counters);
+ } else {
+ System.out.println("Counters not available. Job is retired.");
+ }
exitCode = 0;
}
} else if (getCounter) {
@@ -1691,10 +1699,16 @@ public class JobClient extends Configure
System.out.println("Could not find job " + jobid);
} else {
Counters counters = job.getCounters();
- Group group = counters.getGroup(counterGroupName);
- Counter counter = group.getCounterForName(counterName);
- System.out.println(counter.getCounter());
- exitCode = 0;
+ if (counters == null) {
+ System.out.println("Counters not available for retired job " +
+ jobid);
+ exitCode = -1;
+ } else {
+ Group group = counters.getGroup(counterGroupName);
+ Counter counter = group.getCounterForName(counterName);
+ System.out.println(counter.getCounter());
+ exitCode = 0;
+ }
}
} else if (killJob) {
RunningJob job = getJob(JobID.forName(jobid));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar 4 03:26:56 2011
@@ -135,15 +135,19 @@ public class JobHistory {
private ThreadPoolExecutor executor = null;
private final Configuration conf;
+ private final JobTracker jobTracker;
// cache from job-key to files associated with it.
private Map<JobID, FilesHolder> fileCache =
new ConcurrentHashMap<JobID, FilesHolder>();
- JobHistoryFilesManager(Configuration conf) throws IOException {
+ JobHistoryFilesManager(Configuration conf, JobTracker jobTracker)
+ throws IOException {
this.conf = conf;
+ this.jobTracker = jobTracker;
}
+
void start() {
executor = new ThreadPoolExecutor(1, 3, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
@@ -192,7 +196,22 @@ public class JobHistory {
fileCache.remove(id);
}
- void moveToDone(final JobID id, final List<Path> paths) {
+ void moveToDone(final JobID id) {
+ final List<Path> paths = new ArrayList<Path>();
+ final Path historyFile = fileManager.getHistoryFile(id);
+ if (historyFile == null) {
+ LOG.info("No file for job-history with " + id + " found in cache!");
+ } else {
+ paths.add(historyFile);
+ }
+
+ final Path confPath = fileManager.getConfFileWriters(id);
+ if (confPath == null) {
+ LOG.info("No file for jobconf with " + id + " found in cache!");
+ } else {
+ paths.add(confPath);
+ }
+
executor.execute(new Runnable() {
public void run() {
@@ -208,12 +227,18 @@ public class JobHistory {
new FsPermission(HISTORY_FILE_PERMISSION));
}
}
-
- //purge the job from the cache
- fileManager.purgeJob(id);
} catch (Throwable e) {
LOG.error("Unable to move history file to DONE folder.", e);
}
+ String historyFileDonePath = null;
+ if (historyFile != null) { // reported even if the move above failed (the catch only logs)
+ historyFileDonePath = new Path(DONE,
+ historyFile.getName()).toString();
+ }
+ if (jobTracker != null) { // JobHistory.init() accepts a null tracker (e.g. TestJobHistoryParsing)
+ jobTracker.historyFileCopied(id, historyFileDonePath);
+ }
+ fileManager.purgeJob(id); // purge the job from the cache
}
});
@@ -261,8 +286,8 @@ public class JobHistory {
* @return true if intialized properly
* false otherwise
*/
- public static boolean init(JobConf conf, String hostname,
- long jobTrackerStartTime){
+ public static boolean init(JobTracker jobTracker, JobConf conf,
+ String hostname, long jobTrackerStartTime){
try {
LOG_DIR = conf.get("hadoop.job.history.location" ,
"file:///" + new File(
@@ -287,7 +312,7 @@ public class JobHistory {
jtConf = conf;
// initialize the file manager
- fileManager = new JobHistoryFilesManager(conf);
+ fileManager = new JobHistoryFilesManager(conf, jobTracker);
} catch(IOException e) {
LOG.error("Failed to initialize JobHistory log file", e);
disableHistory = true;
@@ -1043,27 +1068,7 @@ public class JobHistory {
* This *should* be the last call to jobhistory for a given job.
*/
static void markCompleted(JobID id) throws IOException {
- List<Path> paths = new ArrayList<Path>();
- Path path = fileManager.getHistoryFile(id);
- if (path == null) {
- LOG.info("No file for job-history with " + id + " found in cache!");
- return;
- } else {
- paths.add(path);
- }
-
- Path confPath = fileManager.getConfFileWriters(id);
- if (confPath == null) {
- LOG.info("No file for jobconf with " + id + " found in cache!");
- return;
- } else {
- paths.add(confPath);
- }
-
- //move the job files to done folder and purge the job
- if (paths.size() > 0) {
- fileManager.moveToDone(id, paths);
- }
+ fileManager.moveToDone(id);
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:26:56 2011
@@ -193,6 +193,8 @@ class JobInProgress {
private boolean hasSpeculativeReduces;
private long inputLength = 0;
private String user;
+ private volatile String historyFile = ""; // set via synchronized setter from the history thread; getter reads unlocked
+ private volatile boolean historyFileCopied; // gates retirement; read unlocked by the retire thread
// Per-job counters
public static enum Counter {
@@ -986,6 +988,22 @@ class JobInProgress {
}
}
+ String getHistoryFile() {
+ return historyFile;
+ }
+
+ synchronized void setHistoryFile(String file) {
+ this.historyFile = file;
+ }
+
+ boolean isHistoryFileCopied() {
+ return historyFileCopied;
+ }
+
+ synchronized void setHistoryFileCopied() {
+ this.historyFileCopied = true;
+ }
+
/**
* Returns the job-level counters.
*
@@ -2324,11 +2342,13 @@ class JobInProgress {
private synchronized void terminateJob(int jobTerminationState) {
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
+ this.finishTime = System.currentTimeMillis();
+ this.status.setMapProgress(1.0f);
+ this.status.setReduceProgress(1.0f);
+ this.status.setCleanupProgress(1.0f);
+
if (jobTerminationState == JobStatus.FAILED) {
- this.status = new JobStatus(status.getJobID(),
- 1.0f, 1.0f, 1.0f, JobStatus.FAILED,
- status.getJobPriority());
- this.finishTime = System.currentTimeMillis();
+ this.status.setRunState(JobStatus.FAILED);
// Log the job summary
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
@@ -2338,10 +2358,7 @@ class JobInProgress {
this.finishedMapTasks,
this.finishedReduceTasks);
} else {
- this.status = new JobStatus(status.getJobID(),
- 1.0f, 1.0f, 1.0f, JobStatus.KILLED,
- status.getJobPriority());
- this.finishTime = System.currentTimeMillis();
+ this.status.setRunState(JobStatus.KILLED);
// Log the job summary
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:26:56 2011
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -40,6 +41,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -170,7 +172,7 @@ public class JobTracker implements MRCon
* The minimum time (in ms) that a job's information has to remain
* in the JobTracker's memory before it is retired.
*/
- static final int MIN_TIME_BEFORE_RETIRE = 60000;
+ static final int MIN_TIME_BEFORE_RETIRE = 0;
private int nextJobId = 1;
@@ -413,13 +415,88 @@ public class JobTracker implements MRCon
}
+ synchronized void historyFileCopied(JobID jobid, String historyFile) {
+ JobInProgress job = getJob(jobid);
+ if (job != null) { //found in main cache
+ job.setHistoryFileCopied();
+ if (historyFile != null) {
+ job.setHistoryFile(historyFile);
+ }
+ return;
+ }
+ RetireJobInfo jobInfo = retireJobs.get(jobid);
+ if (jobInfo != null) { //found in retired cache
+ if (historyFile != null) {
+ jobInfo.setHistoryFile(historyFile);
+ }
+ }
+ }
+
+ static class RetireJobInfo { // snapshot of a retired job kept in the retired-jobs cache
+ final JobStatus status;
+ final JobProfile profile;
+ final long finishTime;
+ private volatile String historyFile; // updated by historyFileCopied() after retirement, read unlocked by the web UI
+ RetireJobInfo(JobStatus status, JobProfile profile, long finishTime,
+ String historyFile) {
+ this.status = status;
+ this.profile = profile;
+ this.finishTime = finishTime;
+ this.historyFile = historyFile;
+ }
+ void setHistoryFile(String file) {
+ this.historyFile = file;
+ }
+ String getHistoryFile() {
+ return historyFile;
+ }
+ }
///////////////////////////////////////////////////////
// Used to remove old finished Jobs that have been around for too long
///////////////////////////////////////////////////////
class RetireJobs implements Runnable {
+ private final Map<JobID, RetireJobInfo> jobIDStatusMap =
+ new HashMap<JobID, RetireJobInfo>();
+ private final LinkedList<RetireJobInfo> jobRetireInfoQ =
+ new LinkedList<RetireJobInfo>();
public RetireJobs() {
}
+ synchronized void addToCache(JobInProgress job) {
+ RetireJobInfo info = new RetireJobInfo(job.getStatus(),
+ job.getProfile(), job.getFinishTime(), job.getHistoryFile());
+ jobRetireInfoQ.add(info);
+ jobIDStatusMap.put(info.status.getJobID(), info);
+ if (jobRetireInfoQ.size() > retiredJobsCacheSize) {
+ RetireJobInfo removed = jobRetireInfoQ.remove();
+ jobIDStatusMap.remove(removed.status.getJobID());
+ LOG.info("Retired job removed from cache " + removed.status.getJobID());
+ }
+ }
+
+ synchronized RetireJobInfo get(JobID jobId) {
+ return jobIDStatusMap.get(jobId);
+ }
+
+ @SuppressWarnings("unchecked")
+ synchronized LinkedList<RetireJobInfo> getAll() {
+ return (LinkedList<RetireJobInfo>) jobRetireInfoQ.clone();
+ }
+
+ synchronized LinkedList<JobStatus> getAllJobStatus() {
+ LinkedList<JobStatus> list = new LinkedList<JobStatus>();
+ for (RetireJobInfo info : jobRetireInfoQ) {
+ list.add(info.status);
+ }
+ return list;
+ }
+
+ private boolean minConditionToRetire(JobInProgress job, long now) {
+ return job.getStatus().getRunState() != JobStatus.RUNNING &&
+ job.getStatus().getRunState() != JobStatus.PREP &&
+ (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
+ job.isHistoryFileCopied();
+ }
/**
* The run method lives for the life of the JobTracker,
* and removes Jobs that are not still running, but which
@@ -435,14 +512,35 @@ public class JobTracker implements MRCon
synchronized (jobs) {
for(JobInProgress job: jobs.values()) {
- if (job.getStatus().getRunState() != JobStatus.RUNNING &&
- job.getStatus().getRunState() != JobStatus.PREP &&
- (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
+ if (minConditionToRetire(job, now) &&
(job.getFinishTime() < retireBefore)) {
retiredJobs.add(job);
}
}
}
+ synchronized (userToJobsMap) {
+ Iterator<Map.Entry<String, ArrayList<JobInProgress>>> userIterator =
+ userToJobsMap.entrySet().iterator(); // explicit iterator: empty users are dropped mid-iteration
+ while (userIterator.hasNext()) {
+ ArrayList<JobInProgress> userJobs = userIterator.next().getValue();
+ Iterator<JobInProgress> it = userJobs.iterator();
+ while (it.hasNext() &&
+ userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) { // trim oldest-first down to the per-user limit
+ JobInProgress jobUser = it.next();
+ if (retiredJobs.contains(jobUser)) {
+ it.remove();
+ } else if (minConditionToRetire(jobUser, now)) {
+ LOG.info("User limit exceeded. Marking job: " +
+ jobUser.getJobID() + " for retire.");
+ retiredJobs.add(jobUser);
+ it.remove();
+ }
+ }
+ if (userJobs.isEmpty()) {
+ userIterator.remove(); // userToJobsMap.remove() here could throw ConcurrentModificationException
+ }
+ }
+ }
if (!retiredJobs.isEmpty()) {
synchronized (JobTracker.this) {
synchronized (jobs) {
@@ -454,22 +552,13 @@ public class JobTracker implements MRCon
l.jobRemoved(job);
}
String jobUser = job.getProfile().getUser();
- synchronized (userToJobsMap) {
- ArrayList<JobInProgress> userJobs =
- userToJobsMap.get(jobUser);
- synchronized (userJobs) {
- userJobs.remove(job);
- }
- if (userJobs.isEmpty()) {
- userToJobsMap.remove(jobUser);
- }
- }
LOG.info("Retired job with id: '" +
job.getProfile().getJobID() + "' of user '" +
jobUser + "'");
// clean up job files from the local disk
JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
+ addToCache(job);
}
}
}
@@ -1715,6 +1804,7 @@ public class JobTracker implements MRCon
Thread expireTrackersThread = null;
RetireJobs retireJobs = new RetireJobs();
Thread retireJobsThread = null;
+ final int retiredJobsCacheSize;
ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
"expireLaunchingTasks");
@@ -1795,6 +1885,8 @@ public class JobTracker implements MRCon
conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 *
60 * 1000);
RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
+ retiredJobsCacheSize =
+ conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum",
100);
MAX_BLACKLISTS_PER_TRACKER =
conf.getInt("mapred.max.tracker.blacklists", 4);
@@ -1864,7 +1956,7 @@ public class JobTracker implements MRCon
tmpInfoPort == 0, conf);
infoServer.setAttribute("job.tracker", this);
// initialize history parameters.
- boolean historyInitialized = JobHistory.init(conf, this.localMachine,
+ boolean historyInitialized = JobHistory.init(this, conf, this.localMachine,
this.startTime);
infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
@@ -2375,76 +2467,16 @@ public class JobTracker implements MRCon
}
}
}
-
- // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given
user
- // in memory; information about the purged jobs is available via
- // JobHistory.
- synchronized (jobs) {
- synchronized (taskScheduler) {
- synchronized (userToJobsMap) {
- String jobUser = job.getProfile().getUser();
- if (!userToJobsMap.containsKey(jobUser)) {
- userToJobsMap.put(jobUser,
- new ArrayList<JobInProgress>());
- }
- ArrayList<JobInProgress> userJobs =
- userToJobsMap.get(jobUser);
- synchronized (userJobs) {
- // Add the currently completed 'job'
- userJobs.add(job);
-
- // Check if we need to retire some jobs of this user
- while (userJobs.size() >
- MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
- JobInProgress rjob = userJobs.get(0);
-
- // Do not delete 'current'
- // finished job just yet.
- if (rjob == job) {
- break;
- }
- // do not retire jobs that finished in the very recent past.
- if (rjob.getFinishTime() + MIN_TIME_BEFORE_RETIRE > now) {
- break;
- }
-
- // Cleanup all datastructures
- int rjobRunState =
- rjob.getStatus().getRunState();
- if (rjobRunState == JobStatus.SUCCEEDED ||
- rjobRunState == JobStatus.FAILED ||
- rjobRunState == JobStatus.KILLED) {
- // Ok, this call to removeTaskEntries
- // is dangerous is some very very obscure
- // cases; e.g. when rjob completed, hit
- // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
- // limit and yet some task (taskid)
- // wasn't complete!
- removeJobTasks(rjob);
-
- userJobs.remove(0);
- jobs.remove(rjob.getProfile().getJobID());
- for (JobInProgressListener listener : jobInProgressListeners) {
- listener.jobRemoved(rjob);
- }
-
- LOG.info("Retired job with id: '" +
- rjob.getProfile().getJobID() + "' of user: '" +
- jobUser + "'");
- } else {
- // Do not remove jobs that aren't complete.
- // Stop here, and let the next pass take
- // care of purging jobs.
- break;
- }
- }
- }
- if (userJobs.isEmpty()) {
- userToJobsMap.remove(jobUser);
- }
- }
+ String jobUser = job.getProfile().getUser();
+ //add to the user to jobs mapping
+ synchronized (userToJobsMap) {
+ ArrayList<JobInProgress> userJobs = userToJobsMap.get(jobUser);
+ if (userJobs == null) {
+ userJobs = new ArrayList<JobInProgress>();
+ userToJobsMap.put(jobUser, userJobs);
}
+ userJobs.add(job);
}
}
@@ -3646,7 +3678,12 @@ public class JobTracker implements MRCon
JobInProgress job = jobs.get(jobid);
if (job != null) {
return job.getProfile();
- }
+ } else {
+ RetireJobInfo info = retireJobs.get(jobid);
+ if (info != null) {
+ return info.profile;
+ }
+ }
}
return completedJobStatusStore.readJobProfile(jobid);
}
@@ -3659,7 +3696,13 @@ public class JobTracker implements MRCon
JobInProgress job = jobs.get(jobid);
if (job != null) {
return job.getStatus();
- }
+ } else {
+
+ RetireJobInfo info = retireJobs.get(jobid);
+ if (info != null) {
+ return info.status;
+ }
+ }
}
return completedJobStatusStore.readJobStatus(jobid);
}
@@ -3798,19 +3841,19 @@ public class JobTracker implements MRCon
*/
public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)
throws IOException {
-
+ List<String> taskDiagnosticInfo = null;
JobID jobId = taskId.getJobID();
TaskID tipId = taskId.getTaskID();
JobInProgress job = jobs.get(jobId);
- if (job == null) {
- throw new IllegalArgumentException("Job " + jobId + " not found.");
- }
- TaskInProgress tip = job.getTaskInProgress(tipId);
- if (tip == null) {
- throw new IllegalArgumentException("TIP " + tipId + " not found.");
+ if (job != null) {
+ TaskInProgress tip = job.getTaskInProgress(tipId);
+ if (tip != null) {
+ taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
+ }
+
}
- List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
- return ((taskDiagnosticInfo == null) ? null
+
+ return ((taskDiagnosticInfo == null) ? new String[0]
: taskDiagnosticInfo.toArray(new String[0]));
}
@@ -3879,7 +3922,10 @@ public class JobTracker implements MRCon
}
public JobStatus[] getAllJobs() {
- return getJobStatus(jobs.values(),false);
+ List<JobStatus> list = new ArrayList<JobStatus>();
+ list.addAll(Arrays.asList(getJobStatus(jobs.values(),false)));
+ list.addAll(retireJobs.getAllJobStatus());
+ return list.toArray(new JobStatus[list.size()]);
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar 4 03:26:56 2011
@@ -797,7 +797,7 @@ public class TestJobHistory extends Test
JobConf conf = new JobConf();
// keep for less time
conf.setLong("mapred.jobtracker.retirejob.check", 1000);
- conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+ conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
//set the done folder location
String doneFolder = "history_done";
@@ -886,7 +886,7 @@ public class TestJobHistory extends Test
JobConf conf = new JobConf();
// keep for less time
conf.setLong("mapred.jobtracker.retirejob.check", 1000);
- conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+ conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
//set the done folder location
String doneFolder = TEST_ROOT_DIR + "history_done";
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java Fri Mar 4 03:26:56 2011
@@ -59,7 +59,7 @@ public class TestJobHistoryParsing exte
JobConf conf = new JobConf();
conf.set("hadoop.job.history.location", historyDir.toString());
FileSystem fs = FileSystem.getLocal(new JobConf());
- JobHistory.init(conf, "localhost", 1234);
+ JobHistory.init(null, conf, "localhost", 1234);
Path historyLog = new Path(historyDir, "testlog");
PrintWriter out = new PrintWriter(fs.create(historyLog));
historyWriter.add(out);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/analysejobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/analysejobhistory.jsp?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/analysejobhistory.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/analysejobhistory.jsp Fri Mar 4 03:26:56 2011
@@ -4,14 +4,12 @@
import="java.io.*"
import="java.util.*"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.fs.*"
import="org.apache.hadoop.util.*"
import="java.text.SimpleDateFormat"
import="org.apache.hadoop.mapred.JobHistory.*"
%>
-<jsp:include page="loadhistory.jsp">
- <jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
- <jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
-</jsp:include>
+
<%! private static SimpleDateFormat dateFormat
= new SimpleDateFormat("d/MM HH:mm:ss") ;
%>
@@ -25,7 +23,8 @@
if (numTasks != null) {
showTasks = Integer.parseInt(numTasks);
}
- JobInfo job = (JobInfo)request.getSession().getAttribute("job");
+ FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+ JobInfo job = JSPUtil.getJobInfo(request, fs);
%>
<h2>Hadoop Job <a href="jobdetailshistory.jsp?jobid=<%=jobid%>&&logFile=<%=encodedLogFileName%>"><%=jobid
%> </a></h2>
<b>User : </b> <%=job.get(Keys.USER) %><br/>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobconf_history.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobconf_history.jsp?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobconf_history.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobconf_history.jsp Fri Mar 4 03:26:56 2011
@@ -30,7 +30,7 @@
Path logDir = new Path(request.getParameter("jobLogDir"));
Path jobFilePath = new Path(logDir,
request.getParameter("jobUniqueString") + "_conf.xml");
- FileSystem fs = (FileSystem)request.getSession().getAttribute("fs");
+ FileSystem fs = (FileSystem) application.getAttribute("fileSys");
FSDataInputStream jobFile = null;
try {
jobFile = fs.open(jobFilePath);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetailshistory.jsp?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetailshistory.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetailshistory.jsp Fri Mar 4 03:26:56 2011
@@ -9,10 +9,7 @@
import="java.text.SimpleDateFormat"
import="org.apache.hadoop.mapred.JobHistory.*"
%>
-<jsp:include page="loadhistory.jsp">
- <jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
- <jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
-</jsp:include>
+
<%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ;
%>
<%
String jobid = request.getParameter("jobid");
@@ -23,8 +20,8 @@
String[] jobDetails = jobFile.getName().split("_");
String jobUniqueString = jobDetails[0] + "_" +jobDetails[1] + "_" + jobid ;
- JobInfo job = (JobInfo)request.getSession().getAttribute("job");
- FileSystem fs = (FileSystem)request.getSession().getAttribute("fs");
+ FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+ JobInfo job = JSPUtil.getJobInfo(request, fs);
%>
<html><body>
<h2>Hadoop Job <%=jobid %> on <a href="jobhistory.jsp">History Viewer</a></h2>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtaskshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtaskshistory.jsp?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtaskshistory.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtaskshistory.jsp Fri Mar 4 03:26:56 2011
@@ -4,14 +4,12 @@
import="java.io.*"
import="java.util.*"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.fs.*"
import="org.apache.hadoop.util.*"
import="java.text.SimpleDateFormat"
import="org.apache.hadoop.mapred.JobHistory.*"
%>
-<jsp:include page="loadhistory.jsp">
- <jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
- <jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
-</jsp:include>
+
<%!
private static SimpleDateFormat dateFormat =
new SimpleDateFormat("d/MM HH:mm:ss") ;
@@ -24,8 +22,8 @@
String taskStatus = request.getParameter("status");
String taskType = request.getParameter("taskType");
- JobHistory.JobInfo job = (JobHistory.JobInfo)request.
- getSession().getAttribute("job");
+ FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+ JobInfo job = JSPUtil.getJobInfo(request, fs);
Map<String, JobHistory.Task> tasks = job.getAllTasks();
%>
<html>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp Fri Mar 4 03:26:56 2011
@@ -79,8 +79,7 @@
<ul id="quicklinks-list">
<li><a href="#scheduling_info">Scheduling Info</a></li>
<li><a href="#running_jobs">Running Jobs</a></li>
- <li><a href="#completed_jobs">Completed Jobs</a></li>
- <li><a href="#failed_jobs">Failed Jobs</a></li>
+ <li><a href="#retired_jobs">Retired Jobs</a></li>
<li><a href="#local_logs">Local Logs</a></li>
</ul>
</div>
@@ -135,13 +134,27 @@ for(JobQueueInfo queue: queues) {
<%=JSPUtil.generateJobTable("Running", runningJobs, 30, 0)%>
<hr>
-<h2 id="completed_jobs">Completed Jobs</h2>
-<%=JSPUtil.generateJobTable("Completed", completedJobs, 0, runningJobs.size())%>
-<hr>
+<%
+if (completedJobs.size() > 0) {
+ out.print("<h2 id=\"completed_jobs\">Completed Jobs</h2>");
+ out.print(JSPUtil.generateJobTable("Completed", completedJobs, 0,
+ runningJobs.size()));
+ out.print("<hr>");
+}
+%>
+
+<%
+if (failedJobs.size() > 0) {
+ out.print("<h2 id=\"failed_jobs\">Failed Jobs</h2>");
+ out.print(JSPUtil.generateJobTable("Failed", failedJobs, 0,
+ (runningJobs.size()+completedJobs.size())));
+ out.print("<hr>");
+}
+%>
-<h2 id="failed_jobs">Failed Jobs</h2>
-<%=JSPUtil.generateJobTable("Failed", failedJobs, 0,
- (runningJobs.size()+completedJobs.size()))%>
+<h2 id="retired_jobs">Retired Jobs</h2>
+<%=JSPUtil.generateRetiredJobTable(tracker,
+ (runningJobs.size()+completedJobs.size()+failedJobs.size()))%>
<hr>
<h2 id="local_logs">Local Logs</h2>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetailshistory.jsp?rev=1076970&r1=1076969&r2=1076970&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetailshistory.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetailshistory.jsp Fri Mar 4 03:26:56 2011
@@ -4,14 +4,12 @@
import="java.io.*"
import="java.util.*"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.fs.*"
import="org.apache.hadoop.util.*"
import="java.text.SimpleDateFormat"
import="org.apache.hadoop.mapred.JobHistory.*"
%>
-<jsp:include page="loadhistory.jsp">
- <jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
- <jsp:param name="jobTrackerId" value="<%=request.getParameter("jobTrackerId") %>"/>
-</jsp:include>
+
<%! private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss")
; %>
<%
@@ -19,8 +17,8 @@
String logFile = request.getParameter("logFile");
String encodedLogFileName = JobHistory.JobInfo.encodeJobHistoryFilePath(logFile);
String taskid = request.getParameter("taskid");
- JobHistory.JobInfo job = (JobHistory.JobInfo)
- request.getSession().getAttribute("job");
+ FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+ JobInfo job = JSPUtil.getJobInfo(request, fs);
JobHistory.Task task = job.getAllTasks().get(taskid);
String type = task.get(Keys.TASK_TYPE);
%>
|