hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077069 - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Date: Fri, 04 Mar 2011 03:37:23 GMT
Author: omalley
Date: Fri Mar  4 03:37:22 2011
New Revision: 1077069

URL: http://svn.apache.org/viewvc?rev=1077069&view=rev
Log:
commit 48c0528cac774ce2a33c150ba1ccc8f07c5b4189
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date:   Wed Dec 9 10:48:35 2009 +0530

    MAPREDUCE:896 from https://issues.apache.org/jira/secure/attachment/12427328/y896.v2.1.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-896. Modify permissions for local files on tasktracker before
    +    deletion so they can be deleted cleanly. (Ravi Gummadi via yhemanth)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestChildTaskDirs.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
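
The patch is easiest to read back-to-front: recursive deletion of a task
directory can fail when a task created entries with read-only permissions, so
the TaskTracker now first "enables" the path for cleanup (a recursive
chown/chmod, delegated to the setuid task-controller binary when
LinuxTaskController is in use) and only then hands it to the CleanupQueue for
the actual delete. A minimal sketch of the underlying idea, assuming the
calling process already owns the files (plain java.io.File, not the patch's
own API):

    import java.io.File;

    class EnableThenDelete {
      // Recursively open up permissions so a later delete cannot fail on
      // a read-only file or a non-writable directory (roughly chmod -R 777).
      static void enableForCleanup(File f) {
        f.setReadable(true, false);   // a+r
        f.setWritable(true, false);   // a+w
        f.setExecutable(true, false); // a+x, needed to descend into dirs
        File[] children = f.listFiles();
        if (children != null) {
          for (File c : children) {
            enableForCleanup(c);
          }
        }
      }

      // After that, a conventional recursive delete succeeds.
      static boolean fullyDelete(File f) {
        File[] children = f.listFiles();
        if (children != null) {
          for (File c : children) {
            fullyDelete(c);
          }
        }
        return f.delete();
      }
    }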

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar  4 03:37:22 2011
@@ -23,6 +23,7 @@ int main(int argc, char **argv) {
   int next_option = 0;
   const char * job_id = NULL;
   const char * task_id = NULL;
+  const char * dir_to_be_deleted = NULL;
   const char * tt_root = NULL;
   int exit_code = 0;
   const char * task_pid = NULL;
@@ -31,6 +32,7 @@ int main(int argc, char **argv) {
       NULL, 0 } };
 
   const char* log_file = NULL;
+  char * base_path = NULL;
 
   //Minimum number of arguments required to run the task-controller
   //command-name user command tt-root
@@ -108,6 +110,13 @@ int main(int argc, char **argv) {
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
     break;
+  case ENABLE_TASK_FOR_CLEANUP:
+    base_path = argv[optind++];
+    job_id = argv[optind++];
+    dir_to_be_deleted = argv[optind++];
+    exit_code = enable_task_for_cleanup(base_path, user_detail->pw_name, job_id,
+                                        dir_to_be_deleted);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }
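
The new ENABLE_TASK_FOR_CLEANUP branch consumes three positional arguments
after the common prefix. Assuming the usual invocation layout for this binary
(executable, user name, command passed as its enum ordinal, then the command's
own arguments; all concrete values below are made up for illustration), a call
would look like:

    // Hypothetical argv for the new command. ENABLE_TASK_FOR_CLEANUP is
    // ordinal 3 in the command enum added to task-controller.h below.
    String[] argv = {
        "/path/to/task-controller",                  // setuid binary
        "someuser",                                  // user owning the task files
        "3",                                         // ENABLE_TASK_FOR_CLEANUP
        "/grid/0/mapred/local",                      // base_path (a mapred.local.dir)
        "job_200912091048_0001",                     // job_id
        "attempt_200912091048_0001_m_000000_0/work"  // dir_to_be_deleted
    };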

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar  4 03:37:22 2011
@@ -104,12 +104,17 @@ int check_tt_root(const char *tt_root) {
  * path resolve to one and same.
  */
 
-int check_path(char *path) {
+int check_path_for_relative_components(char *path) {
   char * resolved_path = (char *) canonicalize_file_name(path);
   if(resolved_path == NULL) {
+    fprintf(LOGFILE, "Error resolving the path: %s. Passed path: %s\n",
+          strerror(errno), path);
     return ERROR_RESOLVING_FILE_PATH;
   }
   if(strcmp(resolved_path, path) !=0) {
+    fprintf(LOGFILE,
+            "Relative path components in the path: %s. Resolved path: %s\n",
+            path, resolved_path);
     free(resolved_path);
     return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
   }
@@ -164,6 +169,34 @@ void get_task_file_path(const char * job
   free(mapred_local_dir);
 }
 
+/*
+ * Builds the full path of the dir (localTaskDir or localWorkDir).
+ * tt_root : the base path (i.e. mapred-local-dir) sent to the task-controller
+ * dir_to_be_deleted : either taskDir ($taskId) OR taskWorkDir ($taskId/work)
+ *
+ * See TT_LOCAL_TASK_DIR_PATTERN for the pattern
+ */
+void get_task_dir_path(const char * tt_root, const char * jobid,
+                       const char * dir_to_be_deleted, char **task_dir_path) {
+  *task_dir_path = NULL;
+  int str_len = strlen(TT_LOCAL_TASK_DIR_PATTERN) + strlen(jobid) + strlen(
+      dir_to_be_deleted) + strlen(tt_root);
+
+  *task_dir_path = (char *) malloc(sizeof(char) * (str_len + 1));
+  if (*task_dir_path == NULL) {
+    fprintf(LOGFILE, "Unable to allocate memory for task_dir_path \n");
+    return;
+  }
+
+  memset(*task_dir_path,'\0',str_len+1);
+  snprintf(*task_dir_path, str_len, TT_LOCAL_TASK_DIR_PATTERN, tt_root,
+           jobid, dir_to_be_deleted);
+#ifdef DEBUG
+  fprintf(LOGFILE, "get_task_dir_path : task dir path = %s\n", *task_dir_path);
+  fflush(LOGFILE);
+#endif
+}
+
 //end of private functions
 void display_usage(FILE *stream) {
   fprintf(stream,
@@ -183,6 +216,200 @@ int get_user_details(const char *user) {
   return 0;
 }
 
+/**
+ * Compare ownership of a file with the given ids.
+ */
+int compare_ownership(uid_t uid, gid_t gid, char *path) {
+  struct stat filestat;
+  if (stat(path, &filestat) != 0) {
+    return UNABLE_TO_STAT_FILE;
+  }
+  if (uid == filestat.st_uid && gid == filestat.st_gid) {
+    return 0;
+  }
+  return 1;
+}
+
+/*
+ * Function to check if the TaskTracker actually owns the file.
+ */
+int check_ownership(char *path) {
+  struct stat filestat;
+  if (stat(path, &filestat) != 0) {
+    return UNABLE_TO_STAT_FILE;
+  }
+  // check user/group. User should be TaskTracker user, group can either be
+  // TaskTracker's primary group or the special group to which binary's
+  // permissions are set.
+  if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
+      != filestat.st_gid)) {
+    return FILE_NOT_OWNED_BY_TASKTRACKER;
+  }
+  return 0;
+}
+
+/**
+ * Function to change the owner/group of a given path.
+ */
+static int change_owner(const char *path, uid_t uid, gid_t gid) {
+  int exit_code = chown(path, uid, gid);
+  if (exit_code != 0) {
+    fprintf(LOGFILE, "chown %d:%d for path %s failed: %s.\n", uid, gid, path,
+        strerror(errno));
+  }
+  return exit_code;
+}
+
+/**
+ * Function to change the mode of a given path.
+ */
+static int change_mode(const char *path, mode_t mode) {
+  int exit_code = chmod(path, mode);
+  if (exit_code != 0) {
+    fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path,
+        strerror(errno));
+  }
+  return exit_code;
+}
+
+/**
+ * Function to change permissions of the given path. It does the following
+ * recursively:
+ *    1) changes the owner/group of the paths to the passed owner/group
+ *    2) changes the file permission to the passed file_mode and directory
+ *       permission to the passed dir_mode
+ *
+ * should_check_ownership : boolean to enable checking of ownership of each path
+ */
+static int secure_path(const char *path, uid_t uid, gid_t gid,
+    mode_t file_mode, mode_t dir_mode, int should_check_ownership) {
+  FTS *tree = NULL; // the file hierarchy
+  FTSENT *entry = NULL; // a file in the hierarchy
+  char *paths[] = { (char *) path };
+  int process_path = 0;
+  int dir = 0;
+  int error_code = 0;
+  int done = 0;
+
+  // Get physical locations and don't resolve the symlinks.
+  // Don't change directory while walking the directory.
+  int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR;
+
+  tree = fts_open(paths, ftsoptions, NULL);
+  if (tree == NULL) {
+    fprintf(LOGFILE,
+        "Cannot open file traversal structure for the path %s:%s.\n", path,
+        strerror(errno));
+    return -1;
+  }
+
+  while (((entry = fts_read(tree)) != NULL) && !done) {
+    dir = 0;
+    switch (entry->fts_info) {
+    case FTS_D:
+      // A directory being visited in pre-order.
+      // We change ownership of directories in post-order.
+      // so ignore the pre-order visit.
+      process_path = 0;
+      break;
+    case FTS_DC:
+      // A directory that causes a cycle in the tree
+      // We don't expect cycles, ignore.
+      process_path = 0;
+      break;
+    case FTS_DNR:
+      // A directory which cannot be read
+      // Ignore and set error code.
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_DOT:
+      // "."  or ".."
+      process_path = 0;
+      break;
+    case FTS_F:
+      // A regular file
+      process_path = 1;
+      break;
+    case FTS_DP:
+      // A directory being visited in post-order
+      if (entry->fts_level == 0) {
+        // root directory. Done with traversing.
+        done = 1;
+      }
+      process_path = 1;
+      dir = 1;
+      break;
+    case FTS_SL:
+      // A symbolic link
+      process_path = 1;
+      break;
+    case FTS_SLNONE:
+      // A symbolic link with a nonexistent target
+      process_path = 1;
+      break;
+    case FTS_NS:
+      // A  file for which no stat(2) information was available
+      // Ignore and set error code
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_ERR:
+      // An error return. Ignore and set error code.
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_DEFAULT:
+      // File that doesn't belong to any of the above type. Ignore.
+      process_path = 0;
+      break;
+    default:
+      // None of the above. Ignore and set error code
+      process_path = 0;
+      error_code = -1;
+    }
+
+    if (error_code != 0) {
+      break;
+    }
+    if (!process_path) {
+      continue;
+    }
+    if (should_check_ownership &&
+        (compare_ownership(uid, gid, entry->fts_path) == 0)) {
+      // already set proper permissions.
+      // This might happen with distributed cache.
+#ifdef DEBUG
+      fprintf(
+          LOGFILE,
+          "already has private permissions. Not trying to change again for %s",
+          entry->fts_path);
+#endif
+      continue;
+    }
+
+    if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
+      fprintf(LOGFILE,
+          "Invalid file path. %s not user/group owned by the tasktracker.\n",
+          entry->fts_path);
+      error_code = -1;
+    } else if (change_owner(entry->fts_path, uid, gid) != 0) {
+      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
+      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    }
+  }
+  if (fts_close(tree) != 0) {
+    fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
+        strerror(errno));
+  }
+  return error_code;
+}
+
 /*
  *Function used to launch a task as the provided user.
  * First the function checks if the tt_root passed is found in
@@ -231,7 +458,7 @@ int run_task_as_user(const char * user, 
     return INVALID_TASK_SCRIPT_PATH;
   }
   errno = 0;
-  exit_code = check_path(task_script_path);
+  exit_code = check_path_for_relative_components(task_script_path);
   if(exit_code != 0) {
     goto cleanup;
   }
@@ -301,3 +528,59 @@ int kill_user_task(const char *user, con
   return 0;
 }
 
+/**
+ * Enables the path for deletion by changing the owner, group and permissions
+ * of the specified path and all the files/directories in the path recursively.
+ *     *  sudo chown user:mapred -R full_path
+ *     *  sudo chmod 2777 -R full_path
+ * Before changing permissions, makes sure that the given path doesn't contain
+ * any relative components.
+ * tt_root : the base path (i.e. mapred-local-dir) sent to the task-controller
+ * dir_to_be_deleted : either the taskDir OR the taskWorkDir to be deleted
+ */
+int enable_task_for_cleanup(char *tt_root, const char *user,
+           const char *jobid, const char *dir_to_be_deleted) {
+  int exit_code = 0;
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+  char * full_path = NULL;
+  if (check_tt_root(tt_root) < 0) {
+    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  get_task_dir_path(tt_root, jobid, dir_to_be_deleted, &full_path);
+  if (full_path == NULL) {
+    fprintf(LOGFILE, 
+      "Could not build the full path. Not deleting the dir %s\n",
+      dir_to_be_deleted);
+    exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
+  }
+  // Make sure that the given path doesn't have any relative components
+  else if ((exit_code = check_path_for_relative_components(full_path)) != 0) {
+    fprintf(LOGFILE,
+         "Not changing permissions as the path %s contains relative components.\n",
+         full_path);
+  }
+  else if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    exit_code = INVALID_USER_NAME;
+  }
+  else if ((exit_code = secure_path(full_path, user_detail->pw_uid,
+               tasktracker_gid, S_IRWXU | S_IRWXG | S_IRWXO,
+               S_ISGID | S_IRWXU | S_IRWXG | S_IRWXO, 0)) != 0) {
+    // No setgid on files, setgid on dirs, 777 otherwise.
+    // Set 777 permissions for user and TTgroup on all files/directories in
+    // 'full_path' recursively so that deletion of the path by the TaskTracker
+    // succeeds.
+
+    fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path);
+  }
+
+  if (full_path != NULL) {
+    free(full_path);
+  }
+  // free configurations
+  cleanup();
+  return exit_code;
+}
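
Note that secure_path() walks the tree with fts(3): regular files and symlinks
are processed as they are encountered, while directories are processed in
post-order (FTS_DP), after their contents. For readers more at home in Java,
the traversal shape is roughly this sketch (illustrative only; the patch does
this in C):

    import java.io.IOException;
    import java.nio.file.*;
    import java.nio.file.attribute.BasicFileAttributes;

    class PostOrderWalk {
      // Visit files as seen, directories after their children, mirroring
      // the FTS_F / FTS_DP handling in secure_path().
      static void walk(Path root) throws IOException {
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
          @Override
          public FileVisitResult visitFile(Path file, BasicFileAttributes a) {
            // chown/chmod a regular file or symlink here
            return FileVisitResult.CONTINUE;
          }
          @Override
          public FileVisitResult postVisitDirectory(Path dir, IOException e) {
            // chown/chmod the directory once its children are done
            return FileVisitResult.CONTINUE;
          }
        });
      }
    }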

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar  4 03:37:22 2011
@@ -29,13 +29,16 @@
 #include <sys/signal.h>
 #include <getopt.h>
 #include<grp.h>
+#include <fts.h>
+
 #include "configuration.h"
 
 //command definitions
 enum command {
   LAUNCH_TASK_JVM,
   TERMINATE_TASK_JVM,
-  KILL_TASK_JVM
+  KILL_TASK_JVM,
+  ENABLE_TASK_FOR_CLEANUP
 };
 
 enum errorcodes {
@@ -53,12 +56,15 @@ enum errorcodes {
   ERROR_RESOLVING_FILE_PATH, //12
   RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
   UNABLE_TO_STAT_FILE, //14
-  FILE_NOT_OWNED_BY_TASKTRACKER //15
+  FILE_NOT_OWNED_BY_TASKTRACKER, //15
+  UNABLE_TO_BUILD_PATH //16
 };
 
 
 #define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
 
+#define TT_LOCAL_TASK_DIR_PATTERN    "%s/taskTracker/jobcache/%s/%s"
+
 #define TT_SYS_DIR_KEY "mapred.local.dir"
 
 #define MAX_ITEMS 10
@@ -78,4 +84,7 @@ int run_task_as_user(const char * user, 
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
+int enable_task_for_cleanup(char * base_path, const char *user,
+ const char *jobid, const char *dir_to_be_deleted);
+
 int get_user_details(const char *user);
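
TT_LOCAL_TASK_DIR_PATTERN parallels the existing TT_LOCAL_TASK_SCRIPT_PATTERN,
minus the trailing taskjvm.sh component. For illustration, with made-up values
the expansion computed by get_task_dir_path() comes out as:

    // All values below are illustrative.
    String ttRoot = "/grid/0/mapred/local";
    String jobId  = "job_200912091048_0001";
    String dir    = "attempt_200912091048_0001_m_000000_0/work";
    // TT_LOCAL_TASK_DIR_PATTERN == "%s/taskTracker/jobcache/%s/%s"
    String full = String.format("%s/taskTracker/jobcache/%s/%s",
                                ttRoot, jobId, dir);
    // full == "/grid/0/mapred/local/taskTracker/jobcache/"
    //       + "job_200912091048_0001/"
    //       + "attempt_200912091048_0001_m_000000_0/work"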

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Mar  4 03:37:22 2011
@@ -39,7 +39,7 @@ class CleanupQueue {
    * paths(directories/files) in a separate thread. This constructor creates a
    * clean-up thread and also starts it as a daemon. Callers can instantiate one
    * CleanupQueue per JVM and can use it for deleting paths. Use
-   * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
+   * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
    * deletion.
    */
   public CleanupQueue() {
@@ -49,23 +49,53 @@ class CleanupQueue {
       }
     }
   }
-  
-  public void addToQueue(JobConf conf, Path...paths) {
-    cleanupThread.addToQueue(conf,paths);
+
+  /**
+   * Contains info related to the path of the file/dir to be deleted
+   */
+  static class PathDeletionContext {
+    String fullPath;// full path of file or dir
+    FileSystem fs;
+
+    public PathDeletionContext(FileSystem fs, String fullPath) {
+      this.fs = fs;
+      this.fullPath = fullPath;
+    }
+    
+    protected String getPathForCleanup() {
+      return fullPath;
+    }
+
+    /**
+     * Makes the path (and its subdirectories, recursively) fully deletable
+     */
+    protected void enablePathForCleanup() throws IOException {
+      // do nothing
+    }
   }
 
-  private static class PathCleanupThread extends Thread {
+  /**
+   * Adds the paths to the queue of paths to be deleted by cleanupThread.
+   */
+  void addToQueue(PathDeletionContext... contexts) {
+    cleanupThread.addToQueue(contexts);
+  }
 
-    static class PathAndConf {
-      JobConf conf;
-      Path path;
-      PathAndConf(JobConf conf, Path path) {
-        this.conf = conf;
-        this.path = path;
-      }
+  protected static boolean deletePath(PathDeletionContext context)
+            throws IOException {
+    context.enablePathForCleanup();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to delete " + context.fullPath);
     }
+    return context.fs.delete(new Path(context.fullPath), true);
+  }
+
+  private static class PathCleanupThread extends Thread {
+
     // cleanup queue which deletes files/directories of the paths queued up.
-    private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
+    private LinkedBlockingQueue<PathDeletionContext> queue =
+      new LinkedBlockingQueue<PathDeletionContext>();
 
     public PathCleanupThread() {
       setName("Directory/File cleanup thread");
@@ -73,28 +103,34 @@ class CleanupQueue {
       start();
     }
 
-    public void addToQueue(JobConf conf,Path... paths) {
-      for (Path p : paths) {
+    void addToQueue(PathDeletionContext[] contexts) {
+      for (PathDeletionContext context : contexts) {
         try {
-          queue.put(new PathAndConf(conf,p));
-        } catch (InterruptedException ie) {}
+          queue.put(context);
+        } catch(InterruptedException ie) {}
       }
     }
 
     public void run() {
-      LOG.debug(getName() + " started.");
-      PathAndConf pathAndConf = null;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + " started.");
+      }
+      PathDeletionContext context = null;
       while (true) {
         try {
-          pathAndConf = queue.take();
+          context = queue.take();
           // delete the path.
-          FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
-          fs.delete(pathAndConf.path, true);
-          LOG.debug("DELETED " + pathAndConf.path);
+          if (!deletePath(context)) {
+            LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
+          }
+          else if (LOG.isDebugEnabled()) {
+            LOG.debug("DELETED " + context.fullPath);
+          }
         } catch (InterruptedException t) {
+          LOG.warn("Interrupted deletion of " + context.fullPath);
           return;
         } catch (Exception e) {
-          LOG.warn("Error deleting path" + pathAndConf.path);
+          LOG.warn("Error deleting path " + context.fullPath + ": " + e);
         } 
       }
     }
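
The queue's contract after this change, in sketch form: callers wrap each path
in a PathDeletionContext, and the daemon thread calls enablePathForCleanup()
before fs.delete(). Both classes are package-private, so a usage sketch would
live inside org.apache.hadoop.mapred (the path below is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Inside org.apache.hadoop.mapred:
    class CleanupQueueExample {
      static void example(Configuration conf) throws IOException {
        FileSystem localFs = FileSystem.getLocal(conf);
        CleanupQueue queue = new CleanupQueue();
        // A plain context: enablePathForCleanup() is a no-op, so the
        // cleanup thread simply does fs.delete(path, true).
        queue.addToQueue(new CleanupQueue.PathDeletionContext(
            localFs, "/tmp/mapred/local/taskTracker/jobcache/job_0001"));
      }
    }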

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 03:37:22 2011
@@ -21,8 +21,9 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.List;
 
-
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -134,4 +135,18 @@ class DefaultTaskController extends Task
     }
   }
   
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+         throws IOException {
+    try {
+      FileUtil.chmod(context.fullPath, "a+rwx", true);
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted while setting permissions for " + context.fullPath +
+          " for deletion.");
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:37:22 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -2828,7 +2829,8 @@ class JobInProgress {
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(conf,tempDir); 
+      new CleanupQueue().addToQueue(new PathDeletionContext(
+          FileSystem.get(conf), tempDir.toUri().getPath())); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:37:22 2011
@@ -68,7 +68,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.mapred.JobHistory.Keys;
@@ -76,6 +75,7 @@ import org.apache.hadoop.mapred.JobHisto
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
@@ -3533,7 +3533,9 @@ public class JobTracker implements MRCon
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
-      new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(new PathDeletionContext(
+          FileSystem.get(conf),
+          getSystemDirectoryForJob(jobId).toUri().getPath()));
       job.fail();
       if (userFileForJob != null) {
         userFileForJob.delete();
@@ -3551,7 +3553,9 @@ public class JobTracker implements MRCon
       if (userFileForJob != null) {
         userFileForJob.delete();
       }
-      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(new PathDeletionContext(
+          FileSystem.get(conf),
+          getSystemDirectoryForJob(jobId).toUri().getPath()));
       throw ioe;
     }
 
@@ -3560,7 +3564,9 @@ public class JobTracker implements MRCon
     try {
       checkMemoryRequirements(job);
     } catch (IOException ioe) {
-      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+      new CleanupQueue().addToQueue(new PathDeletionContext(
+          FileSystem.get(conf),
+          getSystemDirectoryForJob(jobId).toUri().getPath()));
       throw ioe;
     }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 03:37:22 2011
@@ -416,7 +416,12 @@ class JvmManager {
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
             if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              FileUtil.fullyDelete(env.workDir);
+              tracker.directoryCleanupThread.addToQueue(
+                  TaskTracker.buildTaskControllerPathDeletionContexts(
+                      tracker.getLocalFileSystem(), tracker.getLocalDirs(),
+                      initalContext.task,
+                      true /* workDir */,
+                      tracker.getTaskController()));
             }
           } catch (IOException ie){}
         }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 03:37:22 2011
@@ -24,13 +24,16 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -110,7 +113,8 @@ class LinuxTaskController extends TaskCo
   enum TaskCommands {
     LAUNCH_TASK_JVM,
     TERMINATE_TASK_JVM,
-    KILL_TASK_JVM
+    KILL_TASK_JVM,
+    ENABLE_TASK_FOR_CLEANUP
   }
   
   /**
@@ -150,7 +154,7 @@ class LinuxTaskController extends TaskCo
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
-                                    launchTaskJVMArgs, env);
+                                    launchTaskJVMArgs, env.workDir, env.env);
     context.shExec = shExec;
     try {
       shExec.execute();
@@ -167,6 +171,40 @@ class LinuxTaskController extends TaskCo
   }
 
   /**
+   * Helper method that runs a LinuxTaskController command
+   * 
+   * @param taskCommand
+   * @param user
+   * @param cmdArgs
+   * @param env
+   * @throws IOException
+   */
+  private void runCommand(TaskCommands taskCommand, String user,
+      List<String> cmdArgs, File workDir, Map<String, String> env)
+      throws IOException {
+
+    ShellCommandExecutor shExec =
+        buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Exit code from " + taskCommand.toString() + " is : "
+          + shExec.getExitCode());
+      LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
+          + StringUtils.stringifyException(e));
+      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+          + " follows:");
+      logOutput(shExec.getOutput());
+      throw new IOException(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+          + " follows:");
+      logOutput(shExec.getOutput());
+    }
+  }
+
+  /**
    * Returns list of arguments to be passed while launching task VM.
    * See {@code buildTaskControllerExecutor(TaskCommands, 
    * String, List<String>, JvmEnv)} documentation.
@@ -191,6 +229,67 @@ class LinuxTaskController extends TaskCo
     return commandArgs;
   }
   
+  private List<String> buildTaskCleanupArgs(
+      TaskControllerPathDeletionContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    commandArgs.add(context.mapredLocalDir.toUri().getPath());
+    commandArgs.add(context.task.getJobID().toString());
+
+    String workDir = "";
+    if (context.isWorkDir) {
+      workDir = "/work";
+    }
+    if (context.task.isTaskCleanupTask()) {
+      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
+                      + workDir);
+    } else {
+      commandArgs.add(context.task.getTaskID() + workDir);
+    }
+
+    return commandArgs;
+  }
+
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
+                + " for " + context.fullPath);
+    }
+
+    if (context instanceof TaskControllerPathDeletionContext) {
+      TaskControllerPathDeletionContext tContext =
+        (TaskControllerPathDeletionContext) context;
+    
+      if (tContext.task.getUser() != null && tContext.fs instanceof LocalFileSystem) {
+        runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
+                   tContext.task.getUser(),
+                   buildTaskCleanupArgs(tContext), null, null);
+      }
+      else {
+        throw new IllegalArgumentException("Either the user is null or the " +
+                               "file system is not the local file system.");
+      }
+    }
+    else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+          + "TaskControllerPathDeletionContext.");
+    }
+  }
+
+  private void logOutput(String output) {
+    String shExecOutput = output;
+    if (shExecOutput != null) {
+      for (String str : shExecOutput.split("\n")) {
+        LOG.info(str);
+      }
+    }
+  }
+
   // get the Job ID from the information in the TaskControllerContext
   private String getJobId(TaskControllerContext context) {
     String taskId = context.task.getTaskID().toString();
@@ -299,10 +398,10 @@ class LinuxTaskController extends TaskCo
    * @return {@link ShellCommandExecutor}
    * @throws IOException
    */
-  private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
-                                          String userName, 
-                                          List<String> cmdArgs, JvmEnv env) 
-                                    throws IOException {
+  private ShellCommandExecutor buildTaskControllerExecutor(
+      TaskCommands command, String userName, List<String> cmdArgs,
+      File workDir, Map<String, String> env) 
+      throws IOException {
     String[] taskControllerCmd = new String[3 + cmdArgs.size()];
     taskControllerCmd[0] = taskControllerExe;
     taskControllerCmd[1] = userName;
@@ -317,9 +416,9 @@ class LinuxTaskController extends TaskCo
       }
     }
     ShellCommandExecutor shExec = null;
-    if(env.workDir != null && env.workDir.exists()) {
+    if(workDir != null && workDir.exists()) {
       shExec = new ShellCommandExecutor(taskControllerCmd,
-          env.workDir, env.env);
+          workDir, env);
     } else {
       shExec = new ShellCommandExecutor(taskControllerCmd);
     }
@@ -467,7 +566,8 @@ class LinuxTaskController extends TaskCo
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
         command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env);
+        buildKillTaskCommandArgs(context), context.env.workDir,
+        context.env.env);
     try {
       shExec.execute();
     } catch (Exception e) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar  4 03:37:22 2011
@@ -23,6 +23,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -124,6 +127,63 @@ abstract class TaskController implements
   }
 
   /**
+   * Contains info related to the path of the file/dir to be deleted. This info
+   * is needed by task-controller to build the full path of the file/dir
+   */
+  static class TaskControllerPathDeletionContext extends PathDeletionContext {
+    Task task;
+    boolean isWorkDir;
+    TaskController taskController;
+
+    /**
+     * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
+     * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
+     * is built using mapredLocalDir, jobId, taskId, etc.
+     */
+    Path mapredLocalDir;
+
+    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
+        Task task, boolean isWorkDir, TaskController taskController) {
+      super(fs, null);
+      this.task = task;
+      this.isWorkDir = isWorkDir;
+      this.taskController = taskController;
+      this.mapredLocalDir = mapredLocalDir;
+    }
+
+    @Override
+    protected String getPathForCleanup() {
+      if (fullPath == null) {
+        fullPath = buildPathForDeletion();
+      }
+      return fullPath;
+    }
+
+    /**
+     * Builds the path of taskAttemptDir OR taskWorkDir based on
+     * mapredLocalDir, jobId, taskId, etc
+     */
+    String buildPathForDeletion() {
+      String subDir = TaskTracker.getLocalTaskDir(task.getJobID().toString(),
+          task.getTaskID().toString(), task.isTaskCleanupTask());
+      if (isWorkDir) {
+        subDir = subDir + Path.SEPARATOR + "work";
+      }
+      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
+    }
+
+    /**
+     * Makes the path (and its subdirectories, recursively) fully deletable
+     * by having the task-controller set proper permissions (777)
+     */
+    @Override
+    protected void enablePathForCleanup() throws IOException {
+      getPathForCleanup();// allow init of fullPath
+      taskController.enableTaskForCleanup(this);
+    }
+  }
+
+  /**
    * Method which is called after the job is localized so that task controllers
    * can implement their own job localization logic.
    * 
@@ -146,4 +206,12 @@ abstract class TaskController implements
    */
   
   abstract void killTask(TaskControllerContext context);
+  
+  /**
+   * Enable the task for cleanup by changing permissions of the path
+   * @param context   path deletion context
+   * @throws IOException
+   */
+  abstract void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException;
 }
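
This indirection is the heart of the patch: CleanupQueue.deletePath() calls
context.enablePathForCleanup(), which for a TaskControllerPathDeletionContext
forwards to the configured TaskController, so DefaultTaskController chmods
in-process while LinuxTaskController shells out to the setuid binary, with no
change to the deletion code itself. As a hedged sketch of the extension point
(an illustrative subclass in the same package, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;

    // Inside org.apache.hadoop.mapred: a context that opens up permissions
    // itself before the cleanup thread deletes the path.
    class ChmodFirstDeletionContext extends CleanupQueue.PathDeletionContext {
      ChmodFirstDeletionContext(FileSystem fs, String fullPath) {
        super(fs, fullPath);
      }

      @Override
      protected void enablePathForCleanup() throws IOException {
        try {
          FileUtil.chmod(fullPath, "a+rwx", true); // recursive
        } catch (InterruptedException e) {
          throw new IOException("Interrupted while enabling " + fullPath);
        }
      }
    }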

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:37:22 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 
@@ -552,12 +551,53 @@ abstract class TaskRunner extends Thread
     }
   }
   
+  /**
+   * Sets permissions recursively and then deletes the contents of dir.
+   * Leaves dir as an empty directory (does not delete dir itself).
+   */
+  static void deleteDirContents(JobConf conf, File dir) throws IOException {
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(new Path(dir.getAbsolutePath()))) {
+      File contents[] = dir.listFiles();
+      if (contents != null) {
+        for (int i = 0; i < contents.length; i++) {
+          try {
+            int ret = 0;
+            if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
+                                      "a+rwx", true)) != 0) {
+              LOG.warn("Unable to chmod for " + contents[i] + 
+                  "; chmod exit status = " + ret);
+            }
+          } catch(InterruptedException e) {
+            LOG.warn("Interrupted while setting permissions for contents of " +
+                "workDir. Not deleting the remaining contents of workDir.");
+            return;
+          }
+          if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
+            LOG.warn("Unable to delete "+ contents[i]);
+          }
+        }
+      }
+    }
+    else {
+      LOG.warn(dir + " does not exist.");
+    }
+  }
+  
   //Mostly for setting up the symlinks. Note that when we setup the distributed
   //cache, we didn't create the symlinks. This is done on a per task basis
   //by the currently executing task.
   public static void setupWorkDir(JobConf conf) throws IOException {
     File workDir = new File(".").getAbsoluteFile();
-    FileUtil.fullyDelete(workDir);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fully deleting contents of " + workDir);
+    }
+
+    /** Delete only the contents of workDir, leaving the directory empty. We
+     * can't delete the workDir as it is the current working directory.
+     */
+    deleteDirContents(conf, workDir);
+    
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077069&r1=1077068&r2=1077069&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:37:22 2011
@@ -65,6 +65,8 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
@@ -207,6 +209,8 @@ public class TaskTracker 
   private int maxReduceSlots;
   private int failures;
   
+  private FileSystem localFs;
+  
   // Performance-related config knob to send an out-of-band heartbeat
   // on task completion
   static final String TT_OUTOFBAND_HEARBEAT =
@@ -218,7 +222,7 @@ public class TaskTracker 
   
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
-  private CleanupQueue directoryCleanupThread;
+  CleanupQueue directoryCleanupThread;
   volatile JvmManager jvmManager;
   
   private TaskMemoryManagerThread taskMemoryManager;
@@ -364,7 +368,7 @@ public class TaskTracker 
   TaskController getTaskController() {
     return taskController;
   }
-  
+
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -455,6 +459,7 @@ public class TaskTracker 
   synchronized void initialize() throws IOException {
     // use configured nameserver & interface to get local hostname
     this.fConf = new JobConf(originalConf);
+    localFs = FileSystem.getLocal(fConf);
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
     }
@@ -1399,6 +1404,32 @@ public class TaskTracker 
     }
   }
 
+  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
+      Path[] paths) {
+    int i = 0;
+    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
+    }
+    return contexts;
+  }
+
+  static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
+      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
+      TaskController taskController)
+      throws IOException {
+    int i = 0;
+    PathDeletionContext[] contexts =
+                          new TaskControllerPathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
+                          isWorkDir, taskController);
+    }
+    return contexts;
+  }
+
   /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
@@ -1427,8 +1458,9 @@ public class TaskTracker 
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
-            getLocalJobDir(rjob.getJobID().toString())));
+          PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
+              getLocalFiles(fConf, getLocalJobDir(rjob.getJobID().toString())));
+          directoryCleanupThread.addToQueue(contexts);
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -2524,6 +2556,7 @@ public class TaskTracker 
           }
           String taskDir = getLocalTaskDir(task.getJobID().toString(),
                              taskId.toString(), task.isTaskCleanupTask());
+
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2535,21 +2568,23 @@ public class TaskTracker 
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(defaultJobConf,
-                  getLocalFiles(defaultJobConf,
-                  taskDir));
-            }  
-            
+              PathDeletionContext[] contexts =
+                buildTaskControllerPathDeletionContexts(localFs, getLocalDirs(),
+                  task, false/* not workDir */, taskController);
+              directoryCleanupThread.addToQueue(contexts);
+            }
             else {
-              directoryCleanupThread.addToQueue(defaultJobConf,
-                  getLocalFiles(defaultJobConf,
-                taskDir+"/job.xml"));
+              PathDeletionContext[] contexts = buildPathDeletionContexts(
+                  localFs, getLocalFiles(defaultJobConf, taskDir+"/job.xml"));
+              directoryCleanupThread.addToQueue(contexts);
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(defaultJobConf,
-                  getLocalFiles(defaultJobConf,
-                  taskDir+"/work"));
+              PathDeletionContext[] contexts =
+                buildTaskControllerPathDeletionContexts(localFs, getLocalDirs(),
+                  task, true /* workDir */,
+                  taskController);
+              directoryCleanupThread.addToQueue(contexts);
             }  
           }
         } catch (Throwable ie) {
@@ -3090,7 +3125,7 @@ public class TaskTracker 
   }
 
   // get the full paths of the directory in all the local disks.
-  private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+  Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
     String[] localDirs = conf.getLocalDirs();
     Path[] paths = new Path[localDirs.length];
     FileSystem localFs = FileSystem.getLocal(conf);
@@ -3101,6 +3136,22 @@ public class TaskTracker 
     return paths;
   }
 
+  // get the paths in all the local disks.
+  Path[] getLocalDirs() throws IOException{
+    String[] localDirs = fConf.getLocalDirs();
+    Path[] paths = new Path[localDirs.length];
+    FileSystem localFs = FileSystem.getLocal(fConf);
+    for (int i = 0; i < localDirs.length; i++) {
+      paths[i] = new Path(localDirs[i]);
+      paths[i] = paths[i].makeQualified(localFs);
+    }
+    return paths;
+  }
+
+  FileSystem getLocalFileSystem(){
+    return localFs;
+  }
+
   int getMaxCurrentMapTasks() {
     return maxMapSlots;
   }
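
The two context-building helpers keep the call sites above small: job-level
files owned by the TT user take the plain path, while task directories that
may contain user-owned files are routed through the task-controller. An
illustrative method, as if added to TaskTracker itself (the helpers and fields
used here are private or package-private to this class):

    // Sketch only: mirrors what the job-purge and task-cleanup call
    // sites above effectively do.
    void queueCleanupSketch(JobConf conf, Task task, String jobDir)
        throws IOException {
      // TT-owned job files: a plain recursive delete suffices.
      directoryCleanupThread.addToQueue(
          buildPathDeletionContexts(localFs, getLocalFiles(conf, jobDir)));

      // Task work dirs may hold user-owned, read-only entries: go through
      // the task-controller so permissions are opened up before deletion.
      directoryCleanupThread.addToQueue(
          buildTaskControllerPathDeletionContexts(localFs, getLocalDirs(),
              task, true /* workDir */, getTaskController()));
    }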

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestChildTaskDirs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestChildTaskDirs.java?rev=1077069&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestChildTaskDirs.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestChildTaskDirs.java Fri Mar  4 03:37:22 2011
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Validates that files created by tasks under taskWorkDir (including files
+ * with non-writable permissions set) are removed by the TT when running
+ * with LinuxTaskController.
+ */
+public class TestChildTaskDirs extends ClusterWithLinuxTaskController {
+  private static final Log LOG = LogFactory.getLog(TestChildTaskDirs.class);
+  private static final File TEST_DIR = 
+    new File(System.getProperty("test.build.data", "/tmp"), "child-dirs");
+  private static final String MY_DIR = "my-test-dir";
+  private static final String MY_FILE = "my-test-file";
+  private static final LocalDirAllocator LOCAL_DIR_ALLOC = 
+    new LocalDirAllocator("mapred.local.dir");
+
+  public static Test suite() {
+    TestSetup setup = 
+      new TestSetup(new TestSuite(TestChildTaskDirs.class)) {
+      protected void setUp() throws Exception {
+        TEST_DIR.mkdirs();
+      }
+      protected void tearDown() throws Exception {
+        FileUtil.fullyDelete(TEST_DIR);
+      }
+    };
+    return setup;
+  }
+
+  class InlineCleanupQueue extends CleanupQueue {
+    List<String> stalePaths = new ArrayList<String>();
+
+    public InlineCleanupQueue() {
+      // do nothing
+    }
+
+    @Override
+    public void addToQueue(PathDeletionContext... contexts) {
+      // delete paths in-line
+      for (PathDeletionContext context : contexts) {
+        try {
+          if (!deletePath(context)) {
+            LOG.warn("Stale path " + context.fullPath);
+            stalePaths.add(context.fullPath);
+          }
+        } catch (IOException e) {
+          LOG.warn("Caught exception while deleting path "
+              + context.fullPath);
+          LOG.info(StringUtils.stringifyException(e));
+          stalePaths.add(context.fullPath);
+        }
+      }
+    }
+  }
+
+  // Mapper that creates dirs
+  // job-id/
+  //   -attempt-id/
+  //      -work/
+  //         -my-test-dir(555)
+  //            -my-test-file(555)
+  static class CreateDir extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    File taskWorkDir = null;
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      File subDir = new File(taskWorkDir, MY_DIR);
+      LOG.info("Child folder : " + subDir);
+      subDir.mkdirs();
+      File newFile = new File(subDir, MY_FILE);
+      LOG.info("Child file : " + newFile);
+      newFile.createNewFile();
+
+      // Set the permissions of my-test-dir and my-test-dir/my-test-file to 555
+      try {
+        FileUtil.chmod(subDir.getAbsolutePath(), "a=rx", true);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+    
+    @Override
+    public void configure(JobConf conf) {
+      String jobId = conf.get("mapred.job.id");
+      String taskId = conf.get("mapred.task.id");
+      String taskDir = TaskTracker.getLocalTaskDir(jobId, taskId);
+      try {
+        Path taskDirPath = 
+          LOCAL_DIR_ALLOC.getLocalPathForWrite(taskDir, conf);
+        taskWorkDir = new File(taskDirPath.toString(), "work");
+        LOG.info("Task work-dir : " + taskWorkDir.toString());
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+    }
+  }
+
+  public void testChildDirCleanup() throws Exception {
+    LOG.info("Testing if the dirs created by the child process is cleaned up properly");
+
+    if (!shouldRun()) {
+      return;
+    }
+
+    // start the cluster
+    startCluster();
+
+    // make sure that only one tracker is configured
+    if (mrCluster.getNumTaskTrackers() != 1) {
+      throw new Exception("Cluster started with " 
+        + mrCluster.getNumTaskTrackers() + " instead of 1");
+    }
+    
+    // configure a job
+    JobConf jConf = getClusterConf();
+    jConf.setJobName("Mkdir job");
+    jConf.setMapperClass(CreateDir.class);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+
+    FileSystem fs = FileSystem.get(jConf);
+    Path inDir = new Path("in");
+    Path outDir = new Path("out");
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+    String input = "The quick brown fox";
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes(input);
+    file.close();
+
+    jConf.setInputFormat(TextInputFormat.class);
+    jConf.setOutputKeyClass(LongWritable.class);
+    jConf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(jConf, inDir);
+    FileOutputFormat.setOutputPath(jConf, outDir);
+
+    // set inline cleanup queue in TT
+    mrCluster.getTaskTrackerRunner(0).getTaskTracker().directoryCleanupThread =
+      new InlineCleanupQueue();
+
+    JobClient jobClient = new JobClient(jConf);
+    RunningJob job = jobClient.submitJob(jConf);
+
+    JobID id = job.getID();
+    
+    // wait for the job to finish
+    job.waitForCompletion();
+    
+    JobInProgress jip = 
+      mrCluster.getJobTrackerRunner().getJobTracker().getJob(id);
+    String attemptId = 
+      jip.getMapTasks()[0].getTaskStatuses()[0].getTaskID().toString();
+    
+    String taskTrackerLocalDir = 
+      mrCluster.getTaskTrackerRunner(0).getLocalDir();
+    
+    String taskDir = TaskTracker.getLocalTaskDir(id.toString(), attemptId);
+    Path taskDirPath = new Path(taskTrackerLocalDir, taskDir);
+    LOG.info("Checking task dir " + taskDirPath);
+    FileSystem localFS = FileSystem.getLocal(jConf);
+    assertFalse("task dir still exists", localFS.exists(taskDirPath));
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=1077069&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java Fri Mar  4 03:37:22 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+public class TestSetupWorkDir extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestSetupWorkDir.class);
+
+  /**
+   * Create a file in the given dir and set permissions to r-xr-xr-x so that
+   * no one can delete it directly (without doing a chmod first).
+   * Creates dir/subDir and dir/subDir/file
+   */
+  static void createFileAndSetPermissions(JobConf jobConf, Path dir)
+       throws IOException {
+    Path subDir = new Path(dir, "subDir");
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    fs.mkdirs(subDir);
+    Path p = new Path(subDir, "file");
+    DataOutputStream out = fs.create(p);
+    out.writeBytes("dummy input");
+    out.close();
+    // no write permission for subDir and subDir/file
+    try {
+      int ret = 0;
+      if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
+        LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
+      }
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted while doing chmod for " + subDir);
+    }
+  }
+
+  /**
+   * Validates if setupWorkDir is properly cleaning up contents of workDir.
+   * TODO: other things of TaskRunner.setupWorkDir() related to distributed
+   * cache need to be validated.
+   */
+  public void testSetupWorkDir() throws IOException {
+    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
+                            "testSetupWorkDir");
+    Path myWorkDir = new Path(rootDir, "./work");
+    JobConf jConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jConf);
+    if (fs.exists(myWorkDir)) {
+      fs.delete(myWorkDir, true);
+    }
+    if (!fs.mkdirs(myWorkDir)) {
+      throw new IOException("Unable to create workDir " + myWorkDir);
+    }
+
+    // create {myWorkDir}/subDir/file and set 555 perms for subDir and file
+    createFileAndSetPermissions(jConf, myWorkDir);
+
+    TaskRunner.deleteDirContents(jConf, new File(myWorkDir.toUri().getPath()));
+    
+    assertTrue("Contents of " + myWorkDir + " are not cleaned up properly.",
+        fs.listStatus(myWorkDir).length == 0);
+    
+    // cleanup
+    fs.delete(rootDir, true);
+  }
+}


