hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r588585 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/examples/pipes/conf/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/pipes/ src...
Date: Fri, 26 Oct 2007 09:42:30 GMT
Author: ddas
Date: Fri Oct 26 02:42:28 2007
New Revision: 588585

URL: http://svn.apache.org/viewvc?rev=588585&view=rev
Log:
HADOOP-1857.  Ability to run a script when a task fails to capture stack traces. Contributed
by Amareshwari Sri Ramadasu
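
For reference, a minimal sketch of how a job could use the API added in this patch. It follows the javadoc example introduced below; the DFS path /debug/scripts/myscript is hypothetical, and the URI form of DistributedCache.addCacheFile is used (the javadoc below writes the same step more tersely).

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class DebugScriptSetupSketch {
  public static void configure(JobConf job) throws Exception {
    // Ship the script through the DistributedCache and symlink it into the
    // task's working directory as "myscript".
    DistributedCache.createSymlink(job);
    DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
    // Run it when a map or reduce task fails; it is invoked with the task's
    // stdout, stderr, syslog and jobconf files as arguments.
    job.setMapDebugScript("./myscript");
    job.setReduceDebugScript("./myscript");
  }
}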

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/examples/pipes/conf/word.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Oct 26 02:42:28 2007
@@ -28,6 +28,9 @@
     jobdetails.jsp to help quickly narrow down and see categorized TIPs' 
     details via jobtasks.jsp. (Amar Kamat via acmurthy)
 
+    HADOOP-1857.  Ability to run a script when a task fails to capture stack
+    traces. (Amareshwari Sri Ramadasu via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Fri Oct 26 02:42:28 2007
@@ -64,6 +64,7 @@
   <property name="test.build.resources" value="${test.build.dir}/resources"/>
   <property name="test.build.data" value="${test.build.dir}/data"/>
   <property name="test.cache.data" value="${test.build.dir}/cache"/>
+  <property name="test.debug.data" value="${test.build.dir}/debug"/>
   <property name="test.log.dir" value="${test.build.dir}/logs"/>
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.testjar" value="${test.build.dir}/testjar"/>
@@ -471,10 +472,12 @@
     <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/dfs/hadoop-12-dfs-dir.tgz" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/org/apache/hadoop/dfs/hadoop-12-dfs-dir.txt" todir="${test.cache.data}"/>
+    <delete dir="${test.debug.data}"/>
+    <mkdir dir="${test.debug.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/testscript.txt" todir="${test.debug.data}"/>
     <copy todir="${test.build.resources}">
       <fileset dir="${test.resources.dir}"/>
     </copy>
-    
   </target>
 
   <!-- ================================================================== -->
@@ -492,6 +495,7 @@
       <sysproperty key="test.build.data" value="${test.build.data}"/>
       <sysproperty key="test.build.resources" value="${test.build.resources}"/>
       <sysproperty key="test.cache.data" value="${test.cache.data}"/>    	
+      <sysproperty key="test.debug.data" value="${test.debug.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.src.dir" value="${test.src.dir}"/>
       <sysproperty key="java.library.path"
@@ -735,6 +739,9 @@
           <include name="*/bin/*" />
         </fileset>
         <fileset dir="${dist.dir}/src/contrib/ec2/bin/image"/>
+    </chmod>
+    <chmod perm="ugo+x" type="file">
+        <fileset dir="${dist.dir}/src/c++/pipes/debug"/>
     </chmod>
 
   </target>

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Oct 26 02:42:28 2007
@@ -238,6 +238,8 @@
       numReduceTasksSpec_ = (String)cmdLine.getValue("-numReduceTasks"); 
       partitionerSpec_ = (String)cmdLine.getValue("-partitioner");
       inReaderSpec_ = (String)cmdLine.getValue("-inputreader"); 
+      mapDebugSpec_ = (String)cmdLine.getValue("-mapdebug");    
+      reduceDebugSpec_ = (String)cmdLine.getValue("-reducedebug");
       
       List<String> car = cmdLine.getValues("-cacheArchive"); 
       if (null != car){
@@ -395,6 +397,10 @@
         "Optional.", "spec",1, false );
     Option inputreader = createOption("inputreader", 
                                       "Optional.", "spec", 1, false);
+    Option mapDebug = createOption("mapdebug",
+                                   "Optional.", "spec", 1, false);
+    Option reduceDebug = createOption("reducedebug",
+                                      "Optional.", "spec", 1, false);
     Option cacheFile = createOption("cacheFile", 
                                     "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
     Option cacheArchive = createOption("cacheArchive", 
@@ -423,6 +429,8 @@
       withOption(partitioner).
       withOption(numReduceTasks).
       withOption(inputreader).
+      withOption(mapDebug).
+      withOption(reduceDebug).
       withOption(jobconf).
       withOption(cmdenv).
       withOption(cacheFile).
@@ -460,6 +468,10 @@
       System.out.println("  -inputreader <spec>  Optional.");
       System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a
JobConf property");
       System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming
commands");
+      System.out.println("  -mapdebug <path>  Optional. " +
+                                "Script to run when a map task fails ");
+      System.out.println("  -reducedebug <path>  Optional. " +
+                             "Script to run when a reduce task fails ");
       System.out.println("  -cacheFile fileNameURI");
       System.out.println("  -cacheArchive fileNameURI");
       System.out.println("  -verbose");
@@ -801,6 +813,12 @@
       jobConf_.setNumReduceTasks(0);
     }
     
+    if(mapDebugSpec_ != null){
+    	jobConf_.setMapDebugScript(mapDebugSpec_);
+    }
+    if(reduceDebugSpec_ != null){
+    	jobConf_.setReduceDebugScript(reduceDebugSpec_);
+    }
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
 
@@ -1023,6 +1041,8 @@
   protected String partitionerSpec_;
   protected String numReduceTasksSpec_;
   protected String additionalConfSpec_;
+  protected String mapDebugSpec_;
+  protected String reduceDebugSpec_;
 
   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
   // encoding "a=b c=d"

Modified: lucene/hadoop/trunk/src/examples/pipes/conf/word.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/pipes/conf/word.xml?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/pipes/conf/word.xml (original)
+++ lucene/hadoop/trunk/src/examples/pipes/conf/word.xml Fri Oct 26 02:42:28 2007
@@ -8,7 +8,11 @@
 
 <property>
   <name>hadoop.pipes.executable</name>
-  <value>/examples/bin/wordcount-simple</value>
+  <value>/examples/bin/wordcount-simple#wordcount-simple</value>
+  <description> Executable path is given as <path>#<executable-name>
+                so that the executable will have a symlink in the working directory.
+                This can be used for gdb debugging etc.
+  </description>
 </property>
 
 <property>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Fri Oct 26 02:42:28 2007
@@ -52,8 +52,9 @@
  * data/text files and/or more complex types such as archives, jars etc. 
  * Archives (zip files) are un-archived at the slave nodes. Jars maybe be 
  * optionally added to the classpath of the tasks, a rudimentary software
- * distribution mechanism. Optionally users can also direct it to symlink the 
- * distributed cache file(s) into the working directory of the task.</p>
+ * distribution mechanism.  Files have execution permissions. Optionally users 
+ * can also direct it to symlink the distributed cache file(s) into 
+ * the working directory of the task.</p>
  * 
 * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
  * files. Clearly the cache files should not be modified by the application 
@@ -238,6 +239,7 @@
   }
 
   // the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
   private static Path localizeCache(Configuration conf, 
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
@@ -297,6 +299,13 @@
         }
         // else will not do anyhting
         // and copy the file into the dir as it is
+      }
+      
+      // do chmod here 
+      try {
+    	FileUtil.chmod(parchive.toString(), "+x");
+      } catch(InterruptedException e) {
+    	LOG.warn("Exception in chmod" + e.toString());
       }
 
       // update cacheStatus to reflect the newly cached file

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Oct 26 02:42:28 2007
@@ -83,7 +83,10 @@
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
 * of the job such as <code>Comparator</code>s to be used, files to be put in 
  * the {@link DistributedCache}, whether or not intermediate and/or job outputs 
- * are to be compressed (and how) etc.</p>
+ * are to be compressed (and how), and debuggability via user-provided scripts
+ * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
+ * for post-processing the task's logs (stdout, stderr, syslog), etc.</p>
  * 
  * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
  * <p><blockquote><pre>
@@ -1222,6 +1225,80 @@
     return JobPriority.valueOf(prio);
   }
   
+  /**
+   * Set the debug script to run when the map tasks fail.
+   * 
+   * <p>The debug script can aid debugging of failed map tasks. The script is 
+   * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
+   * 
+   * <p>The debug command, run on the node where the map failed, is:</p>
+   * <p><pre><blockquote> 
+   * $script $stdout $stderr $syslog $jobconf.
+   * </blockquote></pre></p>
+   * 
+   * <p> The script file is distributed through {@link DistributedCache} 
+   * APIs. The script needs to be symlinked. </p>
+   * 
+   * <p>Here is an example on how to submit a script 
+   * <p><blockquote><pre>
+   * job.setMapDebugScript("./myscript");
+   * DistributedCache.createSymlink(job);
+   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
+   * </pre></blockquote></p>
+   * 
+   * @param mDbgScript the script name
+   */
+  public void  setMapDebugScript(String mDbgScript) {
+    set("mapred.map.task.debug.script", mDbgScript);
+  }
+  
+  /**
+   * Get the map task's debug script.
+   * 
+   * @return the debug Script for the mapred job for failed map tasks.
+   * @see #setMapDebugScript(String)
+   */
+  public String getMapDebugScript() {
+    return get("mapred.map.task.debug.script");
+  }
+  
+  /**
+   * Set the debug script to run when the reduce tasks fail.
+   * 
+   * <p>The debug script can aid debugging of failed reduce tasks. The script
+   * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
+   * 
+   * <p>The debug command, run on the node where the reduce failed, is:</p>
+   * <p><pre><blockquote> 
+   * $script $stdout $stderr $syslog $jobconf.
+   * </blockquote></pre></p>
+   * 
+   * <p> The script file is distributed through {@link DistributedCache} 
+   * APIs. The script file needs to be symlinked </p>
+   * 
+   * <p>Here is an example on how to submit a script 
+   * <p><blockquote><pre>
+   * job.setReduceDebugScript("./myscript");
+   * DistributedCache.createSymlink(job);
+   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
+   * </pre></blockquote></p>
+   * 
+   * @param rDbgScript the script name
+   */
+  public void  setReduceDebugScript(String rDbgScript) {
+    set("mapred.reduce.task.debug.script", rDbgScript);
+  }
+  
+  /**
+   * Get the reduce task's debug Script
+   * 
+   * @return the debug script for the mapred job for failed reduce tasks.
+   * @see #setReduceDebugScript(String)
+   */
+  public String getReduceDebugScript() {
+    return get("mapred.reduce.task.debug.script");
+  }
+
   /**
    * Get the uri to be invoked in-order to send a notification after the job 
    * has completed (success/failure). 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Fri Oct 26 02:42:28 2007
@@ -59,8 +59,11 @@
     STDERR ("stderr"),
     
     /** Log on the map-reduce system logs of the task. */
-    SYSLOG ("syslog");
+    SYSLOG ("syslog"),
     
+    /** Log the debug script's stdout  */
+    DEBUGOUT ("debugout");
+        
     private String prefix;
     
     private LogName(String prefix) {
@@ -248,4 +251,43 @@
     result.add(mergedCmd.toString());
     return result;
   }
+  
+  /**
+   * Wrap a command in a shell to capture debug script's 
+   * stdout and stderr to debugout.
+   * @param cmd The command and the arguments that should be run
+   * @param debugoutFilename The filename that stdout and stderr
+   *  should be saved to.
+   * @return the modified command that should be run
+   * @throws IOException
+   */
+  public static List<String> captureDebugOut(List<String> cmd, 
+                                             File debugoutFilename
+                                            ) throws IOException {
+    String debugout = FileUtil.makeShellPath(debugoutFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    StringBuffer mergedCmd = new StringBuffer();
+    mergedCmd.append("exec ");
+    boolean isExecutable = true;
+    for(String s: cmd) {
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the  
+        // shell to find it.
+        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false; 
+      } else {
+        mergedCmd.append(s);
+      }
+      mergedCmd.append(" ");
+    }
+    mergedCmd.append(" < /dev/null ");
+    mergedCmd.append(" >");
+    mergedCmd.append(debugout);
+    mergedCmd.append(" 2>&1 ");
+    result.add(mergedCmd.toString());
+    return result;
+  }
+  
 } // TaskLog
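
As a rough illustration of captureDebugOut above, here is the kind of shell wrapper it builds for a hypothetical debug command (a sketch; it assumes TaskLog is accessible from your code, and the exact paths depend on FileUtil.makeShellPath and the local filesystem).

import java.io.File;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.mapred.TaskLog;

public class CaptureDebugOutSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical debug command: script followed by stdout/stderr/syslog/jobconf.
    List<String> cmd = Arrays.asList("./myscript", "stdout", "stderr",
                                     "syslog", "job.xml");
    List<String> wrapped = TaskLog.captureDebugOut(cmd, new File("debugout"));
    // wrapped is roughly:
    //   [ "bash", "-c",
    //     "exec <shell path of ./myscript> stdout stderr syslog job.xml " +
    //     " < /dev/null  ><shell path of debugout> 2>&1 " ]
    System.out.println(wrapped);
  }
}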

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Fri Oct 26 02:42:28 2007
@@ -58,11 +58,19 @@
         out.write("</pre></td></tr></table><hr><br>\n".getBytes());
       }
     } catch (IOException ioe) {
-      response.sendError(HttpServletResponse.SC_GONE,
+      if (filter == TaskLog.LogName.DEBUGOUT) {
+        if (!plainText) {
+           out.write("</pre></td></tr></table><hr><br>\n".getBytes());
+         }
+        // do nothing
+      }
+      else {
+        response.sendError(HttpServletResponse.SC_GONE,
                          "Failed to retrieve " + filter + " log for task: " + 
                          taskId);
-      out.write(("TaskLogServlet exception:\n" + 
+        out.write(("TaskLogServlet exception:\n" + 
                  StringUtils.stringifyException(ioe) + "\n").getBytes());
+      }
     }
   }
 
@@ -124,6 +132,8 @@
                      TaskLog.LogName.STDERR);
         printTaskLog(response, out, taskId, start, end, plainText, 
                      TaskLog.LogName.SYSLOG);
+        printTaskLog(response, out, taskId, start, end, plainText, 
+                TaskLog.LogName.DEBUGOUT);
       } else {
         printTaskLog(response, out, taskId, start, end, plainText, filter);
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Oct 26 02:42:28 2007
@@ -418,12 +418,12 @@
    */
   private void runChild(List<String> args, File dir,
                         String taskid) throws IOException {
-    ProcessBuilder builder = new ProcessBuilder(args);
-    builder.directory(dir);
-    process = builder.start();
-    
+
     try {
-      int exit_code = process.waitFor();
+      ShellUtil shexec = new ShellUtil(args, dir, System.getenv());
+      shexec.execute();
+      process = shexec.getProcess();
+      int exit_code = shexec.getExitCode();
      
       if (!killed && exit_code != 0) {
         if (exit_code == 65) {
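
ShellUtil's interface is visible here only through its call sites; as a hedged sketch of running a command with it, assuming the constructor and accessors used by TaskRunner and TaskTracker in this patch:

import java.io.File;
import java.util.Arrays;

import org.apache.hadoop.util.ShellUtil;

public class ShellUtilSketch {
  public static void main(String[] args) throws Exception {
    // (command, working directory, environment), as used in runChild above.
    ShellUtil shexec = new ShellUtil(Arrays.asList("echo", "hello"),
                                     new File("."), System.getenv());
    shexec.execute();
    System.out.println("exit code: " + shexec.getExitCode());
  }
}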

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Oct 26 02:42:28 2007
@@ -19,11 +19,15 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.io.RandomAccessFile;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException; 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,6 +40,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
+import java.util.Vector;
 
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
@@ -52,10 +57,12 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -68,6 +75,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShellUtil;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.log4j.LogManager;
 
@@ -872,14 +880,16 @@
     // Check if we should ask for a new Task
     //
     boolean askForNewTask;
+    long localMinSpaceStart;
     synchronized (this) {
       askForNewTask = (mapTotal < maxCurrentTasks || 
                        reduceTotal < maxCurrentTasks) &&
                       acceptNewTasks; 
+      localMinSpaceStart = minSpaceStart;
     }
     if (askForNewTask) {
       checkLocalDirs(fConf.getLocalDirs());
-      askForNewTask = enoughFreeSpace(minSpaceStart);
+      askForNewTask = enoughFreeSpace(localMinSpaceStart);
     }
       
     //
@@ -1039,7 +1049,11 @@
    * Then pick the one with least progress
    */
   private void killOverflowingTasks() throws IOException {
-    if (!enoughFreeSpace(minSpaceKill)) {
+    long localMinSpaceKill;
+    synchronized(this){
+      localMinSpaceKill = minSpaceKill;  
+    }
+    if (!enoughFreeSpace(localMinSpaceKill)) {
       acceptNewTasks=false; 
       //we give up! do not accept new tasks until
       //all the ones running have finished and they're all cleared up
@@ -1229,6 +1243,7 @@
     private boolean alwaysKeepTaskFiles;
     private TaskStatus taskStatus; 
     private long taskTimeout;
+    private String debugCommand;
         
     /**
      */
@@ -1279,6 +1294,11 @@
       } else {
         alwaysKeepTaskFiles = false;
       }
+      if (task.isMapTask()) {
+        debugCommand = localJobConf.getMapDebugScript();
+      } else {
+        debugCommand = localJobConf.getReduceDebugScript();
+      }
     }
         
     /**
@@ -1287,14 +1307,14 @@
       return task;
     }
 
-    public void setJobConf(JobConf lconf){
+    public synchronized void setJobConf(JobConf lconf){
       this.localJobConf = lconf;
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
       taskTimeout = localJobConf.getLong("mapred.task.timeout", 
                                          10 * 60 * 1000);
     }
         
-    public JobConf getJobConf() {
+    public synchronized JobConf getJobConf() {
       return localJobConf;
     }
         
@@ -1413,6 +1433,71 @@
           if (!wasKilled) {
             failures += 1;
             taskStatus.setRunState(TaskStatus.State.FAILED);
+            // call the script here for the failed tasks.
+            if (debugCommand != null) {
+              String taskStdout ="";
+              String taskStderr ="";
+              String taskSyslog ="";
+              String jobConf = task.getJobFile();
+              try {
+                // get task's stdout file 
+                taskStdout = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                                  (task.getTaskId(), TaskLog.LogName.STDOUT));
+                // get task's stderr file 
+                taskStderr = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                                  (task.getTaskId(), TaskLog.LogName.STDERR));
+                // get task's syslog file 
+                taskSyslog = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                                  (task.getTaskId(), TaskLog.LogName.SYSLOG));
+              } catch(IOException e){
+                LOG.warn("Exception finding task's stdout/err/syslog files");
+              }
+              File workDir = new File(task.getJobFile()).getParentFile();
+              // Build the command  
+              File stdout = TaskLog.getTaskLogFile(task.getTaskId(),
+                                                   TaskLog.LogName.DEBUGOUT);
+              // add pipes program as argument if it exists.
+              String program ="";
+              String executable = Submitter.getExecutable(localJobConf);
+              if ( executable != null) {
+            	try {
+            	  program = new URI(executable).getFragment();
+            	} catch (URISyntaxException ur) {
+            	  LOG.warn("Problem in the URI fragment for pipes executable");
+            	}	  
+              }
+              String [] debug = debugCommand.split(" ");
+              Vector<String> vargs = new Vector<String>();
+              for (String component : debug) {
+                vargs.add(component);
+              }
+              vargs.add(taskStdout);
+              vargs.add(taskStderr);
+              vargs.add(taskSyslog);
+              vargs.add(jobConf);
+              vargs.add(program);
+              try {
+                List<String>  wrappedCommand = TaskLog.captureDebugOut
+                                                          (vargs, stdout);
+                // run the script.
+                try {
+                  runScript(wrappedCommand, workDir);
+                } catch (IOException ioe) {
+                  LOG.warn("runScript failed with: " + StringUtils.
+                                                      stringifyException(ioe));
+                }
+              } catch(IOException e) {
+                LOG.warn("Error in preparing wrapped debug command");
+              }
+
+              // add all lines of debug out to diagnostics
+              try {
+                int num = localJobConf.getInt("mapred.debug.out.lines", -1);
+                addDiagnostics(FileUtil.makeShellPath(stdout),num,"DEBUG OUT");
+              } catch(IOException ioe) {
+                LOG.warn("Exception in add diagnostics!");
+              }
+            }
           } else {
             taskStatus.setRunState(TaskStatus.State.KILLED);
           }
@@ -1436,8 +1521,96 @@
         }
       }
     }
+  
 
     /**
+     * Runs the script given in args
+     * @param args script name followed by its arguments
+     * @param dir current working directory.
+     * @throws IOException
+     */
+    public void runScript(List<String> args, File dir) throws IOException {
+      Process process = null;
+      try {
+        ShellUtil shexec = new ShellUtil(args, dir, System.getenv());
+        shexec.execute();
+        process = shexec.getProcess();
+        int exit_code = shexec.getExitCode();
+        if (exit_code != 0) {
+          throw new IOException("Task debug script exited with nonzero "
+                                +"status of " + exit_code + ".");
+        }
+      } catch (InterruptedException e) {
+          throw new IOException(e.toString());
+      } finally {
+        if (process != null) {
+          process.destroy();
+        }
+      }
+    }
+
+    /**
+     * Add the last 'num' lines of the given file to the diagnostics.
+     * If num is -1, all lines of the file are added to the diagnostics.
+     * @param file The file from which to collect diagnostics.
+     * @param num The number of lines to be sent to diagnostics.
+     * @param tag The tag is printed before the diagnostics are printed. 
+     */
+    public void addDiagnostics(String file, int num, String tag) {
+      RandomAccessFile rafile = null;
+      try {
+        rafile = new RandomAccessFile(file,"r");
+        int no_lines =0;
+        String line = null;
+        StringBuffer tail = new StringBuffer();
+        tail.append("\n-------------------- "+tag+"---------------------\n");
+        String[] lines = null;
+        if (num >0) {
+          lines = new String[num];
+        }
+        while ((line = rafile.readLine()) != null) {
+          no_lines++;
+          if (num >0) {
+            if (no_lines <= num) {
+              lines[no_lines-1] = line;
+            }
+            else { // shift them up
+              for (int i=0; i<num-1; ++i) {
+                lines[i] = lines[i+1];
+              }
+              lines[num-1] = line;
+            }
+          }
+          else if (num == -1) {
+            tail.append(line); 
+            tail.append("\n");
+          }
+        }
+        int n = no_lines > num ?num:no_lines;
+        if (num >0) {
+          for (int i=0;i<n;i++) {
+            tail.append(lines[i]);
+            tail.append("\n");
+          }
+        }
+        if(n!=0)
+          reportDiagnosticInfo(tail.toString());
+      } catch (FileNotFoundException fnfe){
+        LOG.warn("File "+file+ " not found");
+      } catch (IOException ioe){
+        LOG.warn("Error reading file "+file);
+      } finally {
+         try {
+           if (rafile != null) {
+             rafile.close();
+           }
+         } catch (IOException ioe) {
+           LOG.warn("Error closing file "+file);
+         }
+      }
+    }
+    
+    /**
      * We no longer need anything from this task, as the job has
      * finished.  If the task is still running, kill it and clean up.
      * 
@@ -1515,10 +1688,12 @@
       LOG.debug("Cleaning up " + taskId);
       synchronized (TaskTracker.this) {
         tasks.remove(taskId);
-        if (alwaysKeepTaskFiles ||
-            (taskStatus.getRunState() == TaskStatus.State.FAILED && 
-             keepFailedTaskFiles)) {
-          return;
+        synchronized (this){
+          if (alwaysKeepTaskFiles ||
+              (taskStatus.getRunState() == TaskStatus.State.FAILED && 
+               keepFailedTaskFiles)) {
+            return;
+          }
         }
       }
       synchronized (this) {
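
The addDiagnostics helper above keeps only the last num lines of the debug output. A small standalone sketch of that tail-window logic, independent of TaskTracker and purely illustrative:

import java.util.Arrays;

public class TailLinesSketch {
  // Keep a window of the last 'num' lines, shifting it up as new lines
  // arrive, mirroring the loop in addDiagnostics.
  static String[] lastLines(String[] all, int num) {
    String[] window = new String[num];
    int count = 0;
    for (String line : all) {
      count++;
      if (count <= num) {
        window[count - 1] = line;
      } else {                         // shift the window up by one
        for (int i = 0; i < num - 1; ++i) {
          window[i] = window[i + 1];
        }
        window[num - 1] = line;
      }
    }
    int n = Math.min(count, num);
    String[] tail = new String[n];
    System.arraycopy(window, 0, tail, 0, n);
    return tail;
  }

  public static void main(String[] args) {
    String[] tail = lastLines(new String[] {"a", "b", "c", "d"}, 2);
    System.out.println(Arrays.toString(tail));   // prints [c, d]
  }
}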

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Fri Oct 26 02:42:28 2007
@@ -231,6 +231,15 @@
     if (exec == null) {
       throw new IllegalArgumentException("No application program defined.");
     }
+    // add default debug script only when executable is expressed as
+    // <path>#<executable>
+    if (exec.contains("#")) {
+      DistributedCache.createSymlink(conf);
+      // set default gdb commands for map and reduce task 
+      String defScript = "$HADOOP_HOME/src/c++/pipes/debug/pipes-default-script";
+      setIfUnset(conf,"mapred.map.task.debug.script",defScript);
+      setIfUnset(conf,"mapred.reduce.task.debug.script",defScript);
+    }
     URI[] fileCache = DistributedCache.getCacheFiles(conf);
     if (fileCache == null) {
       fileCache = new URI[1];
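
The '#' convention handled above ties together the word.xml change and the default pipes debug script. A hedged sketch of a pipes job configuration that would trigger it (the property value is illustrative):

import org.apache.hadoop.mapred.JobConf;

public class PipesDebugDefaultsSketch {
  public static void configure(JobConf conf) {
    // <path>#<name> makes the framework symlink the executable into the
    // task's working directory under <name>, so a gdb-based debug script
    // can find the binary.
    conf.set("hadoop.pipes.executable",
             "/examples/bin/wordcount-simple#wordcount-simple");
    // Because the value contains '#', the pipes Submitter creates the
    // symlink and, if unset, points mapred.map.task.debug.script and
    // mapred.reduce.task.debug.script at
    // $HADOOP_HOME/src/c++/pipes/debug/pipes-default-script.
  }
}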

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?rev=588585&r1=588584&r2=588585&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Fri Oct 26 02:42:28 2007
@@ -161,6 +161,9 @@
    * @param uris
    */
   public static String uriToString(URI[] uris){
+    if (uris == null) {
+      return null;
+    }
     StringBuffer ret = new StringBuffer(uris[0].toString());
     for(int i = 1; i < uris.length;i++){
       ret.append(",");
@@ -194,6 +197,9 @@
    * @param str
    */
   public static Path[] stringToPath(String[] str){
+    if (str == null) {
+      return null;
+    }
     Path[] p = new Path[str.length];
     for (int i = 0; i < str.length;i++){
       p[i] = new Path(str[i]);
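
The null guards above matter because DistributedCache.getCacheFiles can return null when a job sets no cache files (as the Submitter hunk shows). A small sketch of the new behavior:

import java.net.URI;

import org.apache.hadoop.util.StringUtils;

public class NullGuardSketch {
  public static void main(String[] args) {
    // With this change a null array is passed through instead of causing a
    // NullPointerException on uris[0] / str.length.
    System.out.println(StringUtils.uriToString((URI[]) null));      // null
    System.out.println(StringUtils.stringToPath((String[]) null));  // null
  }
}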


