hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r669547 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Date Thu, 19 Jun 2008 16:37:51 GMT
Author: ddas
Date: Thu Jun 19 09:37:51 2008
New Revision: 669547

URL: http://svn.apache.org/viewvc?rev=669547&view=rev
Log:
HADOOP-3546. TaskTracker re-initialization gets stuck in cleaning up. Contributed by Amareshwari
Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=669547&r1=669546&r2=669547&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun 19 09:37:51 2008
@@ -628,6 +628,9 @@
     HADOOP-3534. Log IOExceptions that happen in closing the name
     system when the NameNode shuts down. (Tsz Wo (Nicholas) Sze via omalley)
 
+    HADOOP-3546. TaskTracker re-initialization gets stuck in cleaning up.
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=669547&r1=669546&r2=669547&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Jun 19 09:37:51
2008
@@ -297,9 +297,6 @@
         public void run() {
           while (true) {
             try {
-              if (tasksToCleanup.isEmpty() && !isRunning()) {
-                break;
-              }
               TaskTrackerAction action = tasksToCleanup.take();
               if (action instanceof KillJobAction) {
                 purgeJob((KillJobAction) action);
@@ -322,10 +319,6 @@
           }
         }
       }, "taskCleanup");
-  {
-    taskCleanupThread.setDaemon(true);
-    taskCleanupThread.start();
-  }
     
   private RunningJob addTaskToJob(JobID jobId, 
                                   Path localJobFile,
@@ -398,12 +391,9 @@
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
     }
  
-    directoryCleanupThread = new CleanupQueue(fConf);
-    directoryCleanupThread.start();
-
     //check local disk
     checkLocalDirs(this.fConf.getLocalDirs());
-    directoryCleanupThread.addToQueue(getLocalFiles(fConf, SUBDIR));
+    fConf.deleteLocalFiles(SUBDIR);
 
     // Clear out state tables
     this.tasks.clear();
@@ -458,7 +448,6 @@
                        InterTrackerProtocol.versionID, 
                        jobTrackAddr, this.fConf);
         
-    this.running = true;
     // start the thread that will fetch map task completion events
     this.mapEventsFetcher = new MapEventsFetcherThread();
     mapEventsFetcher.setDaemon(true);
@@ -801,28 +790,6 @@
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
     
-    // shutdown cleanup threads.
-    if (this.taskCleanupThread != null 
-         && this.taskCleanupThread.isAlive()) {
-      LOG.info("Stopping task cleanup thread");
-      this.taskCleanupThread.interrupt();
-      try {
-        this.taskCleanupThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-    if (this.directoryCleanupThread != null 
-         && this.directoryCleanupThread.isAlive()) {
-      LOG.info("Stopping directory cleanup thread");
-      this.directoryCleanupThread.interrupt();
-      try {
-        this.directoryCleanupThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
   }
@@ -867,6 +834,14 @@
     initialize();
   }
 
+  private void startCleanupThreads() throws IOException {
+    taskCleanupThread.setDaemon(true);
+    taskCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue(originalConf);
+    directoryCleanupThread.setDaemon(true);
+    directoryCleanupThread.start();
+  }
+  
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
@@ -1355,6 +1330,8 @@
    */
   public void run() {
     try {
+      startCleanupThreads();
+      this.running = true;
       boolean denied = false;
       while (running && !shuttingDown && !denied) {
         boolean staleState = false;
@@ -2347,14 +2324,6 @@
   }
 
   /**
-   * True if task tracker is not shutting down.
-   * @return running
-   */
-  public boolean isRunning() {
-    return !shuttingDown;
-  }
-  
-  /**
    * This class is used in TaskTracker's Jetty to serve the map outputs
    * to other nodes.
    */
@@ -2510,7 +2479,7 @@
   }
 
   // cleanup queue which deletes files/directories of the paths queued up.
-  private class CleanupQueue extends Thread {
+  private static class CleanupQueue extends Thread {
     private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
     private JobConf conf;
     
@@ -2534,9 +2503,6 @@
       Path path = null;
       while (true) {
         try {
-          if (queue.isEmpty() && !isRunning()) {
-            break;
-          }
           path = queue.take();
           // delete the path.
           FileSystem fs = path.getFileSystem(conf);



Mime
View raw message