hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r746909 - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/
Date Mon, 23 Feb 2009 07:36:57 GMT
Author: ddas
Date: Mon Feb 23 07:36:56 2009
New Revision: 746909

URL: http://svn.apache.org/viewvc?rev=746909&view=rev
Log:
Merge -r 746901:746902 and 746902:746903 from trunk onto 0.20 branch. Fixes HADOOP-5285.

Added:
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
      - copied unchanged from r746903, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 07:36:56 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Mon Feb 23 07:36:56 2009
@@ -638,6 +638,13 @@
     
     HADOOP-5292. Fix NPE in KFS::getBlockLocations. (Sriram Rao via lohit)    
 
+    HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is
+    inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class
+    outside the TaskTracker and makes it a generic class that is used by the 
+    JobTracker also for deleting the paths on the job's output fs. (3) Moves the
+    references to completedJobStore outside the block where the JobTracker is locked.
+    (ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 23 07:36:56 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Feb 23 07:36:56 2009
@@ -978,35 +978,39 @@
   /*
    * Return task cleanup attempt if any, to run on a given tracker
    */
-  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
                                                  boolean isMapSlot)
   throws IOException {
-    if (this.status.getRunState() != JobStatus.RUNNING || 
-        jobFailed || jobKilled) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!tasksInited.get()) {
       return null;
     }
-    TaskAttemptID taskid = null;
-    TaskInProgress tip = null;
-    if (isMapSlot) {
-      if (!mapCleanupTasks.isEmpty()) {
-        taskid = mapCleanupTasks.remove(0);
-        tip = maps[taskid.getTaskID().getId()];
+    synchronized (this) {
+      if (this.status.getRunState() != JobStatus.RUNNING || 
+          jobFailed || jobKilled) {
+        return null;
       }
-    } else {
-      if (!reduceCleanupTasks.isEmpty()) {
-        taskid = reduceCleanupTasks.remove(0);
-        tip = reduces[taskid.getTaskID().getId()];
+      String taskTracker = tts.getTrackerName();
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
       }
+      TaskAttemptID taskid = null;
+      TaskInProgress tip = null;
+      if (isMapSlot) {
+        if (!mapCleanupTasks.isEmpty()) {
+          taskid = mapCleanupTasks.remove(0);
+          tip = maps[taskid.getTaskID().getId()];
+        }
+      } else {
+        if (!reduceCleanupTasks.isEmpty()) {
+          taskid = reduceCleanupTasks.remove(0);
+          tip = reduces[taskid.getTaskID().getId()];
+        }
+      }
+      if (tip != null) {
+        return tip.addRunningTask(taskid, taskTracker, true);
+      }
+      return null;
     }
-    if (tip != null) {
-      return tip.addRunningTask(taskid, taskTracker, true);
-    }
-    return null;
   }
   
   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
@@ -1111,9 +1115,6 @@
    * @return true/false
    */
   private synchronized boolean canLaunchJobCleanupTask() {
-    if (!tasksInited.get()) {
-      return false;
-    }
     // check if the job is running
     if (status.getRunState() != JobStatus.RUNNING &&
         status.getRunState() != JobStatus.PREP) {
@@ -2444,6 +2445,7 @@
    */
   synchronized void garbageCollect() {
     // Let the JobTracker know that a job is complete
+    jobtracker.storeCompletedJob(this);
     jobtracker.finalizeJob(this);
       
     try {
@@ -2467,8 +2469,7 @@
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
-      FileSystem fs = tempDir.getFileSystem(conf);
-      fs.delete(tempDir, true); 
+      new CleanupQueue().addToQueue(conf,tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
@@ -2479,6 +2480,7 @@
     this.runningMapCache = null;
     this.nonRunningReduces = null;
     this.runningReduces = null;
+
   }
 
   /**

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Feb 23 07:36:56 2009
@@ -1823,9 +1823,6 @@
   synchronized void finalizeJob(JobInProgress job) {
     // Mark the 'non-running' tasks for pruning
     markCompletedJob(job);
-
-    //persists the job info in DFS
-    completedJobStatusStore.store(job);
     
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
@@ -2856,34 +2853,41 @@
     setJobPriority(jobid, newPriority);
   }
                            
-  public synchronized JobProfile getJobProfile(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getProfile();
-    } else {
-      return completedJobStatusStore.readJobProfile(jobid);
+  void storeCompletedJob(JobInProgress job) {
+    //persists the job info in DFS
+    completedJobStatusStore.store(job);
+  }
+
+  public JobProfile getJobProfile(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getProfile();
+      } 
     }
+    return completedJobStatusStore.readJobProfile(jobid);
   }
-  public synchronized JobStatus getJobStatus(JobID jobid) {
+  public JobStatus getJobStatus(JobID jobid) {
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       return null;
     }
-    
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getStatus();
-    } else {
-      return completedJobStatusStore.readJobStatus(jobid);
-    }
-  }
-  public synchronized Counters getJobCounters(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getCounters();
-    } else {
-      return completedJobStatusStore.readCounters(jobid);
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getStatus();
+      } 
+    }
+    return completedJobStatusStore.readJobStatus(jobid);
+  }
+  public Counters getJobCounters(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getCounters();
+      } 
     }
+    return completedJobStatusStore.readCounters(jobid);
   }
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
@@ -2981,18 +2985,17 @@
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events = EMPTY_EVENTS;
-
-    JobInProgress job = this.jobs.get(jobid);
-    if (null != job) {
-      if (job.inited()) {
-        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+    synchronized (this) {
+      JobInProgress job = this.jobs.get(jobid);
+      if (null != job) {
+        if (job.inited()) {
+          return job.getTaskCompletionEvents(fromEventId, maxEvents);
+        } else {
+          return EMPTY_EVENTS;
+        }
       }
     }
-    else {
-      events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
-    }
-    return events;
+    return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
   }
 
   /**

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=746909&r1=746908&r2=746909&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Feb 23 07:36:56 2009
@@ -972,9 +972,7 @@
   private void startCleanupThreads() throws IOException {
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
-    directoryCleanupThread = new CleanupQueue(originalConf);
-    directoryCleanupThread.setDaemon(true);
-    directoryCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue();
   }
   
   /**
@@ -1422,7 +1420,7 @@
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
+          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
             getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
@@ -2487,17 +2485,20 @@
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir));
             }  
             
             else {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                 taskDir+"/job.xml"));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir+"/work"));
             }  
           }
@@ -3026,43 +3027,6 @@
     return paths;
   }
 
-  // cleanup queue which deletes files/directories of the paths queued up.
-  private static class CleanupQueue extends Thread {
-    private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
-    private JobConf conf;
-    
-    public CleanupQueue(JobConf conf) throws IOException{
-      setName("Directory/File cleanup thread");
-      setDaemon(true);
-      this.conf = conf;
-    }
-
-    public void addToQueue(Path... paths) {
-      for (Path p : paths) {
-        try {
-          queue.put(p);
-        } catch (InterruptedException ie) {}
-      }
-      return;
-    }
-
-    public void run() {
-      LOG.debug("cleanup thread started");
-      Path path = null;
-      while (true) {
-        try {
-          path = queue.take();
-          // delete the path.
-          FileSystem fs = path.getFileSystem(conf);
-          fs.delete(path, true);
-        } catch (IOException e) {
-          LOG.info("Error deleting path" + path);
-        } catch (InterruptedException t) {
-        }
-      }
-    }
-  }
-
   int getMaxCurrentMapTasks() {
     return maxCurrentMapTasks;
   }



Mime
View raw message