Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 46498 invoked from network); 4 Mar 2011 03:44:10 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:44:10 -0000 Received: (qmail 85077 invoked by uid 500); 4 Mar 2011 03:44:09 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 85043 invoked by uid 500); 4 Mar 2011 03:44:09 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 85029 invoked by uid 99); 4 Mar 2011 03:44:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:44:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:44:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0DEFC23888CD; Fri, 4 Mar 2011 03:43:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077127 - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/server/tasktracker/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/secur... 
Date: Fri, 04 Mar 2011 03:43:44 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304034345.0DEFC23888CD@eris.apache.org> Author: omalley Date: Fri Mar 4 03:43:43 2011 New Revision: 1077127 URL: http://svn.apache.org/viewvc?rev=1077127&view=rev Log: commit b2c61f3f33b3c2ff8b42efd7473752b9b4bc125c Author: Hemanth Yamijala Date: Tue Jan 26 15:10:41 2010 +0530 MAPREDUCE:896 from https://issues.apache.org/jira/secure/attachment/12431413/MR-896.v8-y20.patch +++ b/YAHOO-CHANGES.txt + MAPREDUCE-896. Enhance tasktracker to cleanup files that might have + been created by user tasks with non-writable permissions. + (Ravi Gummadi via yhemanth) + Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java 
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar 4 03:43:43 2011 @@ -58,6 +58,7 @@ int main(int argc, char **argv) { NULL, 0 } }; const char* log_file = NULL; + char * dir_to_be_deleted = NULL; //Minimum number of arguments required to run the task-controller //command-name user command tt-root @@ -135,6 +136,13 @@ int main(int argc, char **argv) { task_pid = argv[optind++]; exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL); break; + case 
ENABLE_TASK_FOR_CLEANUP: + tt_root = argv[optind++]; + job_id = argv[optind++]; + dir_to_be_deleted = argv[optind++]; + exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id, + dir_to_be_deleted); + break; default: exit_code = INVALID_COMMAND_PROVIDED; } Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 03:43:43 2011 @@ -197,6 +197,17 @@ char *get_task_launcher_file(const char attempt_dir); } +/* + * Builds the full path of the dir(localTaskDir or localWorkDir) + * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller + * dir_to_be_deleted : is either taskDir($taskId) OR taskWorkDir($taskId/work) + */ +char *get_task_dir_path(const char *tt_root, const char *user, + const char *jobid, const char *dir_to_be_deleted) { + return concatenate(TT_LOCAL_TASK_DIR_PATTERN, "task_dir_full_path", 4, + tt_root, user, jobid, dir_to_be_deleted); +} + /** * Get the log directory for the given attempt. */ @@ -218,17 +229,17 @@ int check_tt_root(const char *tt_root) { * launcher file resolve to one and same. This is done so as to avoid * security pitfalls because of relative path components in the file name. */ -int check_task_launcher_path(char *path) { +int check_path_for_relative_components(char *path) { char * resolved_path = (char *) canonicalize_file_name(path); if (resolved_path == NULL) { fprintf(LOGFILE, - "Error resolving the task launcher file path: %s. Passed path: %s\n", + "Error resolving the path: %s. 
Passed path: %s\n", strerror(errno), path); return ERROR_RESOLVING_FILE_PATH; } if (strcmp(resolved_path, path) != 0) { fprintf(LOGFILE, - "Relative path components in the file path: %s. Resolved path: %s\n", + "Relative path components in the path: %s. Resolved path: %s\n", path, resolved_path); free(resolved_path); return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH; @@ -255,20 +266,23 @@ static int change_owner(const char *path static int change_mode(const char *path, mode_t mode) { int exit_code = chmod(path, mode); if (exit_code != 0) { - fprintf(LOGFILE, "chown %d of path %s failed: %s.\n", mode, path, + fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path, strerror(errno)); } return exit_code; } /** - * Function to secure the given path. It does the following recursively: + * Function to change permissions of the given path. It does the following + * recursively: * 1) changes the owner/group of the paths to the passed owner/group * 2) changes the file permission to the passed file_mode and directory * permission to the passed dir_mode + * + * should_check_ownership : boolean to enable checking of ownership of each path */ static int secure_path(const char *path, uid_t uid, gid_t gid, - mode_t file_mode, mode_t dir_mode) { + mode_t file_mode, mode_t dir_mode, int should_check_ownership) { FTS *tree = NULL; // the file hierarchy FTSENT *entry = NULL; // a file in the hierarchy char *paths[] = { (char *) path, NULL };//array needs to be NULL-terminated @@ -361,7 +375,8 @@ static int secure_path(const char *path, if (!process_path) { continue; } - if (compare_ownership(uid, gid, entry->fts_path) == 0) { + if (should_check_ownership && + (compare_ownership(uid, gid, entry->fts_path) == 0)) { // already set proper permissions. // This might happen with distributed cache. 
#ifdef DEBUG @@ -373,7 +388,7 @@ static int secure_path(const char *path, continue; } - if (check_ownership(entry->fts_path) != 0) { + if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) { fprintf(LOGFILE, "Invalid file path. %s not user/group owned by the tasktracker.\n", entry->fts_path); @@ -466,8 +481,9 @@ int prepare_attempt_directories(const ch free(job_dir); break; } - } else if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid, - S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) { + } else if (secure_path(attempt_dir, user_detail->pw_uid, + tasktracker_gid, S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, + 1) != 0) { // No setgid on files and setgid on dirs, 770 fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir); failed = 1; @@ -526,8 +542,8 @@ int prepare_task_logs(const char *log_di } gid_t tasktracker_gid = getegid(); // the group permissions of the binary. - if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU - | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) { + if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, + S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) { // setgid on dirs but not files, 770. 
As of now, there are no files though fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir); return -1; @@ -640,9 +656,9 @@ int initialize_user(const char *user) { free(user_dir); break; } - } else if (secure_path(user_dir, user_detail->pw_uid, tasktracker_gid, - S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG) - != 0) { + } else if (secure_path(user_dir, user_detail->pw_uid, + tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | + S_IXUSR | S_IRWXG, 1) != 0) { // No setgid on files and setgid on dirs, 570 fprintf(LOGFILE, "Failed to secure the user_dir %s\n", user_dir); @@ -722,7 +738,7 @@ int initialize_job(const char *jobid, co break; } } else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid, - S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG) + S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG, 1) != 0) { // No setgid on files and setgid on dirs, 570 fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir); @@ -848,7 +864,7 @@ int initialize_distributed_cache(const c } } else if (secure_path(distcache_dir, user_detail->pw_uid, tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR - | S_IXUSR | S_IRWXG) != 0) { + | S_IXUSR | S_IRWXG, 1) != 0) { // No setgid on files and setgid on dirs, 570 fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n", distcache_dir); @@ -963,7 +979,7 @@ int run_task_as_user(const char * user, } errno = 0; - exit_code = check_task_launcher_path(task_script_path); + exit_code = check_path_for_relative_components(task_script_path); if(exit_code != 0) { goto cleanup; } @@ -1048,3 +1064,60 @@ int kill_user_task(const char *user, con cleanup(); return 0; } + +/** + * Enables the path for deletion by changing the owner, group and permissions + * of the specified path and all the files/directories in the path recursively. 
+ * * sudo chown user:mapred -R full_path + * * sudo chmod 2770 -R full_path + * Before changing permissions, makes sure that the given path doesn't contain + * any relative components. + * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller + * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted + */ +int enable_task_for_cleanup(const char *tt_root, const char *user, + const char *jobid, const char *dir_to_be_deleted) { + int exit_code = 0; + gid_t tasktracker_gid = getegid(); // the group permissions of the binary. + + char * full_path = NULL; + if (check_tt_root(tt_root) < 0) { + fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root); + cleanup(); + return INVALID_TT_ROOT; + } + + full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted); + if (full_path == NULL) { + fprintf(LOGFILE, + "Could not build the full path. Not deleting the dir %s\n", + dir_to_be_deleted); + exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed + } + // Make sure that the path given is not having any relative components + else if ((exit_code = check_path_for_relative_components(full_path)) != 0) { + fprintf(LOGFILE, + "Not changing permissions. Path may contain relative components.\n", + full_path); + } + else if (get_user_details(user) < 0) { + fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user); + exit_code = INVALID_USER_NAME; + } + else if (exit_code = secure_path(full_path, user_detail->pw_uid, + tasktracker_gid, + S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 0) != 0) { + // No setgid on files and setgid on dirs, 770. + // set 770 permissions for user, TTgroup for all files/directories in + // 'full_path' recursively sothat deletion of path by TaskTracker succeeds. 
+ + fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path); + } + + if (full_path != NULL) { + free(full_path); + } + // free configurations + cleanup(); + return exit_code; +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar 4 03:43:43 2011 @@ -44,6 +44,7 @@ enum command { INITIALIZE_TASK, TERMINATE_TASK_JVM, KILL_TASK_JVM, + ENABLE_TASK_FOR_CLEANUP }; enum errorcodes { @@ -67,6 +68,7 @@ enum errorcodes { OUT_OF_MEMORY, //18 INITIALIZE_DISTCACHE_FAILED, //19 INITIALIZE_USER_FAILED, //20 + UNABLE_TO_BUILD_PATH //21 }; #define USER_DIR_PATTERN "%s/taskTracker/%s" @@ -83,6 +85,8 @@ enum errorcodes { #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh" +#define TT_LOCAL_TASK_DIR_PATTERN "%s/taskTracker/%s/jobcache/%s/%s" + #define TT_SYS_DIR_KEY "mapred.local.dir" #define TT_LOG_DIR_KEY "hadoop.log.dir" @@ -109,6 +113,9 @@ int initialize_distributed_cache(const c int kill_user_task(const char *user, const char *task_pid, int sig); +int enable_task_for_cleanup(const char *tt_root, const char *user, + const char *jobid, const char *dir_to_be_deleted); + int prepare_attempt_directory(const char *attempt_dir, const char *user); // The following functions are exposed for testing Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Mar 4 03:43:43 2011 @@ -18,6 +18,7 @@ package org.apache.hadoop.mapred; +import java.io.IOException; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -38,7 +39,7 @@ class CleanupQueue { * paths(directories/files) in a separate thread. This constructor creates a * clean-up thread and also starts it as a daemon. Callers can instantiate one * CleanupQueue per JVM and can use it for deleting paths. Use - * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for + * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for * deletion. */ public CleanupQueue() { @@ -49,22 +50,61 @@ class CleanupQueue { } } - public void addToQueue(FileSystem fs, Path...paths) { - cleanupThread.addToQueue(fs, paths); + /** + * Contains info related to the path of the file/dir to be deleted + */ + static class PathDeletionContext { + String fullPath;// full path of file or dir + FileSystem fs; + + public PathDeletionContext(FileSystem fs, String fullPath) { + this.fs = fs; + this.fullPath = fullPath; + } + + protected String getPathForCleanup() { + return fullPath; + } + + /** + * Makes the path(and its subdirectories recursively) fully deletable + */ + protected void enablePathForCleanup() throws IOException { + // Do nothing by default. + // Subclasses can override to provide enabling for deletion. 
+ } } - private static class PathCleanupThread extends Thread { + /** + * Adds the paths to the queue of paths to be deleted by cleanupThread. + */ + void addToQueue(PathDeletionContext... contexts) { + cleanupThread.addToQueue(contexts); + } - static class PathAndFS { - FileSystem fs; - Path path; - PathAndFS(FileSystem fs, Path path) { - this.fs = fs; - this.path = path; - } + protected static boolean deletePath(PathDeletionContext context) + throws IOException { + context.enablePathForCleanup(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to delete " + context.fullPath); + } + if (context.fs.exists(new Path(context.fullPath))) { + return context.fs.delete(new Path(context.fullPath), true); } + return true; + } + + // currently used by tests only + protected boolean isQueueEmpty() { + return (cleanupThread.queue.size() == 0); + } + + private static class PathCleanupThread extends Thread { + // cleanup queue which deletes files/directories of the paths queued up. - private LinkedBlockingQueue queue = new LinkedBlockingQueue(); + private LinkedBlockingQueue queue = + new LinkedBlockingQueue(); public PathCleanupThread() { setName("Directory/File cleanup thread"); @@ -72,27 +112,34 @@ class CleanupQueue { start(); } - public void addToQueue(FileSystem fs, Path... paths) { - for (Path p : paths) { + void addToQueue(PathDeletionContext[] contexts) { + for (PathDeletionContext context : contexts) { try { - queue.put(new PathAndFS(fs, p)); - } catch (InterruptedException ie) {} + queue.put(context); + } catch(InterruptedException ie) {} } } public void run() { - LOG.debug(getName() + " started."); - PathAndFS pathAndFS = null; + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + " started."); + } + PathDeletionContext context = null; while (true) { try { - pathAndFS = queue.take(); + context = queue.take(); // delete the path. 
- pathAndFS.fs.delete(pathAndFS.path, true); - LOG.debug("DELETED " + pathAndFS.path); + if (!deletePath(context)) { + LOG.warn("CleanupThread:Unable to delete path " + context.fullPath); + } + else if (LOG.isDebugEnabled()) { + LOG.debug("DELETED " + context.fullPath); + } } catch (InterruptedException t) { + LOG.warn("Interrupted deletion of " + context.fullPath); return; } catch (Exception e) { - LOG.warn("Error deleting path" + pathAndFS.path); + LOG.warn("Error deleting path " + context.fullPath + ": " + e); } } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 03:43:43 2011 @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.List; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; import org.apache.hadoop.mapred.JvmManager.JvmEnv; import org.apache.hadoop.util.ProcessTree; import org.apache.hadoop.util.Shell; @@ -133,6 +135,23 @@ public class DefaultTaskController exten } } + /** + * Enables the task for cleanup by changing permissions of the specified path + * in the local filesystem + */ + @Override + void enableTaskForCleanup(PathDeletionContext context) + throws IOException { + try { + FileUtil.chmod(context.fullPath, "ug+rwx", true); + } catch(InterruptedException e) { + LOG.warn("Interrupted while setting permissions for " + context.fullPath + + " for deletion."); + } catch(IOException ioe) { 
+ LOG.warn("Unable to change permissions of " + context.fullPath); + } + } + @Override public void initializeDistributedCache(InitializationContext context) { // Do nothing. Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:43:43 2011 @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; import org.apache.hadoop.mapred.JobHistory.Values; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; @@ -2850,7 +2851,8 @@ class JobInProgress { } Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID()); - new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); + new CleanupQueue().addToQueue(new PathDeletionContext( + jobtracker.getFileSystem(tempDir), tempDir.toUri().getPath())); } catch (IOException e) { LOG.warn("Error cleaning up "+profile.getJobID()+": "+e); } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077127&r1=1077126&r2=1077127&view=diff 
============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 03:43:43 2011 @@ -30,7 +30,6 @@ import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.mapred.TaskController.TaskControllerContext; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.hadoop.util.ProcessTree; @@ -144,6 +143,22 @@ class JvmManager { } } + /** + * Adds the task's work dir to the cleanup queue of taskTracker for + * asynchronous deletion of work dir. + * @param tracker taskTracker + * @param task the task whose work dir needs to be deleted + * @throws IOException + */ + static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException { + tracker.getCleanupThread().addToQueue( + TaskTracker.buildTaskControllerPathDeletionContexts( + tracker.getLocalFileSystem(), + tracker.getLocalFiles(tracker.getJobConf(), ""), + task, true /* workDir */, + tracker.getTaskController())); + } + private static class JvmManagerForType { //Mapping from the JVM IDs to running Tasks Map jvmToRunningTask = @@ -438,7 +453,7 @@ class JvmManager { //task at the beginning of each task in the task JVM. //For the last task, we do it here. 
if (env.conf.getNumTasksToExecutePerJvm() != 1) { - FileUtil.fullyDelete(env.workDir); + deleteWorkDir(tracker, initalContext.task); } } catch (IOException ie){} } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:43:43 2011 @@ -29,6 +29,9 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; import org.apache.hadoop.mapred.JvmManager.JvmEnv; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ShellCommandExecutor; @@ -87,6 +90,7 @@ class LinuxTaskController extends TaskCo INITIALIZE_TASK, TERMINATE_TASK_JVM, KILL_TASK_JVM, + ENABLE_TASK_FOR_CLEANUP } /** @@ -208,12 +212,75 @@ class LinuxTaskController extends TaskCo @Override void initializeTask(TaskControllerContext context) throws IOException { - LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString() - + " for " + context.task.getTaskID().toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString() + + " for " + context.task.getTaskID().toString()); + } runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(), buildInitializeTaskArgs(context), context.env.workDir, context.env.env); } + /** + * Builds the args to 
be passed to task-controller for enabling of task for + * cleanup. Last arg in this List is either $attemptId or $attemptId/work + */ + private List buildTaskCleanupArgs( + TaskControllerPathDeletionContext context) { + List commandArgs = new ArrayList(3); + commandArgs.add(context.mapredLocalDir.toUri().getPath()); + commandArgs.add(context.task.getJobID().toString()); + + String workDir = ""; + if (context.isWorkDir) { + workDir = "/work"; + } + if (context.task.isTaskCleanupTask()) { + commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX + + workDir); + } else { + commandArgs.add(context.task.getTaskID() + workDir); + } + + return commandArgs; + } + + /** + * Enables the task for cleanup by changing permissions of the specified path + * in the local filesystem + */ + @Override + void enableTaskForCleanup(PathDeletionContext context) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString() + + " for " + context.fullPath); + } + + if (context instanceof TaskControllerPathDeletionContext) { + TaskControllerPathDeletionContext tContext = + (TaskControllerPathDeletionContext) context; + + if (tContext.task.getUser() != null && + tContext.fs instanceof LocalFileSystem) { + try { + runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP, + tContext.task.getUser(), + buildTaskCleanupArgs(tContext), null, null); + } catch(IOException e) { + LOG.warn("Uanble to change permissions for " + tContext.fullPath); + } + } + else { + throw new IllegalArgumentException("Either user is null or the " + + "file system is not local file system."); + } + } + else { + throw new IllegalArgumentException("PathDeletionContext provided is not " + + "TaskControllerPathDeletionContext."); + } + } + private void logOutput(String output) { String shExecOutput = output; if (shExecOutput != null) { @@ -436,7 +503,8 @@ class LinuxTaskController extends TaskCo } ShellCommandExecutor shExec = 
buildTaskControllerExecutor( command, context.env.conf.getUser(), - buildKillTaskCommandArgs(context), context.env.workDir, context.env.env); + buildKillTaskCommandArgs(context), context.env.workDir, + context.env.env); try { shExec.execute(); } catch (Exception e) { Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 03:43:43 2011 @@ -24,6 +24,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; import org.apache.hadoop.mapred.JvmManager.JvmEnv; import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; import org.apache.hadoop.util.StringUtils; @@ -187,6 +190,67 @@ public abstract class TaskController imp } /** + * Contains info related to the path of the file/dir to be deleted. This info + * is needed by task-controller to build the full path of the file/dir + */ + static class TaskControllerPathDeletionContext extends PathDeletionContext { + Task task; + boolean isWorkDir; + TaskController taskController; + + /** + * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or + * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir + * is built using mapredLocalDir, jobId, taskId, etc. 
+ */ + Path mapredLocalDir; + + public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir, + Task task, boolean isWorkDir, TaskController taskController) { + super(fs, null); + this.task = task; + this.isWorkDir = isWorkDir; + this.taskController = taskController; + this.mapredLocalDir = mapredLocalDir; + } + + @Override + protected String getPathForCleanup() { + if (fullPath == null) { + fullPath = buildPathForDeletion(); + } + return fullPath; + } + + /** + * Builds the path of taskAttemptDir OR taskWorkDir based on + * mapredLocalDir, jobId, taskId, etc + */ + String buildPathForDeletion() { + String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(), + task.getJobID().toString(), task.getTaskID().toString(), + task.isTaskCleanupTask()) + : TaskTracker.getLocalTaskDir(task.getUser(), + task.getJobID().toString(), task.getTaskID().toString(), + task.isTaskCleanupTask()); + + return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir; + } + + /** + * Makes the path(and its subdirectories recursively) fully deletable by + * setting proper permissions(770) by task-controller + */ + @Override + protected void enablePathForCleanup() throws IOException { + getPathForCleanup();// allow init of fullPath, if not inited already + if (fs.exists(new Path(fullPath))) { + taskController.enableTaskForCleanup(this); + } + } + } + + /** * NOTE: This class is internal only class and not intended for users!! * */ @@ -207,6 +271,13 @@ public abstract class TaskController imp abstract void terminateTask(TaskControllerContext context); /** + * Enable the task for cleanup by changing permissions of the path + * @param context path deletion context + * @throws IOException + */ + abstract void enableTaskForCleanup(PathDeletionContext context) + throws IOException; + /** * Sends a KILL signal to forcefully terminate the taskJVM and its * sub-processes. 
* Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:43:43 2011 @@ -638,6 +638,39 @@ abstract class TaskRunner extends Thread } /** + * Sets permissions recursively and then deletes the contents of dir. + * Makes dir empty directory(does not delete dir itself). + */ + static void deleteDirContents(JobConf conf, File dir) throws IOException { + FileSystem fs = FileSystem.getLocal(conf); + if (fs.exists(new Path(dir.getAbsolutePath()))) { + File contents[] = dir.listFiles(); + if (contents != null) { + for (int i = 0; i < contents.length; i++) { + try { + int ret = 0; + if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(), + "ug+rwx", true)) != 0) { + LOG.warn("Unable to chmod for " + contents[i] + + "; chmod exit status = " + ret); + } + } catch(InterruptedException e) { + LOG.warn("Interrupted while setting permissions for contents of " + + "workDir. Not deleting the remaining contents of workDir."); + return; + } + if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) { + LOG.warn("Unable to delete "+ contents[i]); + } + } + } + } + else { + LOG.warn(dir + " does not exist."); + } + } + + /** * Creates distributed cache symlinks and tmp directory, as appropriate. * Note that when we setup the distributed * cache, we didn't create the symlinks. 
This is done on a per task basis @@ -647,11 +680,14 @@ abstract class TaskRunner extends Thread * @param workDir Working directory, which is completely deleted. */ public static void setupWorkDir(JobConf conf, File workDir) throws IOException { - LOG.debug("Fully deleting and re-creating" + workDir); - FileUtil.fullyDelete(workDir); - if (!workDir.mkdir()) { - LOG.debug("Did not recreate " + workDir); + if (LOG.isDebugEnabled()) { + LOG.debug("Fully deleting contents of " + workDir); } + + /** delete only the contents of workDir leaving the directory empty. We + * can't delete the workDir as it is the current working directory. + */ + deleteDirContents(conf, workDir); if (DistributedCache.getSymlink(conf)) { URI[] archives = DistributedCache.getCacheArchives(conf); Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:43:43 2011 @@ -69,6 +69,8 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapred.TaskController.JobInitializationContext; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; +import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext; import org.apache.hadoop.mapred.TaskLog.LogFileDetail; import org.apache.hadoop.mapred.TaskLog.LogName; import org.apache.hadoop.mapred.TaskStatus.Phase; @@ -186,7 +188,7 @@ public class TaskTracker // The filesystem where job files 
are stored FileSystem systemFS = null; - FileSystem localFs = null; + private FileSystem localFs = null; private final HttpServer server; volatile boolean shuttingDown = false; @@ -401,6 +403,11 @@ public class TaskTracker return taskController; } + // Currently this is used only by tests + void setTaskController(TaskController t) { + taskController = t; + } + private RunningJob addTaskToJob(JobID jobId, TaskInProgress tip) { synchronized (runningJobs) { @@ -518,10 +525,7 @@ public class TaskTracker static String getTaskWorkDir(String user, String jobid, String taskid, boolean isCleanupAttempt) { - String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid; - if (isCleanupAttempt) { - dir = dir + TASK_CLEANUP_SUFFIX; - } + String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt); return dir + Path.SEPARATOR + MRConstants.WORKDIR; } @@ -1204,7 +1208,16 @@ public class TaskTracker taskCleanupThread.start(); directoryCleanupThread = new CleanupQueue(); } + + // only used by tests + void setCleanupThread(CleanupQueue c) { + directoryCleanupThread = c; + } + CleanupQueue getCleanupThread() { + return directoryCleanupThread; + } + /** * The connection to the JobTracker, used by the TaskRunner * for locating remote files. 
@@ -1620,6 +1633,44 @@ public class TaskTracker } /** + * Builds list of PathDeletionContext objects for the given paths + */ + private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs, + Path[] paths) { + int i = 0; + PathDeletionContext[] contexts = new PathDeletionContext[paths.length]; + + for (Path p : paths) { + contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath()); + } + return contexts; + } + + /** + * Builds list of TaskControllerPathDeletionContext objects for a task + * @param fs : FileSystem in which the dirs to be deleted + * @param paths : mapred-local-dirs + * @param task : the task whose taskDir or taskWorkDir is going to be deleted + * @param isWorkDir : the dir to be deleted is workDir or taskDir + * @param taskController : the task-controller to be used for deletion of + * taskDir or taskWorkDir + */ + static PathDeletionContext[] buildTaskControllerPathDeletionContexts( + FileSystem fs, Path[] paths, Task task, boolean isWorkDir, + TaskController taskController) + throws IOException { + int i = 0; + PathDeletionContext[] contexts = + new TaskControllerPathDeletionContext[paths.length]; + + for (Path p : paths) { + contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task, + isWorkDir, taskController); + } + return contexts; + } + + /** * The task tracker is done with this job, so we need to clean up. 
* @param action The action with the job * @throws IOException @@ -1668,8 +1719,9 @@ public class TaskTracker */ void removeJobFiles(String user, String jobId) throws IOException { - directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf, - getLocalJobDir(user, jobId))); + PathDeletionContext[] contexts = buildPathDeletionContexts(localFs, + getLocalFiles(fConf, getLocalJobDir(user, jobId))); + directoryCleanupThread.addToQueue(contexts); } /** @@ -2766,29 +2818,33 @@ public class TaskTracker runner.close(); } - String localTaskDir = - getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId - .toString(), task.isTaskCleanupTask()); if (localJobConf.getNumTasksToExecutePerJvm() == 1) { // No jvm reuse, remove everything - directoryCleanupThread.addToQueue(localFs, getLocalFiles( - defaultJobConf, localTaskDir)); + PathDeletionContext[] contexts = + buildTaskControllerPathDeletionContexts(localFs, + getLocalFiles(fConf, ""), task, false/* not workDir */, + taskController); + directoryCleanupThread.addToQueue(contexts); } else { // Jvm reuse. We don't delete the workdir since some other task // (running in the same JVM) might be using the dir. The JVM // running the tasks would clean the workdir per a task in the // task process itself. 
- directoryCleanupThread.addToQueue(localFs, getLocalFiles( - defaultJobConf, localTaskDir + Path.SEPARATOR - + TaskTracker.JOBFILE)); + String localTaskDir = + getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId + .toString(), task.isTaskCleanupTask()); + PathDeletionContext[] contexts = buildPathDeletionContexts( + localFs, getLocalFiles(defaultJobConf, localTaskDir + + Path.SEPARATOR + TaskTracker.JOBFILE)); + directoryCleanupThread.addToQueue(contexts); } } else { if (localJobConf.getNumTasksToExecutePerJvm() == 1) { - String taskWorkDir = - getTaskWorkDir(task.getUser(), task.getJobID().toString(), - taskId.toString(), task.isTaskCleanupTask()); - directoryCleanupThread.addToQueue(localFs, getLocalFiles( - defaultJobConf, taskWorkDir)); + PathDeletionContext[] contexts = + buildTaskControllerPathDeletionContexts(localFs, + getLocalFiles(fConf, ""), task, true /* workDir */, + taskController); + directoryCleanupThread.addToQueue(contexts); } } } @@ -3380,17 +3436,28 @@ public class TaskTracker // get the full paths of the directory in all the local disks. - private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{ + Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{ String[] localDirs = conf.getLocalDirs(); Path[] paths = new Path[localDirs.length]; FileSystem localFs = FileSystem.getLocal(conf); + boolean subdirNeeded = (subdir != null) && (subdir.length() > 0); for (int i = 0; i < localDirs.length; i++) { - paths[i] = new Path(localDirs[i], subdir); + paths[i] = (subdirNeeded) ? 
new Path(localDirs[i], subdir) + : new Path(localDirs[i]); paths[i] = paths[i].makeQualified(localFs); } return paths; } + FileSystem getLocalFileSystem(){ + return localFs; + } + + // only used by tests + void setLocalFileSystem(FileSystem fs){ + localFs = fs; + } + int getMaxCurrentMapTasks() { return maxMapSlots; } Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar 4 03:43:43 2011 @@ -193,7 +193,7 @@ public class Localizer { synchronized (localizedUser) { if (localizedUser.get()) { - // User-directories are already localized for his user. + // User-directories are already localized for this user. LOG.info("User-directories for the user " + user + " are already initialized on this TT. 
Not doing anything."); return; Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar 4 03:43:43 2011 @@ -257,7 +257,18 @@ public class MiniMRCluster { public int getNumTaskTrackers() { return taskTrackerList.size(); } - + + /** + * Sets inline cleanup threads on all task trackers so that deletion of + * temporary files/dirs happens inline + */ + public void setInlineCleanupThreads() { + for (int i = 0; i < getNumTaskTrackers(); i++) { + getTaskTrackerRunner(i).getTaskTracker().setCleanupThread( + new UtilsForTests.InlineCleanupQueue()); + } + } + /** * Wait until the system is idle.
*/ Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java Fri Mar 4 03:43:43 2011 @@ -58,6 +58,9 @@ public class TestJobDirCleanup extends T namenode = fileSys.getUri().toString(); mr = new MiniMRCluster(10, namenode, 3, null, null, mrConf); + // make cleanup inline so that validation of existence of these directories + // can be done + mr.setInlineCleanupThreads(); final String jobTrackerName = "localhost:" + mr.getJobTrackerPort(); JobConf jobConf = mr.createJobConf(); runSleepJob(jobConf); @@ -66,13 +69,8 @@ public class TestJobDirCleanup extends T "/taskTracker/jobcache"; File jobDir = new File(jobDirStr); String[] contents = jobDir.list(); - while (contents.length > 0) { - try { - Thread.sleep(1000); - LOG.warn(jobDir +" not empty yet"); - contents = jobDir.list(); - } catch (InterruptedException ie){} - } + assertTrue("Contents of " + jobDir + " not cleanup.", + (contents == null || contents.length == 0)); } } catch (Exception ee){ } finally { Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 03:43:43 2011 @@ -44,10 +44,15 @@ public class TestLocalizationWithLinuxTa private static String taskTrackerSpecialGroup; @Override + protected boolean canRun() { + return ClusterWithLinuxTaskController.shouldRun(); + } + + @Override protected void setUp() throws Exception { - if (!ClusterWithLinuxTaskController.shouldRun()) { + if (!canRun()) { return; } @@ -65,7 +70,8 @@ public class TestLocalizationWithLinuxTa taskController.setConf(trackerFConf); taskController.setup(); - tracker.setLocalizer(new Localizer(tracker.localFs, localDirs, + tracker.setTaskController(taskController); + tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs, taskController)); // Rewrite conf so as to reflect task's correct user name. @@ -83,7 +89,7 @@ public class TestLocalizationWithLinuxTa @Override protected void tearDown() throws Exception { - if (!ClusterWithLinuxTaskController.shouldRun()) { + if (!canRun()) { return; } super.tearDown(); @@ -98,21 +104,6 @@ public class TestLocalizationWithLinuxTa // Do nothing. } - /** - * Test the localization of a user on the TT when {@link LinuxTaskController} - * is in use. - */ - @Override - public void testUserLocalization() - throws IOException { - - if (!ClusterWithLinuxTaskController.shouldRun()) { - return; - } - - super.testJobLocalization(); - } - @Override protected void checkUserLocalization() throws IOException { @@ -150,21 +141,6 @@ public class TestLocalizationWithLinuxTa } } - /** - * Test job localization with {@link LinuxTaskController}. Also check the - * permissions and file ownership of the job related files. 
- */ - @Override - public void testJobLocalization() - throws IOException { - - if (!ClusterWithLinuxTaskController.shouldRun()) { - return; - } - - super.testJobLocalization(); - } - @Override protected void checkJobLocalization() throws IOException { @@ -210,21 +186,6 @@ public class TestLocalizationWithLinuxTa } } - /** - * Test task localization with {@link LinuxTaskController}. Also check the - * permissions and file ownership of task related files. - */ - @Override - public void testTaskLocalization() - throws IOException { - - if (!ClusterWithLinuxTaskController.shouldRun()) { - return; - } - - super.testTaskLocalization(); - } - @Override protected void checkTaskLocalization() throws IOException { @@ -250,16 +211,4 @@ public class TestLocalizationWithLinuxTa .getUser(), taskTrackerSpecialGroup); } } - - /** - * Test cleanup of task files with {@link LinuxTaskController}. - */ - @Override - public void testTaskCleanup() - throws IOException { - if (!ClusterWithLinuxTaskController.shouldRun()) { - return; - } - super.testTaskCleanup(); - } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Mar 4 03:43:43 2011 @@ -60,6 +60,10 @@ public class TestMiniMRLocalFS extends T MiniMRCluster mr = null; try { mr = new MiniMRCluster(2, "file:///", 3); + // make cleanup inline sothat validation of existence of these directories + // can be done + mr.setInlineCleanupThreads(); + TestMiniMRWithDFS.runPI(mr, 
mr.createJobConf()); // run the wordcount example with caching Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar 4 03:43:43 2011 @@ -322,6 +322,9 @@ public class TestMiniMRWithDFS extends T dfs = new MiniDFSCluster(conf, 4, true, null); fileSys = dfs.getFileSystem(); mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1); + // make cleanup inline sothat validation of existence of these directories + // can be done + mr.setInlineCleanupThreads(); runPI(mr, mr.createJobConf()); runWordCount(mr, mr.createJobConf()); Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=1077127&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java Fri Mar 4 03:43:43 2011 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +public class TestSetupWorkDir extends TestCase { + private static final Log LOG = + LogFactory.getLog(TestSetupWorkDir.class); + + /** + * Create a file in the given dir and set permissions r_xr_xr_x sothat no one + * can delete it directly(without doing chmod). + * Creates dir/subDir and dir/subDir/file + */ + static void createFileAndSetPermissions(JobConf jobConf, Path dir) + throws IOException { + Path subDir = new Path(dir, "subDir"); + FileSystem fs = FileSystem.getLocal(jobConf); + fs.mkdirs(subDir); + Path p = new Path(subDir, "file"); + DataOutputStream out = fs.create(p); + out.writeBytes("dummy input"); + out.close(); + // no write permission for subDir and subDir/file + try { + int ret = 0; + if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) { + LOG.warn("chmod failed for " + subDir + ";retVal=" + ret); + } + } catch(InterruptedException e) { + LOG.warn("Interrupted while doing chmod for " + subDir); + } + } + + /** + * Validates if setupWorkDir is properly cleaning up contents of workDir. 
+ */ + public void testSetupWorkDir() throws IOException { + Path rootDir = new Path(System.getProperty("test.build.data", "/tmp"), + "testSetupWorkDir"); + Path myWorkDir = new Path(rootDir, "./work"); + JobConf jConf = new JobConf(); + FileSystem fs = FileSystem.getLocal(jConf); + if (fs.exists(myWorkDir)) { + fs.delete(myWorkDir, true); + } + if (!fs.mkdirs(myWorkDir)) { + throw new IOException("Unable to create workDir " + myWorkDir); + } + + // create {myWorkDir}/subDir/file and set 555 perms for subDir and file + createFileAndSetPermissions(jConf, myWorkDir); + + TaskRunner.deleteDirContents(jConf, new File(myWorkDir.toUri().getPath())); + + assertTrue("Contents of " + myWorkDir + " are not cleaned up properly.", + fs.listStatus(myWorkDir).length == 0); + + // cleanup + fs.delete(rootDir, true); + } +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:43:43 2011 @@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.JvmManag import org.apache.hadoop.mapred.TaskController.JobInitializationContext; import org.apache.hadoop.mapred.TaskController.TaskControllerContext; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; +import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue; import junit.framework.TestCase; @@ -73,36 +74,20 @@ public class TestTaskTrackerLocalization protected File[] attemptLogFiles; protected JobConf localizedTaskConf; 
- class InlineCleanupQueue extends CleanupQueue { - List stalePaths = new ArrayList(); - - public InlineCleanupQueue() { - // do nothing - } - - @Override - public void addToQueue(FileSystem fs, Path... paths) { - // delete in-line - for (Path p : paths) { - try { - LOG.info("Trying to delete the path " + p); - if (!fs.delete(p, true)) { - LOG.warn("Stale path " + p.toUri().getPath()); - stalePaths.add(p); - } - } catch (IOException e) { - LOG.warn("Caught exception while deleting path " - + p.toUri().getPath()); - LOG.info(StringUtils.stringifyException(e)); - stalePaths.add(p); - } - } - } + /** + * Dummy method in this base class. Only derived classes will define this + * method for checking if a test can be run. + */ + protected boolean canRun() { + return true; } @Override protected void setUp() throws Exception { + if (!canRun()) { + return; + } TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp"), getClass() .getSimpleName()); @@ -143,8 +128,9 @@ public class TestTaskTrackerLocalization tracker.setConf(trackerFConf); // for test case system FS is the local FS - tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf); - + tracker.systemFS = FileSystem.getLocal(trackerFConf); + tracker.setLocalFileSystem(tracker.systemFS); + taskTrackerUGI = UserGroupInformation.login(trackerFConf); // Set up the task to be localized @@ -159,8 +145,10 @@ public class TestTaskTrackerLocalization taskController = new DefaultTaskController(); taskController.setConf(trackerFConf); taskController.setup(); - tracker.setLocalizer(new Localizer(tracker.localFs, localDirs, - taskController)); + + tracker.setTaskController(taskController); + tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs, + taskController)); } /** @@ -204,6 +192,9 @@ public class TestTaskTrackerLocalization @Override protected void tearDown() throws Exception { + if (!canRun()) { + return; + } FileUtil.fullyDelete(TEST_ROOT_DIR); } @@ -235,6 +226,9 @@ public 
class TestTaskTrackerLocalization */ public void testTaskControllerSetup() throws IOException { + if (!canRun()) { + return; + } // Task-controller is already set up in the test's setup method. Now verify. for (String localDir : localDirs) { @@ -258,7 +252,9 @@ public class TestTaskTrackerLocalization */ public void testUserLocalization() throws IOException { - + if (!canRun()) { + return; + } // /////////// The main method being tested tracker.getLocalizer().initializeUserDirs(task.getUser()); // /////////// @@ -329,7 +325,9 @@ public class TestTaskTrackerLocalization */ public void testJobLocalization() throws IOException { - + if (!canRun()) { + return; + } tracker.getLocalizer().initializeUserDirs(task.getUser()); // /////////// The main method being tested @@ -423,7 +421,9 @@ public class TestTaskTrackerLocalization */ public void testTaskLocalization() throws IOException { - + if (!canRun()) { + return; + } tracker.getLocalizer().initializeUserDirs(task.getUser()); localizedJobConf = tracker.localizeJobFiles(task); @@ -539,14 +539,102 @@ public class TestTaskTrackerLocalization } /** + * Validates the removal of $taskid and $taskid/work under mapred-local-dir + * in cases where those directories cannot be deleted without adding + * write permission to the newly created directories under $taskid and + * $taskid/work + * Also see TestSetupWorkDir.createFileAndSetPermissions for details + */ + void validateRemoveFiles(boolean needCleanup, boolean jvmReuse, + TaskInProgress tip) throws IOException { + // create files and set permissions 555. Verify if task controller sets + // the permissions for TT to delete the taskDir or workDir + String dir = (!needCleanup || jvmReuse) ? 
+ TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(), + taskId.toString(), task.isTaskCleanupTask()) + : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(), + taskId.toString(), task.isTaskCleanupTask()); + + Path[] paths = tracker.getLocalFiles(localizedJobConf, dir); + for (Path p : paths) { + if (tracker.getLocalFileSystem().exists(p)) { + TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p); + } + } + + InlineCleanupQueue cleanupQueue = new InlineCleanupQueue(); + tracker.setCleanupThread(cleanupQueue); + + tip.removeTaskFiles(needCleanup, taskId); + + if (jvmReuse) { + // work dir should still exist and cleanup queue should be empty + assertTrue("cleanup queue is not empty after removeTaskFiles() in case " + + "of jvm reuse.", cleanupQueue.isQueueEmpty()); + boolean workDirExists = false; + for (Path p : paths) { + if (tracker.getLocalFileSystem().exists(p)) { + workDirExists = true; + } + } + assertTrue("work dir does not exist in case of jvm reuse", workDirExists); + + // now try to delete the work dir and verify that there are no stale paths + JvmManager.deleteWorkDir(tracker, task); + } + tracker.removeJobFiles(task.getUser(), jobId.toString()); + + assertTrue("Some task files are not deleted!! 
Number of stale paths is " + + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0); + } + + /** + * Validates if task cleanup is done properly for a succeeded task * @throws IOException */ public void testTaskCleanup() throws IOException { + if (!canRun()) { + return; + } + testTaskCleanup(false, false);// no needCleanup; no jvmReuse + } + /** + * Validates if task cleanup is done properly for a task that is not succeeded + * @throws IOException + */ + public void testFailedTaskCleanup() + throws IOException { + if (!canRun()) { + return; + } + testTaskCleanup(true, false);// needCleanup; no jvmReuse + } + + /** + * Validates if task cleanup is done properly for a succeeded task + * @throws IOException + */ + public void testTaskCleanupWithJvmUse() + throws IOException { + if (!canRun()) { + return; + } + testTaskCleanup(false, true);// no needCleanup; jvmReuse + } + + /** + * Validates if task cleanup is done properly + */ + private void testTaskCleanup(boolean needCleanup, boolean jvmReuse) + throws IOException { // Localize job and localize task. tracker.getLocalizer().initializeUserDirs(task.getUser()); localizedJobConf = tracker.localizeJobFiles(task); + if (jvmReuse) { + localizedJobConf.setNumTasksToExecutePerJvm(2); + } // Now initialize the job via task-controller so as to set // ownership/permissions of jars, job-work-dir JobInitializationContext jobContext = new JobInitializationContext(); @@ -585,18 +673,9 @@ public class TestTaskTrackerLocalization // TODO: Let the task run and create files. - InlineCleanupQueue cleanupQueue = new InlineCleanupQueue(); - tracker.directoryCleanupThread = cleanupQueue; - - // ////////// The central methods being tested - tip.removeTaskFiles(true, taskId); - tracker.removeJobFiles(task.getUser(), jobId.toString()); - // ////////// - - // TODO: make sure that all files intended to be deleted are deleted. - - assertTrue("Some task files are not deleted!! 
Number of stale paths is " - + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0); + // create files and set permissions 555. Verify if task controller sets + // the permissions for TT to delete the task dir or work dir properly + validateRemoveFiles(needCleanup, jvmReuse, tip); // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still // there. @@ -604,7 +683,7 @@ public class TestTaskTrackerLocalization Path userDir = new Path(localDir, TaskTracker.getUserDir(task.getUser())); assertTrue("User directory " + userDir + " is not present!!", - tracker.localFs.exists(userDir)); + tracker.getLocalFileSystem().exists(userDir)); } // Test userlogs cleanup. @@ -624,7 +703,7 @@ public class TestTaskTrackerLocalization // Logs should be there before cleanup. assertTrue("Userlogs dir " + logDir + " is not presen as expected!!", - tracker.localFs.exists(logDir)); + tracker.getLocalFileSystem().exists(logDir)); // ////////// Another being tested TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file @@ -633,6 +712,6 @@ public class TestTaskTrackerLocalization // Logs should be gone after cleanup. 
assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!", - tracker.localFs.exists(logDir)); + tracker.getLocalFileSystem().exists(logDir)); } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 03:43:43 2011 @@ -19,9 +19,11 @@ package org.apache.hadoop.mapred; import java.text.DecimalFormat; +import java.util.ArrayList; import java.io.*; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Enumeration; import java.util.Properties; @@ -46,6 +48,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.util.StringUtils; /** * Utilities used in unit test. @@ -426,7 +429,37 @@ public class UtilsForTests { } } } - + + /** + * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread + * asynchronously. + */ + public static class InlineCleanupQueue extends CleanupQueue { + List stalePaths = new ArrayList(); + + public InlineCleanupQueue() { + // do nothing + } + + @Override + public void addToQueue(PathDeletionContext... 
contexts) { + // delete paths in-line + for (PathDeletionContext context : contexts) { + try { + if (!deletePath(context)) { + LOG.warn("Stale path " + context.fullPath); + stalePaths.add(context.fullPath); + } + } catch (IOException e) { + LOG.warn("Caught exception while deleting path " + + context.fullPath); + LOG.info(StringUtils.stringifyException(e)); + stalePaths.add(context.fullPath); + } + } + } + } + static String getTaskSignalParameter(boolean isMap) { return isMap ? "test.mapred.map.waiting.target" Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java?rev=1077127&r1=1077126&r2=1077127&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java Fri Mar 4 03:43:43 2011 @@ -58,7 +58,10 @@ public class TestServiceLevelAuthorizati JobConf mrConf = new JobConf(conf); mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1, null, null, mrConf); - + // make cleanup inline sothat validation of existence of these directories + // can be done + mr.setInlineCleanupThreads(); + // Run examples TestMiniMRWithDFS.runPI(mr, mr.createJobConf(mrConf)); TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf(mrConf));