hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r887061 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/util/ src/test/mapred/org/apache/hadoop/mapred/
Date Fri, 04 Dec 2009 05:06:36 GMT
Author: tomwhite
Date: Fri Dec  4 05:06:34 2009
New Revision: 887061

URL: http://svn.apache.org/viewvc?rev=887061&view=rev
Log:
MAPREDUCE-1119. When tasks fail to report status, show tasks's stack dump before killing.
Contributed by Aaron Kimball.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in
    hadoop/mapreduce/trunk/src/c++/task-controller/main.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec  4 05:06:34 2009
@@ -57,6 +57,9 @@
     MAPREDUCE-1190. Add package documentation for BBP example.
     (Tsz Wo (Nicholas) Sze via cdouglas)
 
+    MAPREDUCE-1119. When tasks fail to report status, show tasks's stack dump
+    before killing. (Aaron Kimball via tomwhite)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/Makefile.in?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in Fri Dec  4 05:06:34 2009
@@ -26,22 +26,22 @@
 TESTBINARY=${testdir}/test-task-controller
 
 all: $(OBJS)
-	$(CC) $(CFLAG) -o $(BINARY) $(OBJS)
+	$(CC) $(CFLAGS) -o $(BINARY) $(OBJS)
 
 main.o: main.c task-controller.h
-	$(CC) $(CFLAG) -o main.o -c main.c
+	$(CC) $(CFLAGS) -o main.o -c main.c
 
 task-controller.o: task-controller.c task-controller.h 
-	$(CC) $(CFLAG) -o task-controller.o -c task-controller.c
+	$(CC) $(CFLAGS) -o task-controller.o -c task-controller.c
 
 configuration.o: configuration.h configuration.c
-	$(CC) $(CFLAG) -o configuration.o -c configuration.c
+	$(CC) $(CFLAGS) -o configuration.o -c configuration.c
 
 ${testdir}/test-task-controller.o: task-controller.c task-controller.h
-	$(CC) $(CFLAG) -o ${testdir}/test-task-controller.o -c ${testdir}/test-task-controller.c
+	$(CC) $(CFLAGS) -o ${testdir}/test-task-controller.o -c ${testdir}/test-task-controller.c
 
 test: $(TESTOBJS)
-	$(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS)
+	$(CC) $(CFLAGS) -o $(TESTBINARY) $(TESTOBJS)
 
 clean:
 	rm -rf $(BINARY) $(OBJS) $(TESTOBJS)

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/main.c?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/main.c Fri Dec  4 05:06:34 2009
@@ -58,6 +58,7 @@
       NULL, 0 } };
 
   const char* log_file = NULL;
+  int conf_dir_len = 0;
 
   //Minimum number of arguments required to run the task-controller
   //command-name user command tt-root
@@ -67,10 +68,17 @@
   }
 
 #ifndef HADOOP_CONF_DIR
-  hadoop_conf_dir = (char *) malloc (sizeof(char) *
-      (strlen(argv[0]) - strlen(EXEC_PATTERN)) + 1);
-  strncpy(hadoop_conf_dir,argv[0],(strlen(argv[0]) - strlen(EXEC_PATTERN)));
-  hadoop_conf_dir[(strlen(argv[0]) - strlen(EXEC_PATTERN))] = '\0';
+  conf_dir_len = (strlen(argv[0]) - strlen(EXEC_PATTERN)) + 1;
+  if (conf_dir_len < 1) {
+    // We didn't get an absolute path to our argv[0]; bail.
+    printf("Cannot find configuration directory.\n");
+    printf("This program must be run with its full absolute path.\n");
+    return INVALID_CONF_DIR;
+  } else {
+    hadoop_conf_dir = (char *) malloc (sizeof(char) * conf_dir_len);
+    strncpy(hadoop_conf_dir,argv[0],(strlen(argv[0]) - strlen(EXEC_PATTERN)));
+    hadoop_conf_dir[(strlen(argv[0]) - strlen(EXEC_PATTERN))] = '\0';
+  }
 #endif
   do {
     next_option = getopt_long(argc, argv, short_options, long_options, NULL);
@@ -142,6 +150,10 @@
     exit_code
         = run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root);
     break;
+  case SIGQUIT_TASK_JVM:
+    task_pid = argv[optind++];
+    exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGQUIT);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Fri Dec  4 05:06:34 2009
@@ -35,6 +35,7 @@
   }
 
   if(initgroups(user_detail->pw_name, user_detail->pw_gid) != 0) {
+    fprintf(LOGFILE, "unable to initgroups : %s\n", strerror(errno));
 	  cleanup();
 	  return SETUID_OPER_FAILED;
   }
@@ -1029,7 +1030,8 @@
   return run_process_as_user(user, jobid, taskid, tt_root, RUN_DEBUG_SCRIPT);
 }
 /**
- * Function used to terminate/kill a task launched by the user.
+ * Function used to terminate/kill a task launched by the user,
+ * or dump the process' stack (by sending SIGQUIT).
  * The function sends appropriate signal to the process group
  * specified by the task_pid.
  */

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Fri Dec  4 05:06:34 2009
@@ -45,6 +45,7 @@
   TERMINATE_TASK_JVM,
   KILL_TASK_JVM,
   RUN_DEBUG_SCRIPT,
+  SIGQUIT_TASK_JVM,
 };
 
 enum errorcodes {
@@ -69,6 +70,7 @@
   INITIALIZE_DISTCACHE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
   UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
+  INVALID_CONF_DIR, //22
 };
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Fri
Dec  4 05:06:34 2009
@@ -119,14 +119,14 @@
     if (shexec != null) {
       if (Shell.WINDOWS) {
         //We don't do send kill process signal in case of windows as 
-        //already we have done a process.destroy() in termintateTaskJVM()
+        //already we have done a process.destroy() in terminateTaskJVM()
         return;
       }
       String pid = context.pid;
       if (pid != null) {
         if(ProcessTree.isSetsidAvailable) {
           ProcessTree.killProcessGroup(pid);
-        }else {
+        } else {
           ProcessTree.killProcess(pid);
         }
       }
@@ -134,6 +134,26 @@
   }
 
   @Override
+  void dumpTaskStack(TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    if (shexec != null) {
+      if (Shell.WINDOWS) {
+        // We don't use signals in Windows.
+        return;
+      }
+      String pid = context.pid;
+      if (pid != null) {
+        // Send SIGQUIT to get a stack dump
+        if (ProcessTree.isSetsidAvailable) {
+          ProcessTree.sigQuitProcessGroup(pid);
+        } else {
+          ProcessTree.sigQuitProcess(pid);
+        }
+      }
+    }
+  }
+
+  @Override
   public void initializeDistributedCache(InitializationContext context) {
     // Do nothing.
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java Fri Dec  4 05:06:34
2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 
 class JvmManager {
@@ -136,6 +137,14 @@
     }
   }
 
+  void dumpStack(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.dumpStack(tr);
+    } else {
+      reduceJvmManager.dumpStack(tr);
+    }
+  }
+
   public void killJvm(JVMId jvmId) {
     if (jvmId.isMap) {
       mapJvmManager.killJvm(jvmId);
@@ -243,6 +252,16 @@
       }
     }
     
+    synchronized void dumpStack(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.get(tr);
+      if (null != jvmId) {
+        JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+        if (null != jvmRunner) {
+          jvmRunner.dumpChildStacks();
+        }
+      }
+    }
+
     synchronized public void stop() {
       //since the kill() method invoked later on would remove
       //an entry from the jvmIdToRunner map, we create a
@@ -459,7 +478,38 @@
           removeJvm(jvmId);
         }
       }
-      
+
+      /** Send a signal to the JVM requesting that it dump a stack trace,
+       * and wait for a timeout interval to give this signal time to be
+       * processed.
+       */
+      void dumpChildStacks() {
+        if (!killed) {
+          TaskController controller = tracker.getTaskController();
+          // Check inital context before issuing a signal to prevent situations
+          // where signal is issued before task is launched.
+          if (initalContext != null && initalContext.env != null) {
+            initalContext.pid = jvmIdToPid.get(jvmId);
+            initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
+                .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
+                    ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+            // signal the task jvm
+            controller.dumpTaskStack(initalContext);
+
+            // We're going to kill the jvm with SIGKILL after this,
+            // so we should wait for a few seconds first to ensure that
+            // the SIGQUIT has time to be processed.
+            try {
+              Thread.sleep(initalContext.sleeptimeBeforeSigkill);
+            } catch (InterruptedException e) {
+              LOG.warn("Sleep interrupted : " +
+                  StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+
       public void taskRan() {
         busy = false;
         numTasksRan++;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Fri
Dec  4 05:06:34 2009
@@ -89,6 +89,7 @@
     TERMINATE_TASK_JVM,
     KILL_TASK_JVM,
     RUN_DEBUG_SCRIPT,
+    SIGQUIT_TASK_JVM,
   }
 
   /**
@@ -443,16 +444,16 @@
   }
   
   /**
-   * Convenience method used to sending appropriate Kill signal to the task 
+   * Convenience method used to sending appropriate signal to the task
    * VM
    * @param context
    * @param command
    * @throws IOException
    */
-  private void finishTask(TaskControllerContext context,
+  protected void signalTask(TaskControllerContext context,
       TaskCommands command) throws IOException{
     if(context.task == null) {
-      LOG.info("Context task null not killing the JVM");
+      LOG.info("Context task is null; not signaling the JVM");
       return;
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
@@ -469,7 +470,7 @@
   @Override
   void terminateTask(TaskControllerContext context) {
     try {
-      finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
+      signalTask(context, TaskCommands.TERMINATE_TASK_JVM);
     } catch (Exception e) {
       LOG.warn("Exception thrown while sending kill to the Task VM " + 
           StringUtils.stringifyException(e));
@@ -479,13 +480,23 @@
   @Override
   void killTask(TaskControllerContext context) {
     try {
-      finishTask(context, TaskCommands.KILL_TASK_JVM);
+      signalTask(context, TaskCommands.KILL_TASK_JVM);
     } catch (Exception e) {
       LOG.warn("Exception thrown while sending destroy to the Task VM " + 
           StringUtils.stringifyException(e));
     }
   }
 
+  @Override
+  void dumpTaskStack(TaskControllerContext context) {
+    try {
+      signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
+          StringUtils.stringifyException(e));
+    }
+  }
+
   protected String getTaskControllerExecutablePath() {
     return taskControllerExe;
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Fri Dec 
4 05:06:34 2009
@@ -134,26 +134,26 @@
 
   /**
    * Top level cleanup a task JVM method.
-   *
-   * The current implementation does the following.
    * <ol>
-   * <li>Sends a graceful terminate signal to task JVM allowing its sub-process
+   * <li>Sends a graceful termiante signal to task JVM to allow subprocesses
    * to cleanup.</li>
-   * <li>Waits for stipulated period</li>
    * <li>Sends a forceful kill signal to task JVM, terminating all its
-   * sub-process forcefully.</li>
+   * sub-processes forcefully.</li>
    * </ol>
-   * 
+   *
    * @param context the task for which kill signal has to be sent.
    */
   final void destroyTaskJVM(TaskControllerContext context) {
+    // Send SIGTERM to try to ask for a polite exit.
     terminateTask(context);
+
     try {
       Thread.sleep(context.sleeptimeBeforeSigkill);
     } catch (InterruptedException e) {
-      LOG.warn("Sleep interrupted : " + 
+      LOG.warn("Sleep interrupted : " +
           StringUtils.stringifyException(e));
     }
+
     killTask(context);
   }
 
@@ -224,6 +224,15 @@
    */
   abstract void killTask(TaskControllerContext context);
 
+
+  /**
+   * Sends a QUIT signal to direct the task JVM (and sub-processes) to
+   * dump their stack to stdout.
+   *
+   * @param context task context.
+   */
+  abstract void dumpTaskStack(TaskControllerContext context);
+
   /**
    * Initialize user on this TaskTracer in a TaskController specific manner.
    * 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Dec  4 05:06:34
2009
@@ -93,6 +93,8 @@
   public TaskTracker.TaskInProgress getTaskInProgress() { return tip; }
   public TaskTracker getTracker() { return tracker; }
 
+  public JvmManager getJvmManager() { return jvmManager; }
+
   /** Called to assemble this task's input.  This method is run in the parent
    * process before the child is spawned.  It should not execute user code,
    * only system code. */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec  4 05:06:34
2009
@@ -1509,6 +1509,7 @@
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
           myInstrumentation.timedoutTask(tip.getTask().getTaskID());
+          dumpTaskStack(tip);
           purgeTask(tip, true);
         }
       }
@@ -1516,6 +1517,22 @@
   }
 
   /**
+   * Send a signal to a stuck task commanding it to dump stack traces
+   * to stderr before we kill it with purgeTask().
+   *
+   * @param tip {@link TaskInProgress} to dump stack traces.
+   */
+  private void dumpTaskStack(TaskInProgress tip) {
+    TaskRunner runner = tip.getTaskRunner();
+    if (null == runner) {
+      return; // tip is already abandoned.
+    }
+
+    JvmManager jvmMgr = runner.getJvmManager();
+    jvmMgr.dumpStack(runner);
+  }
+
+  /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
    * @throws IOException

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java Fri
Dec  4 05:06:34 2009
@@ -37,6 +37,15 @@
 
   public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
 
+  private static final int SIGQUIT = 3;
+  private static final int SIGTERM = 15;
+  private static final int SIGKILL = 9;
+
+  private static final String SIGQUIT_STR = "SIGQUIT";
+  private static final String SIGTERM_STR = "SIGTERM";
+  private static final String SIGKILL_STR = "SIGKILL";
+
+
   public static final boolean isSetsidAvailable = isSetsidSupported();
   private static boolean isSetsidSupported() {
     ShellCommandExecutor shexec = null;
@@ -102,43 +111,78 @@
     sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
   }
 
+
   /**
-   * Sends terminate signal to the process, allowing it to gracefully exit.
-   * 
-   * @param pid pid of the process to be sent SIGTERM
+   * Send a specified signal to the specified pid
+   *
+   * @param pid the pid of the process [group] to signal.
+   * @param signalNum the signal to send.
+   * @param signalName the human-readable description of the signal
+   * (for logging).
    */
-  public static void terminateProcess(String pid) {
+  private static void sendSignal(String pid, int signalNum, String signalName) {
     ShellCommandExecutor shexec = null;
     try {
-      String[] args = { "kill", pid };
+      String[] args = { "kill", "-" + signalNum, pid };
       shexec = new ShellCommandExecutor(args);
       shexec.execute();
     } catch (IOException ioe) {
       LOG.warn("Error executing shell command " + ioe);
     } finally {
-      LOG.info("Killing process " + pid +
-               " with SIGTERM. Exit code " + shexec.getExitCode());
+      if (pid.startsWith("-")) {
+        LOG.info("Sending signal to all members of process group " + pid
+            + ": " + signalName + ". Exit code " + shexec.getExitCode());
+      } else {
+        LOG.info("Signaling process " + pid
+            + " with " + signalName + ". Exit code " + shexec.getExitCode());
+      }
     }
   }
 
   /**
+   * Send a specified signal to the process, if it is alive.
+   *
+   * @param pid the pid of the process to signal.
+   * @param signalNum the signal to send.
+   * @param signalName the human-readable description of the signal
+   * (for logging).
+   * @param alwaysSignal if true then send signal even if isAlive(pid) is false
+   */
+  private static void maybeSignalProcess(String pid, int signalNum,
+      String signalName, boolean alwaysSignal) {
+    // If process tree is not alive then don't signal, unless alwaysSignal
+    // forces it so.
+    if (alwaysSignal || ProcessTree.isAlive(pid)) {
+      sendSignal(pid, signalNum, signalName);
+    }
+  }
+
+  private static void maybeSignalProcessGroup(String pgrpId, int signalNum,
+      String signalName, boolean alwaysSignal) {
+
+    if (alwaysSignal || ProcessTree.isProcessGroupAlive(pgrpId)) {
+      // signaling a process group means using a negative pid.
+      sendSignal("-" + pgrpId, signalNum, signalName);
+    }
+  }
+
+  /**
+   * Sends terminate signal to the process, allowing it to gracefully exit.
+   * 
+   * @param pid pid of the process to be sent SIGTERM
+   */
+  public static void terminateProcess(String pid) {
+    maybeSignalProcess(pid, SIGTERM, SIGTERM_STR, true);
+  }
+
+  /**
    * Sends terminate signal to all the process belonging to the passed process
    * group, allowing the group to gracefully exit.
    * 
    * @param pgrpId process group id
    */
   public static void terminateProcessGroup(String pgrpId) {
-    ShellCommandExecutor shexec = null;
-    try {
-      String[] args = { "kill", "--", "-" + pgrpId };
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (IOException ioe) {
-      LOG.warn("Error executing shell command " + ioe);
-    } finally {
-      LOG.info("Killing all processes in the process group " + pgrpId +
-               " with SIGTERM. Exit code " + shexec.getExitCode());
-    }
+    maybeSignalProcessGroup(pgrpId, SIGTERM, SIGTERM_STR, true);
   }
 
   /**
@@ -197,22 +241,17 @@
    * @param pid process id
    */
   public static void killProcess(String pid) {
+    maybeSignalProcess(pid, SIGKILL, SIGKILL_STR, false);
+  }
 
-    //If process tree is not alive then return immediately.
-    if(!ProcessTree.isAlive(pid)) {
-      return;
-    }
-    String[] args = { "kill", "-9", pid };
-    ShellCommandExecutor shexec = new ShellCommandExecutor(args);
-    try {
-      shexec.execute();
-    } catch (IOException e) {
-      LOG.warn("Error sending SIGKILL to process "+ pid + " ."+ 
-          StringUtils.stringifyException(e));
-    } finally {
-      LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
-          + shexec.getExitCode());
-    }
+  /**
+   * Sends SIGQUIT to process; Java programs will dump their stack to
+   * stdout.
+   *
+   * @param pid process id
+   */
+  public static void sigQuitProcess(String pid) {
+    maybeSignalProcess(pid, SIGQUIT, SIGQUIT_STR, false);
   }
 
   /**
@@ -222,25 +261,20 @@
    * @param pgrpId process group id
    */
   public static void killProcessGroup(String pgrpId) {
+    maybeSignalProcessGroup(pgrpId, SIGKILL, SIGKILL_STR, false);
+  }
 
-    //If process tree is not alive then return immediately.
-    if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
-      return;
-    }
-
-    String[] args = { "kill", "-9", "-"+pgrpId };
-    ShellCommandExecutor shexec = new ShellCommandExecutor(args);
-    try {
-      shexec.execute();
-    } catch (IOException e) {
-      LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+ 
-          StringUtils.stringifyException(e));
-    } finally {
-      LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
-          + shexec.getExitCode());
-    }
+  /**
+   * Sends SIGQUIT to all processes belonging to the same process group,
+   * ordering all processes in the group to send their stack dump to
+   * stdout.
+   *
+   * @param pgrpId process group id
+   */
+  public static void sigQuitProcessGroup(String pgrpId) {
+    maybeSignalProcessGroup(pgrpId, SIGQUIT, SIGQUIT_STR, false);
   }
-  
+
   /**
    * Is the process with PID pid still alive?
    * This method assumes that isAlive is called on a pid that was alive not

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
Fri Dec  4 05:06:34 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 import junit.framework.TestCase;
 
@@ -48,7 +49,10 @@
  * <li>Make the built binary to setuid executable</li>
  * <li>Execute following targets:
  * <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built
binary</em> 
- * -Dtaskcontroller-ugi=<em>user,group</em></code></li>
+ * -Dtaskcontroller-ugi=<em>user,group</em></code>
+ *  <br/>(Note that "path to built binary" means the directory containing task-controller
-
+ *  not the actual complete path of the binary itself. This path must end in ".../bin")
+ *  </li>
  * </ol>
  * 
  */
@@ -72,6 +76,24 @@
     void setTaskControllerExe(String execPath) {
       this.taskControllerExePath = execPath;
     }
+
+    volatile static int attemptedSigQuits = 0;
+    volatile static int failedSigQuits = 0;
+
+    /** Work like LinuxTaskController, but also count the number of
+      * attempted and failed SIGQUIT sends via the task-controller
+      * executable.
+      */
+    @Override
+    void dumpTaskStack(TaskControllerContext context) {
+      attemptedSigQuits++;
+      try {
+        signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+      } catch (Exception e) {
+        LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
+        failedSigQuits++;
+      }
+    }
   }
 
   // cluster instances which sub classes can use

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
Fri Dec  4 05:06:34 2009
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -92,4 +93,30 @@
       }
     }
   }
+
+  /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
+   * if a task times out.
+   */
+  public void testTimeoutStackTrace() throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+
+    // Run a job that should timeout and trigger a SIGQUIT.
+    startCluster();
+    JobConf conf = getClusterConf();
+    conf.setInt(JobContext.TASK_TIMEOUT, 10000);
+    conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+    SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(conf);
+    Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
+    job.setMaxMapAttempts(1);
+    int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
+    job.waitForCompletion(true);
+    assertTrue("Did not detect a new SIGQUIT!",
+        prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
+    assertEquals("A SIGQUIT attempt failed!", 0,
+        MyLinuxTaskController.failedSigQuits);
+
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
Fri Dec  4 05:06:34 2009
@@ -18,12 +18,20 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.BufferedReader;
+import java.io.FileInputStream;
 import java.io.File;
+import java.io.InputStreamReader;
 import java.io.IOException;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.SleepJob;
 
 /**
  * A JUnit test to test Kill Job & Fail Job functionality with local file
@@ -31,38 +39,96 @@
  */
 public class TestJobKillAndFail extends TestCase {
 
+  static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
+
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  public void testJobFailAndKill() throws IOException {
+  /**
+   * TaskController instance that just sets a flag when a stack dump
+   * is performed in a child thread.
+   */
+  static class MockStackDumpTaskController extends DefaultTaskController {
+
+    static volatile int numStackDumps = 0;
+
+    static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
+
+    public MockStackDumpTaskController() {
+      LOG.info("Instantiated MockStackDumpTC");
+    }
+
+    @Override
+    void dumpTaskStack(TaskControllerContext context) {
+      LOG.info("Got stack-dump request in TaskController");
+      MockStackDumpTaskController.numStackDumps++;
+      super.dumpTaskStack(context);
+    }
+
+  }
+
+  /** If a task was killed, then dumpTaskStack() should have been
+    * called. Test whether or not the counter was incremented
+    * and succeed/fail based on this. */
+  private void checkForStackDump(boolean expectDump, int lastNumDumps) {
+    int curNumDumps = MockStackDumpTaskController.numStackDumps;
+
+    LOG.info("curNumDumps=" + curNumDumps + "; lastNumDumps=" + lastNumDumps
+        + "; expect=" + expectDump);
+
+    if (expectDump) {
+      assertTrue("No stack dump recorded!", lastNumDumps < curNumDumps);
+    } else {
+      assertTrue("Stack dump happened anyway!", lastNumDumps == curNumDumps);
+    }
+  }
+
+  public void testJobFailAndKill() throws Exception {
     MiniMRCluster mr = null;
     try {
       JobConf jtConf = new JobConf();
       jtConf.set("mapred.jobtracker.instrumentation", 
           JTInstrumentation.class.getName());
+      jtConf.set("mapreduce.tasktracker.taskcontroller",
+          MockStackDumpTaskController.class.getName());
       mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
       JTInstrumentation instr = (JTInstrumentation) 
         mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
 
       // run the TCs
       JobConf conf = mr.createJobConf();
+      conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
       
       Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
       Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
-      RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
+      RunningJob runningJob = UtilsForTests.runJobFail(conf, inDir, outDir);
       // Checking that the Job got failed
-      assertEquals(job.getJobState(), JobStatus.FAILED);
+      assertEquals(runningJob.getJobState(), JobStatus.FAILED);
       assertTrue(instr.verifyJob());
       assertEquals(1, instr.failed);
       instr.reset();
 
-      
-      job = UtilsForTests.runJobKill(conf, inDir, outDir);
+      int prevNumDumps = MockStackDumpTaskController.numStackDumps;
+      runningJob = UtilsForTests.runJobKill(conf, inDir, outDir);
       // Checking that the Job got killed
-      assertTrue(job.isComplete());
-      assertEquals(job.getJobState(), JobStatus.KILLED);
+      assertTrue(runningJob.isComplete());
+      assertEquals(runningJob.getJobState(), JobStatus.KILLED);
       assertTrue(instr.verifyJob());
       assertEquals(1, instr.killed);
+      // check that job kill does not put a stacktrace in task logs.
+      checkForStackDump(false, prevNumDumps);
+
+      // Test that a task that times out does have a stack trace
+      conf = mr.createJobConf();
+      conf.setInt(JobContext.TASK_TIMEOUT, 10000);
+      conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+      SleepJob sleepJob = new SleepJob();
+      sleepJob.setConf(conf);
+      Job job = sleepJob.createJob(1, 0, 30000, 1,0, 0);
+      job.setMaxMapAttempts(1);
+      prevNumDumps = MockStackDumpTaskController.numStackDumps;
+      job.waitForCompletion(true);
+      checkForStackDump(true, prevNumDumps);
     } finally {
       if (mr != null) {
         mr.shutdown();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Fri
Dec  4 05:06:34 2009
@@ -607,6 +607,7 @@
     conf.setJobName("test-job-fail");
     conf.setMapperClass(FailMapper.class);
     conf.setReducerClass(IdentityReducer.class);
+    conf.setMaxMapAttempts(1);
     
     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
     while (!job.isComplete()) {



Mime
View raw message