hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r921230 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Wed, 10 Mar 2010 05:26:51 GMT
Author: yhemanth
Date: Wed Mar 10 05:26:50 2010
New Revision: 921230

URL: http://svn.apache.org/viewvc?rev=921230&view=rev
Log:
MAPREDUCE-1422. Fix cleanup of localized job directory to work if files with non-deletable
permissions are created within it. Contributed by Amar Kamat.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/main.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Mar 10 05:26:50 2010
@@ -436,6 +436,10 @@ Trunk (unreleased changes)
     MAPREDUCE-1578. Decouple HadoopArchives vesrion from HarFileSystem version.
     (Rodrigo Schmidt via szetszwo)
 
+    MAPREDUCE-1422. Fix cleanup of localized job directory to work if files
+    with non-deletable permissions are created within it.
+    (Amar Kamat via yhemanth)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/main.c?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/main.c Wed Mar 10 05:26:50 2010
@@ -247,6 +247,11 @@ int main(int argc, char **argv) {
     exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id,
                                         dir_to_be_deleted);
     break;
+  case ENABLE_JOB_FOR_CLEANUP:
+    tt_root = argv[optind++];
+    job_id = argv[optind++];
+    exit_code = enable_job_for_cleanup(tt_root, user_detail->pw_name, job_id);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Wed Mar 10 05:26:50 2010
@@ -1165,25 +1165,24 @@ int kill_user_task(const char *user, con
  * Before changing permissions, makes sure that the given path doesn't contain
  * any relative components.
  * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
- * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
+ * full_path : is either jobLocalDir, taskDir OR taskWorkDir that is to be 
+ *             deleted
  */
-int enable_task_for_cleanup(const char *tt_root, const char *user,
-           const char *jobid, const char *dir_to_be_deleted) {
+static int enable_path_for_cleanup(const char *tt_root, const char *user,
+                                   char *full_path) {
   int exit_code = 0;
   gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
 
-  char * full_path = NULL;
   if (check_tt_root(tt_root) < 0) {
     fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
     cleanup();
     return INVALID_TT_ROOT;
   }
  
-  full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted);
   if (full_path == NULL) {
     fprintf(LOGFILE,
             "Could not build the full path. Not deleting the dir %s\n",
-            dir_to_be_deleted);
+            full_path);
     exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
   }
      // Make sure that the path given is not having any relative components
@@ -1213,3 +1212,26 @@ int enable_task_for_cleanup(const char *
   cleanup();
   return exit_code;
 }
+
+/**
+ * Enables the task work-dir/local-dir path for deletion.
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
+ */
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+           const char *jobid, const char *dir_to_be_deleted) {
+  char *full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted);
+  return enable_path_for_cleanup(tt_root, user, full_path);
+}
+
+/**
+ * Enables the jobLocalDir for deletion.
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * user    : owner of the job
+ * jobid   : id of the job for which the cleanup is needed.
+ */
+int enable_job_for_cleanup(const char *tt_root, const char *user, 
+                           const char *jobid) {
+  char *full_path = get_job_directory(tt_root, user, jobid);
+  return enable_path_for_cleanup(tt_root, user, full_path);
+}

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Wed Mar 10 05:26:50 2010
@@ -46,7 +46,8 @@ enum command {
   KILL_TASK_JVM,
   RUN_DEBUG_SCRIPT,
   SIGQUIT_TASK_JVM,
-  ENABLE_TASK_FOR_CLEANUP
+  ENABLE_TASK_FOR_CLEANUP,
+  ENABLE_JOB_FOR_CLEANUP
 };
 
 enum errorcodes {
@@ -130,6 +131,9 @@ int kill_user_task(const char *user, con
 int enable_task_for_cleanup(const char *tt_root, const char *user,
                             const char *jobid, const char *dir_to_be_deleted);
 
+int enable_job_for_cleanup(const char *tt_root, const char *user,
+                           const char *jobid);
+
 int prepare_attempt_directory(const char *attempt_dir, const char *user);
 
 // The following functions are exposed for testing

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Wed Mar 10 05:26:50 2010
@@ -198,6 +198,25 @@ public class DefaultTaskController exten
   @Override
   void enableTaskForCleanup(PathDeletionContext context)
          throws IOException {
+    enablePathForCleanup(context);
+  }
+  
+  /**
+   * Enables the job for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  @Override
+  void enableJobForCleanup(PathDeletionContext context)
+         throws IOException {
+    enablePathForCleanup(context);
+  }
+  
+  /**
+   * Enables the path for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  private void enablePathForCleanup(PathDeletionContext context)
+         throws IOException {
     try {
       FileUtil.chmod(context.fullPath, "u+rwx", true);
     } catch(InterruptedException e) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java Wed Mar 10 05:26:50 2010
@@ -161,7 +161,7 @@ class JvmManager {
    */
   static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
     tracker.getCleanupThread().addToQueue(
-        TaskTracker.buildTaskControllerPathDeletionContexts(
+        TaskTracker.buildTaskControllerTaskPathDeletionContexts(
           tracker.getLocalFileSystem(),
           tracker.getLocalFiles(tracker.getJobConf(), ""),
           task, true /* workDir */,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Wed Mar 10 05:26:50 2010
@@ -29,8 +29,10 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
@@ -49,7 +51,7 @@ import org.apache.hadoop.util.Shell.Shel
  * <p>task-controller mapreduce.job.user.name command command-args, where</p>
  * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
  * <p>command is one of the cardinal value of the 
- * {@link LinuxTaskController.TaskCommands} enumeration</p>
+ * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
  * <p>command-args depends on the command being launched.</p>
  * 
  * In addition to running and killing tasks, the class also 
@@ -83,7 +85,7 @@ class LinuxTaskController extends TaskCo
   /**
    * List of commands that the setuid script will execute.
    */
-  enum TaskCommands {
+  enum TaskControllerCommands {
     INITIALIZE_USER,
     INITIALIZE_JOB,
     INITIALIZE_DISTRIBUTEDCACHE_FILE,
@@ -93,7 +95,8 @@ class LinuxTaskController extends TaskCo
     KILL_TASK_JVM,
     RUN_DEBUG_SCRIPT,
     SIGQUIT_TASK_JVM,
-    ENABLE_TASK_FOR_CLEANUP
+    ENABLE_TASK_FOR_CLEANUP,
+    ENABLE_JOB_FOR_CLEANUP
   }
 
   @Override
@@ -156,7 +159,7 @@ class LinuxTaskController extends TaskCo
     List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, 
         context.env.workDir);
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
-                                    TaskCommands.LAUNCH_TASK_JVM, 
+                                    TaskControllerCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
                                     launchTaskJVMArgs, env.workDir, env.env);
     context.shExec = shExec;
@@ -195,47 +198,48 @@ class LinuxTaskController extends TaskCo
     writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
     // Call the taskcontroller with the right parameters.
     List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
-    runCommand(TaskCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), 
+    runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), 
         launchTaskJVMArgs, context.workDir, null);
   }
   /**
    * Helper method that runs a LinuxTaskController command
    * 
-   * @param taskCommand
+   * @param taskControllerCommand
    * @param user
    * @param cmdArgs
    * @param env
    * @throws IOException
    */
-  private void runCommand(TaskCommands taskCommand, String user,
-      List<String> cmdArgs, File workDir, Map<String, String> env)
+  private void runCommand(TaskControllerCommands taskControllerCommand, 
+      String user, List<String> cmdArgs, File workDir, Map<String, String> env)
       throws IOException {
 
     ShellCommandExecutor shExec =
-        buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+        buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs, 
+                                    workDir, env);
     try {
       shExec.execute();
     } catch (Exception e) {
-      LOG.warn("Exit code from " + taskCommand.toString() + " is : "
+      LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
           + shExec.getExitCode());
-      LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
+      LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
           + StringUtils.stringifyException(e));
-      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
-          + " follows:");
+      LOG.info("Output from LinuxTaskController's " 
+               + taskControllerCommand.toString() + " follows:");
       logOutput(shExec.getOutput());
       throw new IOException(e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
-          + " follows:");
+      LOG.info("Output from LinuxTaskController's " 
+               + taskControllerCommand.toString() + " follows:");
       logOutput(shExec.getOutput());
     }
   }
 
   /**
    * Returns list of arguments to be passed while initializing a new task. See
-   * {@code buildTaskControllerExecutor(TaskCommands, String, List<String>,
-   * JvmEnv)} documentation.
+   * {@code buildTaskControllerExecutor(TaskControllerCommands, String, 
+   * List<String>, JvmEnv)} documentation.
    * 
    * @param context
    * @return Argument to be used while launching Task VM
@@ -257,10 +261,12 @@ class LinuxTaskController extends TaskCo
   void initializeTask(TaskControllerContext context)
       throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+      LOG.debug("Going to do " 
+                + TaskControllerCommands.INITIALIZE_TASK.toString()
                 + " for " + context.task.getTaskID().toString());
     }
-    runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
+    runCommand(TaskControllerCommands.INITIALIZE_TASK, 
+        context.env.conf.getUser(),
         buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
   }
 
@@ -269,7 +275,7 @@ class LinuxTaskController extends TaskCo
    * cleanup. Last arg in this List is either $attemptId or $attemptId/work
    */
   private List<String> buildTaskCleanupArgs(
-      TaskControllerPathDeletionContext context) {
+      TaskControllerTaskPathDeletionContext context) {
     List<String> commandArgs = new ArrayList<String>(3);
     commandArgs.add(context.mapredLocalDir.toUri().getPath());
     commandArgs.add(context.task.getJobID().toString());
@@ -289,39 +295,82 @@ class LinuxTaskController extends TaskCo
   }
 
   /**
+   * Builds the args to be passed to task-controller for enabling of job for
+   * cleanup. Last arg in this List is $jobid.
+   */
+  private List<String> buildJobCleanupArgs(
+      TaskControllerJobPathDeletionContext context) {
+    List<String> commandArgs = new ArrayList<String>(2);
+    commandArgs.add(context.mapredLocalDir.toUri().getPath());
+    commandArgs.add(context.jobId.toString());
+
+    return commandArgs;
+  }
+  
+  /**
    * Enables the task for cleanup by changing permissions of the specified path
    * in the local filesystem
    */
   @Override
   void enableTaskForCleanup(PathDeletionContext context)
       throws IOException {
+    if (context instanceof TaskControllerTaskPathDeletionContext) {
+      TaskControllerTaskPathDeletionContext tContext =
+        (TaskControllerTaskPathDeletionContext) context;
+      enablePathForCleanup(tContext, 
+                           TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
+                           buildTaskCleanupArgs(tContext));
+    }
+    else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+          + "TaskControllerTaskPathDeletionContext.");
+    }
+  }
+
+  /**
+   * Enables the job for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  @Override
+  void enableJobForCleanup(PathDeletionContext context)
+      throws IOException {
+    if (context instanceof TaskControllerJobPathDeletionContext) {
+      TaskControllerJobPathDeletionContext tContext =
+        (TaskControllerJobPathDeletionContext) context;
+      enablePathForCleanup(tContext, 
+                           TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
+                           buildJobCleanupArgs(tContext));
+    } else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+                  + "TaskControllerJobPathDeletionContext.");
+    }
+  }
+  
+  /**
+   * Enable a path for cleanup
+   * @param c {@link TaskControllerPathDeletionContext} for the path to be 
+   *          cleaned up
+   * @param command {@link TaskControllerCommands} for task/job cleanup
+   * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable 
+   *                    path cleanup
+   */
+  private void enablePathForCleanup(TaskControllerPathDeletionContext c,
+                                    TaskControllerCommands command,
+                                    List<String> cleanupArgs) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
-                + " for " + context.fullPath);
+      LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
     }
 
-    if (context instanceof TaskControllerPathDeletionContext) {
-      TaskControllerPathDeletionContext tContext =
-        (TaskControllerPathDeletionContext) context;
-    
-      if (tContext.task.getUser() != null &&
-          tContext.fs instanceof LocalFileSystem) {
-        try {
-          runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
-                   tContext.task.getUser(),
-                   buildTaskCleanupArgs(tContext), null, null);
-        } catch(IOException e) {
-          LOG.warn("Uanble to change permissions for " + tContext.fullPath);
-        }
-      }
-      else {
-        throw new IllegalArgumentException("Either user is null or the "  +
-                               "file system is not local file system.");
+    if ( c.user != null && c.fs instanceof LocalFileSystem) {
+      try {
+        runCommand(command, c.user, cleanupArgs, null, null);
+      } catch(IOException e) {
+        LOG.warn("Unable to change permissions for " + c.fullPath);
       }
     }
     else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-          + "TaskControllerPathDeletionContext.");
+      throw new IllegalArgumentException("Either user is null or the " 
+                  + "file system is not local file system.");
     }
   }
 
@@ -343,7 +392,7 @@ class LinuxTaskController extends TaskCo
 
   /**
    * Returns list of arguments to be passed while launching task VM.
-   * See {@code buildTaskControllerExecutor(TaskCommands, 
+   * See {@code buildTaskControllerExecutor(TaskControllerCommands, 
    * String, List<String>, JvmEnv)} documentation.
    * @param context
    * @return Argument to be used while launching Task VM
@@ -406,7 +455,7 @@ class LinuxTaskController extends TaskCo
    * @throws IOException
    */
   private ShellCommandExecutor buildTaskControllerExecutor(
-      TaskCommands command, String userName, List<String> cmdArgs,
+      TaskControllerCommands command, String userName, List<String> cmdArgs,
       File workDir, Map<String, String> env)
       throws IOException {
     String[] taskControllerCmd = new String[3 + cmdArgs.size()];
@@ -494,7 +543,7 @@ class LinuxTaskController extends TaskCo
       throws IOException {
     LOG.debug("Going to initialize job " + context.jobid.toString()
         + " on the TT");
-    runCommand(TaskCommands.INITIALIZE_JOB, context.user,
+    runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
         buildInitializeJobCommandArgs(context), context.workDir, null);
   }
 
@@ -512,8 +561,8 @@ class LinuxTaskController extends TaskCo
     args.add("--");
     args.add(context.localizedBaseDir.toString());
     args.add(context.uniqueString);
-    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
-        args, context.workDir, null);
+    runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, 
+        context.user, args, context.workDir, null);
   }
 
   @Override
@@ -521,14 +570,14 @@ class LinuxTaskController extends TaskCo
       throws IOException {
     LOG.debug("Going to initialize user directories for " + context.user
         + " on the TT");
-    runCommand(TaskCommands.INITIALIZE_USER, context.user,
+    runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
         new ArrayList<String>(), context.workDir, null);
   }
 
   /**
    * API which builds the command line to be pass to LinuxTaskController
    * binary to terminate/kill the task. See 
-   * {@code buildTaskControllerExecutor(TaskCommands, 
+   * {@code buildTaskControllerExecutor(TaskControllerCommands, 
    * String, List<String>, JvmEnv)} documentation.
    * 
    * 
@@ -550,7 +599,7 @@ class LinuxTaskController extends TaskCo
    * @throws IOException
    */
   protected void signalTask(TaskControllerContext context,
-      TaskCommands command) throws IOException{
+      TaskControllerCommands command) throws IOException{
     if(context.task == null) {
       LOG.info("Context task is null; not signaling the JVM");
       return;
@@ -570,7 +619,7 @@ class LinuxTaskController extends TaskCo
   @Override
   void terminateTask(TaskControllerContext context) {
     try {
-      signalTask(context, TaskCommands.TERMINATE_TASK_JVM);
+      signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
     } catch (Exception e) {
       LOG.warn("Exception thrown while sending kill to the Task VM " + 
           StringUtils.stringifyException(e));
@@ -580,7 +629,7 @@ class LinuxTaskController extends TaskCo
   @Override
   void killTask(TaskControllerContext context) {
     try {
-      signalTask(context, TaskCommands.KILL_TASK_JVM);
+      signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
     } catch (Exception e) {
       LOG.warn("Exception thrown while sending destroy to the Task VM " + 
           StringUtils.stringifyException(e));
@@ -590,7 +639,7 @@ class LinuxTaskController extends TaskCo
   @Override
   void dumpTaskStack(TaskControllerContext context) {
     try {
-      signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+      signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
     } catch (Exception e) {
       LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
           StringUtils.stringifyException(e));

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Wed Mar 10 05:26:50 2010
@@ -195,25 +195,26 @@ public abstract class TaskController imp
    * Contains info related to the path of the file/dir to be deleted. This info
    * is needed by task-controller to build the full path of the file/dir
    */
-  static class TaskControllerPathDeletionContext extends PathDeletionContext {
-    Task task;
-    boolean isWorkDir;
+  static abstract class TaskControllerPathDeletionContext 
+  extends PathDeletionContext {
     TaskController taskController;
+    String user;
 
     /**
-     * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
-     * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
-     * is built using mapredLocalDir, jobId, taskId, etc.
+     * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir, 
+     * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir, 
+     * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId, 
+     * taskId, etc.
      */
     Path mapredLocalDir;
 
     public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
-        Task task, boolean isWorkDir, TaskController taskController) {
+                                             TaskController taskController,
+                                             String user) {
       super(fs, null);
-      this.task = task;
-      this.isWorkDir = isWorkDir;
       this.taskController = taskController;
       this.mapredLocalDir = mapredLocalDir;
+      this.user = user;
     }
 
     @Override
@@ -225,18 +226,56 @@ public abstract class TaskController imp
     }
 
     /**
+     * Return the component of the path under the {@link #mapredLocalDir} to be 
+     * cleaned up. Its the responsibility of the class that extends 
+     * {@link TaskControllerPathDeletionContext} to provide the correct 
+     * component. For example 
+     *  - For task related cleanups, either the task-work-dir or task-local-dir
+     *    might be returned depending on jvm reuse.
+     *  - For job related cleanup, simply the job-local-dir might be returned.
+     */
+    abstract protected String getPath();
+    
+    /**
      * Builds the path of taskAttemptDir OR taskWorkDir based on
      * mapredLocalDir, jobId, taskId, etc
      */
     String buildPathForDeletion() {
+      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
+    }
+  }
+
+  /** Contains info related to the path of the file/dir to be deleted. This info
+   * is needed by task-controller to build the full path of the task-work-dir or
+   * task-local-dir depending on whether the jvm is reused or not.
+   */
+  static class TaskControllerTaskPathDeletionContext 
+  extends TaskControllerPathDeletionContext {
+    final Task task;
+    final boolean isWorkDir;
+    
+    public TaskControllerTaskPathDeletionContext(FileSystem fs, 
+        Path mapredLocalDir, Task task, boolean isWorkDir, 
+        TaskController taskController) {
+      super(fs, mapredLocalDir, taskController, task.getUser());
+      this.task = task;
+      this.isWorkDir = isWorkDir;
+    }
+    
+    /**
+     * Returns the taskWorkDir or taskLocalDir based on whether 
+     * {@link TaskControllerTaskPathDeletionContext} is configured to delete
+     * the workDir.
+     */
+    @Override
+    protected String getPath() {
       String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
           task.getJobID().toString(), task.getTaskID().toString(),
           task.isTaskCleanupTask())
         : TaskTracker.getLocalTaskDir(task.getUser(),
           task.getJobID().toString(), task.getTaskID().toString(),
           task.isTaskCleanupTask());
-
-      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
+      return subDir;
     }
 
     /**
@@ -252,6 +291,41 @@ public abstract class TaskController imp
     }
   }
 
+  /** Contains info related to the path of the file/dir to be deleted. This info
+   * is needed by task-controller to build the full path of the job-local-dir.
+   */
+  static class TaskControllerJobPathDeletionContext 
+  extends TaskControllerPathDeletionContext {
+    final JobID jobId;
+    
+    public TaskControllerJobPathDeletionContext(FileSystem fs, 
+        Path mapredLocalDir, JobID id, String user, 
+        TaskController taskController) {
+      super(fs, mapredLocalDir, taskController, user);
+      this.jobId = id;
+    }
+    
+    /**
+     * Returns the jobLocalDir of the job to be cleaned up.
+     */
+    @Override
+    protected String getPath() {
+      return TaskTracker.getLocalJobDir(user, jobId.toString());
+    }
+    
+    /**
+     * Makes the path(and its sub-directories recursively) fully deletable by
+     * setting proper permissions(770) by task-controller
+     */
+    @Override
+    protected void enablePathForCleanup() throws IOException {
+      getPathForCleanup();// allow init of fullPath, if not inited already
+      if (fs.exists(new Path(fullPath))) {
+        taskController.enableJobForCleanup(this);
+      }
+    }
+  }
+  
   /**
    * NOTE: This class is internal only class and not intended for users!!
    * 
@@ -351,4 +425,12 @@ public abstract class TaskController imp
    */
   abstract void enableTaskForCleanup(PathDeletionContext context)
       throws IOException;
+  
+  /**
+   * Enable the job for cleanup by changing permissions of the path
+   * @param context   path deletion context
+   * @throws IOException
+   */
+  abstract void enableJobForCleanup(PathDeletionContext context)
+    throws IOException;
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Mar 10 05:26:50 2010
@@ -70,6 +70,8 @@ import org.apache.hadoop.mapred.TaskCont
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -1788,7 +1790,33 @@ public class TaskTracker 
   }
 
   /**
-   * Builds list of TaskControllerPathDeletionContext objects for a task
+   * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a 
+   * job each pointing to the job's jobLocalDir.
+   * @param fs    : FileSystem in which the dirs are to be deleted
+   * @param paths : mapred-local-dirs
+   * @param id    : {@link JobID} of the job for which the local-dir needs to 
+   *                be cleaned up.
+   * @param user  : Job owner's username
+   * @param taskController : the task-controller to be used for deletion of
+   *                         jobLocalDir
+   */
+  static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
+      FileSystem fs, Path[] paths, JobID id, String user,
+      TaskController taskController)
+      throws IOException {
+    int i = 0;
+    PathDeletionContext[] contexts =
+                          new TaskControllerPathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
+                                                               taskController);
+    }
+    return contexts;
+  } 
+  
+  /**
+   * Builds list of TaskControllerTaskPathDeletionContext objects for a task
    * @param fs    : FileSystem in which the dirs to be deleted
    * @param paths : mapred-local-dirs
    * @param task  : the task whose taskDir or taskWorkDir is going to be deleted
@@ -1796,7 +1824,7 @@ public class TaskTracker 
    * @param taskController : the task-controller to be used for deletion of
    *                         taskDir or taskWorkDir
    */
-  static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
+  static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
       FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
       TaskController taskController)
       throws IOException {
@@ -1805,7 +1833,7 @@ public class TaskTracker 
                           new TaskControllerPathDeletionContext[paths.length];
 
     for (Path p : paths) {
-      contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
+      contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
                           isWorkDir, taskController);
     }
     return contexts;
@@ -1855,7 +1883,7 @@ public class TaskTracker 
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles) {
-          removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString());
+          removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
         }
         // add job to taskLogCleanupThread
         long now = System.currentTimeMillis();
@@ -1879,10 +1907,11 @@ public class TaskTracker 
    * @param rjob
    * @throws IOException
    */
-  void removeJobFiles(String user, String jobId)
+  void removeJobFiles(String user, JobID jobId)
       throws IOException {
-    PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
-        getLocalFiles(fConf, getLocalJobDir(user, jobId)));
+    PathDeletionContext[] contexts = 
+      buildTaskControllerJobPathDeletionContexts(localFs, 
+          getLocalFiles(fConf, ""), jobId, user, taskController);
     directoryCleanupThread.addToQueue(contexts);
   }
 
@@ -3006,7 +3035,7 @@ public class TaskTracker 
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
           // No jvm reuse, remove everything
           PathDeletionContext[] contexts =
-            buildTaskControllerPathDeletionContexts(localFs,
+            buildTaskControllerTaskPathDeletionContexts(localFs,
                 getLocalFiles(fConf, ""), task, false/* not workDir */,
                 taskController);
           directoryCleanupThread.addToQueue(contexts);
@@ -3026,7 +3055,7 @@ public class TaskTracker 
       } else {
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
           PathDeletionContext[] contexts =
-            buildTaskControllerPathDeletionContexts(localFs,
+            buildTaskControllerTaskPathDeletionContexts(localFs,
               getLocalFiles(fConf, ""), task, true /* workDir */,
               taskController);
           directoryCleanupThread.addToQueue(contexts);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Wed Mar 10 05:26:50 2010
@@ -109,7 +109,7 @@ public class ClusterWithLinuxTaskControl
     void dumpTaskStack(TaskControllerContext context) {
       attemptedSigQuits++;
       try {
-        signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+        signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
       } catch (Exception e) {
         LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
         failedSigQuits++;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=921230&r1=921229&r2=921230&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Wed Mar 10 05:26:50 2010
@@ -670,6 +670,7 @@ public class TestTaskTrackerLocalization
           taskId.toString(), task.isTaskCleanupTask());
 
     Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
+    assertTrue("No paths found", paths.length > 0);
     for (Path p : paths) {
       if (tracker.getLocalFileSystem().exists(p)) {
         createFileAndSetPermissions(localizedJobConf, p);
@@ -696,7 +697,7 @@ public class TestTaskTrackerLocalization
       // now try to delete the work dir and verify that there are no stale paths
       JvmManager.deleteWorkDir(tracker, task);
     }
-    tracker.removeJobFiles(task.getUser(), jobId.toString());
+    tracker.removeJobFiles(task.getUser(), jobId);
 
     assertTrue("Some task files are not deleted!! Number of stale paths is "
         + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
@@ -815,4 +816,78 @@ public class TestTaskTrackerLocalization
     assertFalse("Userlogs dir " + jobUserLogDir + " is not deleted as expected!!",
         jobUserLogDir.exists());
   }
+  
+  /**
+   * Test job cleanup by doing the following
+   *   - create files with no write permissions to TT under job-work-dir
+   *   - create files with no write permissions to TT under task-work-dir
+   */
+  public void testJobCleanup() throws IOException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+    
+    LOG.info("Running testJobCleanup()");
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    
+    // Set an inline cleanup queue
+    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+    tracker.setCleanupThread(cleanupQueue);
+    
+    // Create a file in job's work-dir with 555
+    String jobWorkDir = 
+      TaskTracker.getJobWorkDir(task.getUser(), task.getJobID().toString());
+    Path[] jPaths = tracker.getLocalFiles(localizedJobConf, jobWorkDir);
+    assertTrue("No paths found for job", jPaths.length > 0);
+    for (Path p : jPaths) {
+      if (tracker.getLocalFileSystem().exists(p)) {
+        createFileAndSetPermissions(localizedJobConf, p);
+      }
+    }
+    
+    // Initialize task dirs
+    tip.setJobConf(localizedJobConf);
+    tip.localizeTask(task);
+    
+    // Create a file in task local dir with 555
+    // this is to simply test the case where the jvm reuse is enabled and some
+    // files in task-attempt-local-dir are left behind to be cleaned up when the
+    // job finishes.
+    String taskLocalDir = 
+      TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(), 
+                                  task.getTaskID().toString(), false);
+    Path[] tPaths = tracker.getLocalFiles(localizedJobConf, taskLocalDir);
+    assertTrue("No paths found for task", tPaths.length > 0);
+    for (Path p : tPaths) {
+      if (tracker.getLocalFileSystem().exists(p)) {
+        createFileAndSetPermissions(localizedJobConf, p);
+      }
+    }
+    
+    // remove the job's local dir (queued through the cleanup queue)
+    tracker.removeJobFiles(task.getUser(), task.getJobID());
+
+    // check the task-local-dir
+    boolean tLocalDirExists = false;
+    for (Path p : tPaths) {
+      if (tracker.getLocalFileSystem().exists(p)) {
+        tLocalDirExists = true;
+      }
+    }
+    assertFalse("Task " + task.getTaskID() + " local dir exists after cleanup", 
+                tLocalDirExists);
+    
+    // Verify that the TaskTracker (via the task-controller) cleans up the dirs.
+    // check the job-work-dir
+    boolean jWorkDirExists = false;
+    for (Path p : jPaths) {
+      if (tracker.getLocalFileSystem().exists(p)) {
+        jWorkDirExists = true;
+      }
+    }
+    assertFalse("Job " + task.getJobID() + " work dir exists after cleanup", 
+                jWorkDirExists);
+  }
 }



Mime
View raw message