hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r751353 - in /hadoop/core/branches/branch-0.19: ./ src/mapred/org/apache/hadoop/mapred/
Date: Sun, 08 Mar 2009 02:07:32 GMT
Author: ddas
Date: Sun Mar  8 02:07:32 2009
New Revision: 751353

URL: http://svn.apache.org/viewvc?rev=751353&view=rev
Log:
Merge -r 746901:746902 and 746902:746903 from trunk onto 0.19 branch. Fixes HADOOP-5285.
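
The new src/mapred/org/apache/hadoop/mapred/CleanupQueue.java is copied from trunk and its contents do not appear in this message; the sketch below only illustrates the refactoring the log describes, reconstructed from the inner CleanupQueue class removed from TaskTracker.java (last diff below) and from the new addToQueue(conf, path) call sites. The class name CleanupQueueSketch and the PathAndConf holder are invented for the example and are not the actual implementation.

import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class CleanupQueueSketch {
  private static final Log LOG = LogFactory.getLog(CleanupQueueSketch.class);

  // Each queued entry carries its own configuration, so a single deletion
  // thread can serve paths on different file systems (local dirs on a
  // TaskTracker, the job's output fs on the JobTracker).
  private static class PathAndConf {
    final Configuration conf;
    final Path path;
    PathAndConf(Configuration conf, Path path) {
      this.conf = conf;
      this.path = path;
    }
  }

  private final LinkedBlockingQueue<PathAndConf> queue =
      new LinkedBlockingQueue<PathAndConf>();

  CleanupQueueSketch() {
    Thread cleaner = new Thread(new Runnable() {
      public void run() {
        while (true) {
          PathAndConf pc = null;
          try {
            pc = queue.take();
            FileSystem fs = pc.path.getFileSystem(pc.conf); // resolve fs per path
            fs.delete(pc.path, true);                       // recursive delete
          } catch (InterruptedException ie) {
            return;
          } catch (Exception e) {
            LOG.warn("Error deleting path " + (pc == null ? null : pc.path), e);
          }
        }
      }
    }, "Directory/File cleanup thread");
    cleaner.setDaemon(true);
    cleaner.start();
  }

  // Same call shape as the new call sites in this commit: addToQueue(conf, path...).
  void addToQueue(Configuration conf, Path... paths) {
    for (Path p : paths) {
      try {
        queue.put(new PathAndConf(conf, p));
      } catch (InterruptedException ie) {
        // ignored, matching the behaviour of the old inner class
      }
    }
  }
}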

Added:
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
      - copied unchanged from r746903, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
Modified:
    hadoop/core/branches/branch-0.19/   (props changed)
    hadoop/core/branches/branch-0.19/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Propchange: hadoop/core/branches/branch-0.19/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Mar  8 02:07:32 2009
@@ -1 +1 @@
-/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,736426,738697,740077,741703,741762,743745,743892,745180
+/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,736426,738697,740077,741703,741762,743745,743892,745180,746902-746903

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Sun Mar  8 02:07:32 2009
@@ -42,6 +42,13 @@
     to KILLED_UNCLEAN only for relevant type of tasks.
     (Amareshwari Sriramadasu via yhemanth)
 
+    HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is
+    inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class
+    outside the TaskTracker and makes it a generic class that is used by the 
+    JobTracker also for deleting the paths on the job's output fs. (3) Moves the
+    references to completedJobStore outside the block where the JobTracker is locked.
+    (ddas)
+
 Release 0.19.1 - 2009-02-23
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/core/branches/branch-0.19/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Mar  8 02:07:32 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
-/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,735082,736426,738697,740077,741703,741762,743296,743745,743892,745180
+/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,735082,736426,738697,740077,741703,741762,743296,743745,743892,745180,746902-746903

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Sun Mar  8 02:07:32 2009
@@ -931,35 +931,39 @@
   /*
    * Return task cleanup attempt if any, to run on a given tracker
    */
-  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
                                                  boolean isMapSlot)
   throws IOException {
-    if (this.status.getRunState() != JobStatus.RUNNING || 
-        jobFailed || jobKilled) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!tasksInited.get()) {
       return null;
     }
-    TaskAttemptID taskid = null;
-    TaskInProgress tip = null;
-    if (isMapSlot) {
-      if (!mapCleanupTasks.isEmpty()) {
-        taskid = mapCleanupTasks.remove(0);
-        tip = maps[taskid.getTaskID().getId()];
+    synchronized (this) {
+      if (this.status.getRunState() != JobStatus.RUNNING || 
+          jobFailed || jobKilled) {
+        return null;
       }
-    } else {
-      if (!reduceCleanupTasks.isEmpty()) {
-        taskid = reduceCleanupTasks.remove(0);
-        tip = reduces[taskid.getTaskID().getId()];
+      String taskTracker = tts.getTrackerName();
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
       }
+      TaskAttemptID taskid = null;
+      TaskInProgress tip = null;
+      if (isMapSlot) {
+        if (!mapCleanupTasks.isEmpty()) {
+          taskid = mapCleanupTasks.remove(0);
+          tip = maps[taskid.getTaskID().getId()];
+        }
+      } else {
+        if (!reduceCleanupTasks.isEmpty()) {
+          taskid = reduceCleanupTasks.remove(0);
+          tip = reduces[taskid.getTaskID().getId()];
+        }
+      }
+      if (tip != null) {
+        return tip.addRunningTask(taskid, taskTracker, true);
+      }
+      return null;
     }
-    if (tip != null) {
-      return tip.addRunningTask(taskid, taskTracker, true);
-    }
-    return null;
   }
   
   /**
@@ -1018,9 +1022,6 @@
    * @return true/false
    */
   private synchronized boolean canLaunchJobCleanupTask() {
-    if (!tasksInited.get()) {
-      return false;
-    }
     // check if the job is running
     if (status.getRunState() != JobStatus.RUNNING &&
         status.getRunState() != JobStatus.PREP) {
@@ -2319,6 +2320,7 @@
    */
   synchronized void garbageCollect() {
     // Let the JobTracker know that a job is complete
+    jobtracker.storeCompletedJob(this);
     jobtracker.finalizeJob(this);
       
     try {
@@ -2342,8 +2344,7 @@
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
-      FileSystem fs = tempDir.getFileSystem(conf);
-      fs.delete(tempDir, true); 
+      new CleanupQueue().addToQueue(conf,tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
@@ -2354,6 +2355,7 @@
     this.runningMapCache = null;
     this.nonRunningReduces = null;
     this.runningReduces = null;
+
   }
 
   /**
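
The JobInProgress change above is point (1) of the log message: obtainTaskCleanupTask tests the tasksInited flag before entering a synchronized block, so a caller (typically a heartbeat-handling thread asking for a cleanup task) no longer has to acquire the job's monitor just to find out that the job is not yet initialized. A minimal, self-contained illustration of that check-then-lock ordering follows; the Job class and obtainCleanupWork method are invented for the example and are not Hadoop APIs.

import java.util.concurrent.atomic.AtomicBoolean;

class Job {
  private final AtomicBoolean tasksInited = new AtomicBoolean(false);

  void initTasks() {
    synchronized (this) {
      // potentially slow setup performed while holding the job lock
      // (in JobInProgress this is where job initialization happens)
      tasksInited.set(true);
    }
  }

  String obtainCleanupWork() {
    if (!tasksInited.get()) {   // cheap flag check, no monitor acquired
      return null;
    }
    synchronized (this) {       // lock only once the job is known to be initialized
      // ... pick and return a queued cleanup task here, or null if none
      return null;
    }
  }
}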

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Sun Mar  8 02:07:32 2009
@@ -1567,9 +1567,6 @@
   synchronized void finalizeJob(JobInProgress job) {
     // Mark the 'non-running' tasks for pruning
     markCompletedJob(job);
-
-    //persists the job info in DFS
-    completedJobStatusStore.store(job);
     
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
@@ -2444,34 +2441,41 @@
     setJobPriority(jobid, newPriority);
   }
                            
-  public synchronized JobProfile getJobProfile(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getProfile();
-    } else {
-      return completedJobStatusStore.readJobProfile(jobid);
+  void storeCompletedJob(JobInProgress job) {
+    //persists the job info in DFS
+    completedJobStatusStore.store(job);
+  }
+
+  public JobProfile getJobProfile(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getProfile();
+      } 
     }
+    return completedJobStatusStore.readJobProfile(jobid);
   }
-  public synchronized JobStatus getJobStatus(JobID jobid) {
+  public JobStatus getJobStatus(JobID jobid) {
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       return null;
     }
-    
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getStatus();
-    } else {
-      return completedJobStatusStore.readJobStatus(jobid);
-    }
-  }
-  public synchronized Counters getJobCounters(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getCounters();
-    } else {
-      return completedJobStatusStore.readCounters(jobid);
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getStatus();
+      } 
+    }
+    return completedJobStatusStore.readJobStatus(jobid);
+  }
+  public Counters getJobCounters(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getCounters();
+      } 
     }
+    return completedJobStatusStore.readCounters(jobid);
   }
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
@@ -2569,18 +2573,17 @@
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events = EMPTY_EVENTS;
-
-    JobInProgress job = this.jobs.get(jobid);
-    if (null != job) {
-      if (job.inited()) {
-        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+    synchronized (this) {
+      JobInProgress job = this.jobs.get(jobid);
+      if (null != job) {
+        if (job.inited()) {
+          return job.getTaskCompletionEvents(fromEventId, maxEvents);
+        } else {
+          return EMPTY_EVENTS;
+        }
       }
     }
-    else {
-      events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
-    }
-    return events;
+    return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
   }
 
   /**
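
The JobTracker changes above are point (3) of the log message: getJobProfile, getJobStatus, getJobCounters and getTaskCompletionEvents now look the job up in the in-memory map inside a synchronized (this) block and fall back to completedJobStatusStore only after the lock is released, while the store(job) call moves out of the synchronized finalizeJob into the new storeCompletedJob, invoked from JobInProgress.garbageCollect. A minimal sketch of that lock-narrowing pattern follows; Tracker and Store are invented stand-ins for the JobTracker and the DFS-backed completed-job store, not Hadoop classes.

import java.util.HashMap;
import java.util.Map;

class Tracker {
  // Stand-in for completedJobStatusStore: a potentially slow, DFS-backed read.
  interface Store {
    String readStatus(String jobId);
  }

  private final Map<String, String> liveJobs = new HashMap<String, String>();
  private final Store store;

  Tracker(Store store) {
    this.store = store;
  }

  String getJobStatus(String jobId) {
    synchronized (this) {             // fast path: in-memory lookup under the lock
      String status = liveJobs.get(jobId);
      if (status != null) {
        return status;
      }
    }                                 // lock released before any slow I/O
    return store.readStatus(jobId);   // slow path runs without holding the tracker lock
  }
}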

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=751353&r1=751352&r2=751353&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Mar  8 02:07:32 2009
@@ -915,9 +915,7 @@
   private void startCleanupThreads() throws IOException {
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
-    directoryCleanupThread = new CleanupQueue(originalConf);
-    directoryCleanupThread.setDaemon(true);
-    directoryCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue();
   }
   
   /**
@@ -1362,7 +1360,7 @@
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
+          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
             getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
@@ -2409,17 +2407,20 @@
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir));
             }  
             
             else {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                 taskDir+"/job.xml"));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir+"/work"));
             }  
           }
@@ -2966,43 +2967,6 @@
     return paths;
   }
 
-  // cleanup queue which deletes files/directories of the paths queued up.
-  private static class CleanupQueue extends Thread {
-    private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
-    private JobConf conf;
-    
-    public CleanupQueue(JobConf conf) throws IOException{
-      setName("Directory/File cleanup thread");
-      setDaemon(true);
-      this.conf = conf;
-    }
-
-    public void addToQueue(Path... paths) {
-      for (Path p : paths) {
-        try {
-          queue.put(p);
-        } catch (InterruptedException ie) {}
-      }
-      return;
-    }
-
-    public void run() {
-      LOG.debug("cleanup thread started");
-      Path path = null;
-      while (true) {
-        try {
-          path = queue.take();
-          // delete the path.
-          FileSystem fs = path.getFileSystem(conf);
-          fs.delete(path, true);
-        } catch (IOException e) {
-          LOG.info("Error deleting path" + path);
-        } catch (InterruptedException t) {
-        }
-      }
-    }
-  }
-
   int getMaxCurrentMapTasks() {
     return maxCurrentMapTasks;
   }


