hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r543222 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskRunner.java
Date Thu, 31 May 2007 19:14:58 GMT
Author: cutting
Date: Thu May 31 12:14:57 2007
New Revision: 543222

URL: http://svn.apache.org/viewvc?view=rev&rev=543222
Log:
HADOOP-1332.  Fix so that TaskTracker exits reliably during unit tests on Windows.  Contributed
by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=543222&r1=543221&r2=543222
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May 31 12:14:57 2007
@@ -508,6 +508,9 @@
 129. HADOOP-1242.  Improve handling of DFS upgrades.
      (Konstantin Shvachko via cutting)
 
+130. HADOOP-1332.  Fix so that TaskTracker exits reliably during unit
+     tests on Windows.  (omalley via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=543222&r1=543221&r2=543222
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu May 31 12:14:57
2007
@@ -389,16 +389,19 @@
    */
   private void runChild(String[] args, File dir) throws IOException {
     this.process = Runtime.getRuntime().exec(args, null, dir);
+    
+    Thread logStdErrThread = null;
+    Thread logStdOutThread = null;
     try {
-      new Thread() {
-        public void run() {
-          // Copy stderr of the process
-          logStream(process.getErrorStream(), taskStdErrLogWriter); 
-        }
-      }.start();
-        
-      // Copy stderr of the process; normally empty
-      logStream(process.getInputStream(), taskStdOutLogWriter);		  
+      // Copy stderr of the child-process via a thread
+      logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"), 
+                                   process.getErrorStream(), 
+                                   taskStdErrLogWriter);
+      
+      // Copy stdout of the child-process via a thread
+      logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"), 
+                                  process.getInputStream(), 
+                                  taskStdOutLogWriter); 
       
       int exit_code = process.waitFor();
      
@@ -411,8 +414,21 @@
       throw new IOException(e.toString());
     } finally {
       kill();
-      taskStdOutLogWriter.close();
-      taskStdErrLogWriter.close();
+      
+      // Kill both stdout/stderr copying threads 
+      if (logStdErrThread != null) {
+        logStdErrThread.interrupt();
+        try {
+          logStdErrThread.join();
+        } catch (InterruptedException ie) {}
+      }
+      
+      if (logStdOutThread != null) {
+        logStdOutThread.interrupt();
+        try {
+          logStdOutThread.join();
+        } catch (InterruptedException ie) {}
+      }
     }
   }
 
@@ -427,24 +443,47 @@
   }
 
   /**
+   * Spawn a new thread to copy the child-jvm's stdout/stderr streams
+   * via a {@link TaskLog.Writer}
+   * 
+   * @param threadName thread name
+   * @param stream child-jvm's stdout/stderr stream
+   * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data
+   * @return Return the newly created thread
    */
-  private void logStream(InputStream output, TaskLog.Writer taskLog) {
-    try {
-      byte[] buf = new byte[512];
-      int n = 0;
-      while ((n = output.read(buf, 0, buf.length)) != -1) {
-        // Write out to the task's log
-        taskLog.write(buf, 0, n);
-      }
-    } catch (IOException e) {
-      LOG.warn(t.getTaskId()+" Error reading child output", e);
-    } finally {
-      try {
-        output.close();
-      } catch (IOException e) {
-        LOG.warn(t.getTaskId()+" Error closing child output", e);
+  private Thread logStream(String threadName, 
+                           final InputStream stream, 
+                           final TaskLog.Writer taskLog) {
+    Thread loggerThread = new Thread() {
+      public void run() {
+        try {
+          byte[] buf = new byte[512];
+          while (!Thread.interrupted()) {
+            while (stream.available() > 0) {
+              int n = stream.read(buf, 0, buf.length);
+              taskLog.write(buf, 0, n);
+            }
+            Thread.sleep(1000);
+          }
+        } catch (IOException e) {
+          LOG.warn(t.getTaskId()+" Error reading child output", e);
+        } catch (InterruptedException e) {
+          // expected
+        } finally {
+          try {
+            stream.close();
+            taskLog.close();
+          } catch (IOException e) {
+            LOG.warn(t.getTaskId()+" Error closing child output", e);
+          }
+        }
       }
-    }
+    };
+    loggerThread.setName(threadName);
+    loggerThread.setDaemon(true);
+    loggerThread.start();
+    
+    return loggerThread;
   }
   
 }



Mime
View raw message