Author: ddas
Date: Sun Mar 8 02:07:32 2009
New Revision: 751353
URL: http://svn.apache.org/viewvc?rev=751353&view=rev
Log:
Merge -r 746901:746902 and 746902:746903 from trunk onto 0.19 branch. Fixes HADOOP-5285.
Added:
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
- copied unchanged from r746903, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
Modified:
hadoop/core/branches/branch-0.19/ (props changed)
hadoop/core/branches/branch-0.19/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Propchange: hadoop/core/branches/branch-0.19/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Mar 8 02:07:32 2009
@@ -1 +1 @@
-/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,736426,738697,740077,741703,741762,743745,743892,745180
+/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,736426,738697,740077,741703,741762,743745,743892,745180,746902-746903
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Sun Mar 8 02:07:32 2009
@@ -42,6 +42,13 @@
to KILLED_UNCLEAN only for relevant type of tasks.
(Amareshwari Sriramadasu via yhemanth)
+ HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is
+ inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class
+ outside the TaskTracker and makes it a generic class that is used by the
+ JobTracker also for deleting the paths on the job's output fs. (3) Moves the
+ references to completedJobStore outside the block where the JobTracker is locked.
+ (ddas)
+
Release 0.19.1 - 2009-02-23
INCOMPATIBLE CHANGES
Propchange: hadoop/core/branches/branch-0.19/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Mar 8 02:07:32 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
-/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,735082,736426,738697,740077,741703,741762,743296,743745,743892,745180
+/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,735082,736426,738697,740077,741703,741762,743296,743745,743892,745180,746902-746903
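[Editor's note] CleanupQueue.java itself was copied unchanged from trunk r746903, so its contents do not appear in this diff. For context, here is a minimal sketch of what the generic class plausibly looks like, reconstructed only from the call sites below (new CleanupQueue(), addToQueue(conf, paths...)) and from the inner class removed from TaskTracker.java; everything beyond those call sites is an assumption, not the committed code:

  // Hypothetical reconstruction -- the actual CleanupQueue.java is not shown in this diff.
  package org.apache.hadoop.mapred;

  import java.io.IOException;
  import java.util.concurrent.LinkedBlockingQueue;

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  class CleanupQueue {
    private static final Log LOG = LogFactory.getLog(CleanupQueue.class);

    // Each queued deletion carries its own JobConf, so paths on different
    // filesystems (TaskTracker local dirs, the job's output/system fs used
    // by the JobTracker) can share one queue.
    private static class PathAndConf {
      final JobConf conf;
      final Path path;
      PathAndConf(JobConf conf, Path path) {
        this.conf = conf;
        this.path = path;
      }
    }

    private static final LinkedBlockingQueue<PathAndConf> queue =
        new LinkedBlockingQueue<PathAndConf>();

    // Single shared daemon thread for the whole JVM (assumption: the real
    // class shares one thread rather than starting one per instance).
    private static final Thread cleanupThread = new Thread("Directory/File cleanup thread") {
      public void run() {
        LOG.debug("cleanup thread started");
        while (true) {
          PathAndConf item = null;
          try {
            item = queue.take();
            // Resolve the filesystem from the per-path conf and delete recursively.
            FileSystem fs = item.path.getFileSystem(item.conf);
            fs.delete(item.path, true);
          } catch (IOException e) {
            LOG.info("Error deleting path " + (item == null ? null : item.path));
          } catch (InterruptedException ie) {
            // best-effort cleanup; keep the thread alive
          }
        }
      }
    };

    static {
      cleanupThread.setDaemon(true);
      cleanupThread.start();
    }

    public void addToQueue(JobConf conf, Path... paths) {
      for (Path p : paths) {
        try {
          queue.put(new PathAndConf(conf, p));
        } catch (InterruptedException ie) {}
      }
    }
  }

Pairing each queued path with its own JobConf is what lets the JobTracker reuse the queue for paths on the job's output fs (part 2 of the CHANGES.txt entry above) while the TaskTracker keeps using it for local directories.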
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Sun Mar 8 02:07:32 2009
@@ -931,35 +931,39 @@
/*
* Return task cleanup attempt if any, to run on a given tracker
*/
- public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts,
+ public Task obtainTaskCleanupTask(TaskTrackerStatus tts,
boolean isMapSlot)
throws IOException {
- if (this.status.getRunState() != JobStatus.RUNNING ||
- jobFailed || jobKilled) {
- return null;
- }
-
- String taskTracker = tts.getTrackerName();
- if (!shouldRunOnTaskTracker(taskTracker)) {
+ if (!tasksInited.get()) {
return null;
}
- TaskAttemptID taskid = null;
- TaskInProgress tip = null;
- if (isMapSlot) {
- if (!mapCleanupTasks.isEmpty()) {
- taskid = mapCleanupTasks.remove(0);
- tip = maps[taskid.getTaskID().getId()];
+ synchronized (this) {
+ if (this.status.getRunState() != JobStatus.RUNNING ||
+ jobFailed || jobKilled) {
+ return null;
}
- } else {
- if (!reduceCleanupTasks.isEmpty()) {
- taskid = reduceCleanupTasks.remove(0);
- tip = reduces[taskid.getTaskID().getId()];
+ String taskTracker = tts.getTrackerName();
+ if (!shouldRunOnTaskTracker(taskTracker)) {
+ return null;
}
+ TaskAttemptID taskid = null;
+ TaskInProgress tip = null;
+ if (isMapSlot) {
+ if (!mapCleanupTasks.isEmpty()) {
+ taskid = mapCleanupTasks.remove(0);
+ tip = maps[taskid.getTaskID().getId()];
+ }
+ } else {
+ if (!reduceCleanupTasks.isEmpty()) {
+ taskid = reduceCleanupTasks.remove(0);
+ tip = reduces[taskid.getTaskID().getId()];
+ }
+ }
+ if (tip != null) {
+ return tip.addRunningTask(taskid, taskTracker, true);
+ }
+ return null;
}
- if (tip != null) {
- return tip.addRunningTask(taskid, taskTracker, true);
- }
- return null;
}
/**
@@ -1018,9 +1022,6 @@
* @return true/false
*/
private synchronized boolean canLaunchJobCleanupTask() {
- if (!tasksInited.get()) {
- return false;
- }
// check if the job is running
if (status.getRunState() != JobStatus.RUNNING &&
status.getRunState() != JobStatus.PREP) {
@@ -2319,6 +2320,7 @@
*/
synchronized void garbageCollect() {
// Let the JobTracker know that a job is complete
+ jobtracker.storeCompletedJob(this);
jobtracker.finalizeJob(this);
try {
@@ -2342,8 +2344,7 @@
// Delete temp dfs dirs created if any, like in case of
// speculative exn of reduces.
Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
- FileSystem fs = tempDir.getFileSystem(conf);
- fs.delete(tempDir, true);
+ new CleanupQueue().addToQueue(conf,tempDir);
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
@@ -2354,6 +2355,7 @@
this.runningMapCache = null;
this.nonRunningReduces = null;
this.runningReduces = null;
+
}
/**
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Sun Mar 8 02:07:32 2009
@@ -1567,9 +1567,6 @@
synchronized void finalizeJob(JobInProgress job) {
// Mark the 'non-running' tasks for pruning
markCompletedJob(job);
-
- //persists the job info in DFS
- completedJobStatusStore.store(job);
JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
@@ -2444,34 +2441,41 @@
setJobPriority(jobid, newPriority);
}
- public synchronized JobProfile getJobProfile(JobID jobid) {
- JobInProgress job = jobs.get(jobid);
- if (job != null) {
- return job.getProfile();
- } else {
- return completedJobStatusStore.readJobProfile(jobid);
+ void storeCompletedJob(JobInProgress job) {
+ //persists the job info in DFS
+ completedJobStatusStore.store(job);
+ }
+
+ public JobProfile getJobProfile(JobID jobid) {
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getProfile();
+ }
}
+ return completedJobStatusStore.readJobProfile(jobid);
}
- public synchronized JobStatus getJobStatus(JobID jobid) {
+ public JobStatus getJobStatus(JobID jobid) {
if (null == jobid) {
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
return null;
}
-
- JobInProgress job = jobs.get(jobid);
- if (job != null) {
- return job.getStatus();
- } else {
- return completedJobStatusStore.readJobStatus(jobid);
- }
- }
- public synchronized Counters getJobCounters(JobID jobid) {
- JobInProgress job = jobs.get(jobid);
- if (job != null) {
- return job.getCounters();
- } else {
- return completedJobStatusStore.readCounters(jobid);
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getStatus();
+ }
+ }
+ return completedJobStatusStore.readJobStatus(jobid);
+ }
+ public Counters getJobCounters(JobID jobid) {
+ synchronized (this) {
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ return job.getCounters();
+ }
}
+ return completedJobStatusStore.readCounters(jobid);
}
public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
@@ -2569,18 +2573,17 @@
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
JobID jobid, int fromEventId, int maxEvents) throws IOException{
- TaskCompletionEvent[] events = EMPTY_EVENTS;
-
- JobInProgress job = this.jobs.get(jobid);
- if (null != job) {
- if (job.inited()) {
- events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+ synchronized (this) {
+ JobInProgress job = this.jobs.get(jobid);
+ if (null != job) {
+ if (job.inited()) {
+ return job.getTaskCompletionEvents(fromEventId, maxEvents);
+ } else {
+ return EMPTY_EVENTS;
+ }
}
}
- else {
- events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
- }
- return events;
+ return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
}
/**
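[Editor's note] Parts (1) and (3) of HADOOP-5285 follow the same pattern visible in the hunks above: hold the JobTracker (or JobInProgress) lock only for the in-memory lookup, and move the slow work, the DFS-backed completedJobStatusStore reads or waiting on an uninitialized job's monitor, outside it. A minimal, self-contained sketch of that shape, with hypothetical names (lookupCache, readFromSlowStore) standing in for jobs and completedJobStatusStore:

  import java.util.HashMap;
  import java.util.Map;

  // Illustrative only: mirrors the shape of the rewritten getJobStatus()/getJobProfile().
  class NarrowedLockExample {
    private final Map<String, String> lookupCache = new HashMap<String, String>();

    // Hypothetical stand-in for completedJobStatusStore.readJobStatus(jobid):
    // a slow read that must not run while this object's lock is held.
    private String readFromSlowStore(String key) {
      return "persisted:" + key;
    }

    public String get(String key) {
      synchronized (this) {                 // lock only around the in-memory map
        String cached = lookupCache.get(key);
        if (cached != null) {
          return cached;
        }
      }
      return readFromSlowStore(key);        // slow path runs unlocked
    }
  }

If the entry disappears between the locked lookup and the store read, the caller simply falls through to the persisted copy, which is exactly how the rewritten getJobProfile(), getJobStatus(), and getJobCounters() behave.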
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Mar 8 02:07:32 2009
@@ -915,9 +915,7 @@
private void startCleanupThreads() throws IOException {
taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
- directoryCleanupThread = new CleanupQueue(originalConf);
- directoryCleanupThread.setDaemon(true);
- directoryCleanupThread.start();
+ directoryCleanupThread = new CleanupQueue();
}
/**
@@ -1362,7 +1360,7 @@
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles){
- directoryCleanupThread.addToQueue(getLocalFiles(fConf,
+ directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf,
getLocalJobDir(rjob.getJobID().toString())));
}
// Remove this job
@@ -2409,17 +2407,20 @@
//might be using the dir. The JVM running the tasks would clean
//the workdir per a task in the task process itself.
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
taskDir));
}
else {
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
taskDir+"/job.xml"));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+ directoryCleanupThread.addToQueue(defaultJobConf,
+ getLocalFiles(defaultJobConf,
taskDir+"/work"));
}
}
@@ -2966,43 +2967,6 @@
return paths;
}
- // cleanup queue which deletes files/directories of the paths queued up.
- private static class CleanupQueue extends Thread {
- private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
- private JobConf conf;
-
- public CleanupQueue(JobConf conf) throws IOException{
- setName("Directory/File cleanup thread");
- setDaemon(true);
- this.conf = conf;
- }
-
- public void addToQueue(Path... paths) {
- for (Path p : paths) {
- try {
- queue.put(p);
- } catch (InterruptedException ie) {}
- }
- return;
- }
-
- public void run() {
- LOG.debug("cleanup thread started");
- Path path = null;
- while (true) {
- try {
- path = queue.take();
- // delete the path.
- FileSystem fs = path.getFileSystem(conf);
- fs.delete(path, true);
- } catch (IOException e) {
- LOG.info("Error deleting path" + path);
- } catch (InterruptedException t) {
- }
- }
- }
- }
-
int getMaxCurrentMapTasks() {
return maxCurrentMapTasks;
}