Reply-To: common-dev@hadoop.apache.org
Subject: svn commit: r1096081 - in /hadoop/common/branches/branch-0.20-security: ./ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Fri, 22 Apr 2011 23:13:23 -0000
To: common-commits@hadoop.apache.org
From: omalley@apache.org
Message-Id: <20110422231324.1661A23888FD@eris.apache.org>

Author: omalley
Date: Fri Apr 22 23:13:23 2011
New Revision: 1096081

URL: http://svn.apache.org/viewvc?rev=1096081&view=rev
Log:
MAPREDUCE-2415. Distribute the user task logs on to multiple disks. (Bharath Mundlapudi via omalley) Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Apr 22 23:13:23 2011 @@ -61,15 +61,18 @@ Release 0.20.204.0 - unreleased IMPROVEMENTS + MAPREDUCE-2415. Distribute the user task logs on to multiple disks. + (Bharath Mundlapudi via omalley) + + MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing + itself. (Ravi Gummadi and Jagane Sundar via omalley) + HDFS-1541. 
Not marking datanodes dead when namenode in safemode. (hairong) HDFS-1767. Namenode ignores non-initial block report from datanodes when in safemode during startup. (Matt Foley via suresh) - MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing - itself. (Ravi Gummadi and Jagane Sundar via omalley) - Release 0.20.203.0 - unreleased HADOOP-7190. Add metrics v1 back for backwards compatibility. (omalley) Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/main.c?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c (original) +++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c Fri Apr 22 23:13:23 2011 @@ -192,7 +192,7 @@ int main(int argc, char **argv) { break; case DELETE_LOG_AS_USER: dir_to_be_deleted = argv[optind++]; - exit_code= delete_log_directory(dir_to_be_deleted); + exit_code= delete_log_directory(dir_to_be_deleted, good_local_dirs); break; case RUN_COMMAND_AS_USER: exit_code = run_command_as_user(user_detail->pw_name, argv + optind); Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.c?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c (original) +++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c Fri Apr 22 23:13:23 2011 @@ -28,9 +28,11 @@ #include #include #include +#include #include #include #include +#include #define 
USER_DIR_PATTERN "%s/taskTracker/%s" @@ -54,6 +56,8 @@ static const int DEFAULT_MIN_USERID = 10 #define BANNED_USERS_KEY "banned.users" +#define USERLOGS "userlogs" + static const char* DEFAULT_BANNED_USERS[] = {"mapred", "hdfs", "bin", 0}; //struct to store the user details @@ -355,11 +359,51 @@ int mkdirs(const char* path, mode_t perm return 0; } +static short get_current_local_dir_count(char **local_dir) +{ + char **local_dir_ptr; + short count=0; + + for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) { + ++count; + } + return count; +} + +static char* get_nth_local_dir(char **local_dir, int nth) +{ + char **local_dir_ptr; + short count=0; + + for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) { + if(count == nth) { + return strdup(*local_dir_ptr); + } + ++count; + } + fprintf(LOGFILE, "Invalid index %d for %d local directories\n", nth, count); + return NULL; +} + +static char* get_random_local_dir(char **local_dir) { + struct timeval tv; + short nth; + gettimeofday(&tv, NULL); + srand ( (long) tv.tv_sec*1000000 + tv.tv_usec ); + short cnt = get_current_local_dir_count(local_dir); + if(cnt == 0) { + fprintf(LOGFILE, "No valid local directories\n"); + return NULL; + } + nth = rand() % cnt; + return get_nth_local_dir(local_dir, nth); +} + /** * Function to prepare the attempt directories for the task JVM. * It creates the task work and log directories. 
*/ -static int create_attempt_directories(const char* user, +int create_attempt_directories(const char* user, const char * good_local_dirs, const char *job_id, const char *task_id) { // create dirs as 0750 const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP; @@ -392,24 +436,67 @@ static int create_attempt_directories(co free(task_dir); } } - free_values(local_dir); // also make the directory for the task logs char *job_task_name = malloc(strlen(job_id) + strlen(task_id) + 2); + char *real_task_dir = NULL; // target of symlink + char *real_job_dir = NULL; // parent dir of target of symlink + char *random_local_dir = NULL; + char *link_task_log_dir = NULL; // symlink if (job_task_name == NULL) { fprintf(LOGFILE, "Malloc of job task name failed\n"); result = -1; } else { sprintf(job_task_name, "%s/%s", job_id, task_id); - char *log_dir = get_job_log_directory(job_task_name); - free(job_task_name); - if (log_dir == NULL) { + link_task_log_dir = get_job_log_directory(job_task_name); + random_local_dir = get_random_local_dir(local_dir); + if(random_local_dir == NULL) { + result = -1; + goto cleanup; + } + real_job_dir = malloc(strlen(random_local_dir) + strlen(USERLOGS) + + strlen(job_id) + 3); + if (real_job_dir == NULL) { + fprintf(LOGFILE, "Malloc of real job directory failed\n"); result = -1; - } else if (mkdirs(log_dir, perms) != 0) { + goto cleanup; + } + real_task_dir = malloc(strlen(random_local_dir) + strlen(USERLOGS) + + strlen(job_id) + strlen(task_id) + 4); + if (real_task_dir == NULL) { + fprintf(LOGFILE, "Malloc of real task directory failed\n"); + result = -1; + goto cleanup; + } + sprintf(real_job_dir, "%s/userlogs/%s", random_local_dir, job_id); + result = create_directory_for_user(real_job_dir); + if( result != 0) { + result = -1; + goto cleanup; + } + sprintf(real_task_dir, "%s/userlogs/%s/%s", + random_local_dir, job_id, task_id); + result = mkdirs(real_task_dir, perms); + if( result != 0) { + result = -1; + goto cleanup; + } + result = 
symlink(real_task_dir, link_task_log_dir); + if( result != 0) { + fprintf(LOGFILE, "Failed to create symlink %s to %s - %s\n", + link_task_log_dir, real_task_dir, strerror(errno)); result = -1; } - free(log_dir); } + + cleanup: + free(random_local_dir); + free(job_task_name); + free(link_task_log_dir); + free(real_job_dir); + free(real_task_dir); + free_values(local_dir); + return result; } @@ -523,7 +610,7 @@ static int change_owner(const char* path /** * Create a top level directory for the user. * It assumes that the parent directory is *not* writable by the user. - * It creates directories with 02700 permissions owned by the user + * It creates directories with 02750 permissions owned by the user * and with the group set to the task tracker group. * return non-0 on failure */ @@ -1036,17 +1123,38 @@ int delete_as_user(const char *user, con return ret; } -/** - * delete a given log directory +/* + * delete a given job log directory + * This function takes jobid and deletes the related logs. 
*/ -int delete_log_directory(const char *subdir) { - char* log_subdir = get_job_log_directory(subdir); +int delete_log_directory(const char *subdir, const char * good_local_dirs) { + char* job_log_dir = get_job_log_directory(subdir); + int ret = -1; - if (log_subdir != NULL) { - ret = delete_path(log_subdir, strchr(subdir, '/') == NULL); + if (job_log_dir == NULL) return ret; + + //delete the job log directory in /userlogs/jobid + delete_path(job_log_dir, true); + + char **local_dir = get_mapred_local_dirs(good_local_dirs); + + char **local_dir_ptr; + for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) { + char *mapred_local_log_dir = concatenate("%s/userlogs/%s", + "mapred local job log dir", + 2, *local_dir_ptr, subdir); + if (mapred_local_log_dir != NULL) { + //delete the job log directory in /userlogs/jobid + delete_path(mapred_local_log_dir, true); + free(mapred_local_log_dir); + } + else + fprintf(LOGFILE, "Failed to delete mapred local log dir for jobid %s\n", + subdir); } - free(log_subdir); - return ret; + free(job_log_dir); + free_values(local_dir); + return 0; } /** Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.h?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h (original) +++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h Fri Apr 22 23:13:23 2011 @@ -71,7 +71,7 @@ int check_taskcontroller_permissions(cha /** * delete a given log directory as a user */ -int delete_log_directory(const char *log_dir); +int delete_log_directory(const char *log_dir, const char * good_local_dirs); // initialize the job directory int initialize_job(const char *user, 
const char * good_local_dirs, const char *jobid, @@ -153,3 +153,9 @@ int initialize_user(const char *user, co int create_directory_for_user(const char* path); int change_user(uid_t user, gid_t group); + +/** + * Create task attempt related directories as user. + */ +int create_attempt_directories(const char* user, + const char * good_local_dirs, const char *job_id, const char *task_id); Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c (original) +++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c Fri Apr 22 23:13:23 2011 @@ -24,9 +24,11 @@ #include #include #include +#include #include #include #include +#include #define TEST_ROOT "/tmp/test-task-controller" #define DONT_TOUCH_FILE "dont-touch-me" @@ -185,6 +187,78 @@ void test_get_task_log_dir() { free(logdir); } +void create_userlogs_dir() { + char** tt_roots = get_values("mapred.local.dir"); + char** tt_root; + for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) { + char buffer[100000]; + sprintf(buffer, "%s/userlogs", *tt_root); + if (mkdir(buffer, 0755) != 0) { + printf("FAIL: Can't create directory %s - %s\n", buffer, + strerror(errno)); + exit(1); + } + } + free_values(tt_roots); +} + +void test_create_log_directory() { + printf("\nTesting test_create_log_directory\n"); + create_userlogs_dir(); + char *job_log_dir = get_job_log_directory("job_7"); + if (job_log_dir == NULL) { + exit(1); + } + if (create_directory_for_user(job_log_dir) != 0) { + exit(1); + } + free(job_log_dir); + char* good_local_dirs = get_value("mapred.local.dir"); + if (good_local_dirs 
== NULL) { + fprintf(LOGFILE, "Mapred local directories could not be obtained.\n"); + exit(1); + } + create_attempt_directories(username, good_local_dirs, "job_7", "task_1"); + + //check if symlink got created + struct stat file; + int status; + char actualpath [PATH_MAX+1]; + char *res; + char *filepath = TEST_ROOT "/logs/userlogs/job_7/task_1"; + + status = lstat(filepath, &file); + if (!S_ISLNK(file.st_mode)) { + fprintf(LOGFILE, "Symlink creation failed\n"); + exit(1); + } + + //Check if symlink path exists + res = realpath(filepath, actualpath); + if(!res) { + fprintf(LOGFILE, "Failed to get target for the symlink\n"); + exit(1); + } + + char local_job_dir[PATH_MAX+1]; + int i; + bool found = false; + for(i=1; i<5; i++) { + sprintf(local_job_dir, TEST_ROOT "/local-%d/userlogs/job_7/task_1", i); + if (strcmp(local_job_dir, actualpath) == 0) { + found = true; + break; + } + } + + if(!found) { + printf("FAIL: symlink path and target path mismatch\n"); + exit(1); + } + + free(good_local_dirs); +} + void test_check_user() { printf("\nTesting test_check_user\n"); struct passwd *user = check_user(username); @@ -220,8 +294,9 @@ void test_check_configuration_permission } void test_delete_task() { - if (initialize_user(username)) { - printf("FAIL: failed to initialized user %s\n", username); + char* local_dirs = get_value("mapred.local.dir"); + if (initialize_user(username, local_dirs)) { + printf("FAIL: failed to initialize user %s\n", username); exit(1); } char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1"); @@ -254,7 +329,7 @@ void test_delete_task() { run(buffer); // delete task directory - int ret = delete_as_user(username, "jobcache/job_1/task_1"); + int ret = delete_as_user(username, local_dirs, "jobcache/job_1/task_1"); if (ret != 0) { printf("FAIL: return code from delete_as_user is %d\n", ret); exit(1); @@ -282,9 +357,11 @@ void test_delete_task() { free(job_dir); free(task_dir); free(dont_touch); + free(local_dirs); } void 
test_delete_job() { + char* local_dirs = get_value("mapred.local.dir"); char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2"); char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, DONT_TOUCH_FILE); @@ -315,7 +392,7 @@ void test_delete_job() { run(buffer); // delete task directory - int ret = delete_as_user(username, "jobcache/job_2"); + int ret = delete_as_user(username, local_dirs, "jobcache/job_2"); if (ret != 0) { printf("FAIL: return code from delete_as_user is %d\n", ret); exit(1); @@ -339,11 +416,13 @@ void test_delete_job() { free(job_dir); free(task_dir); free(dont_touch); + free(local_dirs); } void test_delete_user() { printf("\nTesting delete_user\n"); + char* local_dirs = get_value("mapred.local.dir"); char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3"); if (mkdirs(job_dir, 0700) != 0) { exit(1); @@ -354,7 +433,7 @@ void test_delete_user() { printf("FAIL: directory missing before test\n"); exit(1); } - if (delete_as_user(username, "") != 0) { + if (delete_as_user(username, local_dirs, "") != 0) { exit(1); } if (access(buffer, R_OK) == 0) { @@ -366,10 +445,12 @@ void test_delete_user() { exit(1); } free(job_dir); + free(local_dirs); } void test_delete_log_directory() { printf("\nTesting delete_log_directory\n"); + char* local_dirs = get_value("mapred.local.dir"); char *job_log_dir = get_job_log_directory("job_1"); if (job_log_dir == NULL) { exit(1); @@ -389,7 +470,7 @@ void test_delete_log_directory() { printf("FAIL: can't access task directory - %s\n", strerror(errno)); exit(1); } - if (delete_log_directory("job_1/task_2") != 0) { + if (delete_log_directory("job_1/task_2", local_dirs) != 0) { printf("FAIL: can't delete task directory\n"); exit(1); } @@ -401,7 +482,7 @@ void test_delete_log_directory() { printf("FAIL: job directory not deleted - %s\n", strerror(errno)); exit(1); } - if (delete_log_directory("job_1") != 0) { + if (delete_log_directory("job_1", local_dirs) != 0) { printf("FAIL: 
can't delete task directory\n"); exit(1); } @@ -409,7 +490,25 @@ void test_delete_log_directory() { printf("FAIL: job directory not deleted\n"); exit(1); } + if (delete_log_directory("job_7", local_dirs) != 0) { + printf("FAIL: can't delete job directory\n"); + exit(1); + } + if (access(TEST_ROOT "/logs/userlogs/job_7", R_OK) == 0) { + printf("FAIL: job log directory not deleted\n"); + exit(1); + } + char local_job_dir[PATH_MAX+1]; + int i; + for(i=1; i<5; i++) { + sprintf(local_job_dir, TEST_ROOT "/local-%d/userlogs/job_7", i); + if (access(local_job_dir, R_OK) == 0) { + printf("FAIL: job log directory in mapred local not deleted\n"); + exit(1); + } + } free(task_log_dir); + free(local_dirs); } void run_test_in_child(const char* test_name, void (*func)()) { @@ -558,7 +657,8 @@ void test_init_job() { exit(1); } else if (child == 0) { char *final_pgm[] = {"touch", "my-touch-file", 0}; - if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt", + char* local_dirs = get_value("mapred.local.dir"); + if (initialize_job(username, local_dirs, "job_4", TEST_ROOT "/creds.txt", TEST_ROOT "/job.xml", final_pgm) != 0) { printf("FAIL: failed in child\n"); exit(42); @@ -631,13 +731,14 @@ void test_run_task() { fflush(stderr); char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1", username, "job_4", "task_1"); + char* local_dirs = get_value("mapred.local.dir"); pid_t child = fork(); if (child == -1) { printf("FAIL: failed to fork process for init_job - %s\n", strerror(errno)); exit(1); } else if (child == 0) { - if (run_task_as_user(username, "", "job_4", "task_1", + if (run_task_as_user(username, local_dirs, "job_4", "task_1", task_dir, script_name) != 0) { printf("FAIL: failed in child\n"); exit(42); @@ -736,6 +837,8 @@ int main(int argc, char **argv) { test_check_user(); + test_create_log_directory(); + test_delete_log_directory(); // the tests that change user need to be run in a subshell, so that Modified: 
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Apr 22 23:13:23 2011 @@ -65,6 +65,17 @@ public class DefaultTaskController exten } } + @Override + public void createLogDir(TaskAttemptID taskID, + boolean isCleanup) throws IOException { + boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup, + localStorage.getGoodLocalDirs()); + if (!b) { + LOG.warn("Creation of attempt log dir for " + taskID + + " failed. Ignoring"); + } + } + /** * Create all of the directories for the task and launches the child jvm. 
* @param user the user name @@ -80,9 +91,8 @@ public class DefaultTaskController exten File currentWorkDirectory, String stdout, String stderr) throws IOException { - ShellCommandExecutor shExec = null; - try { + try { FileSystem localFs = FileSystem.getLocal(getConf()); //create the attempt dirs @@ -232,7 +242,24 @@ public class DefaultTaskController exten public void deleteLogAsUser(String user, String subDir) throws IOException { Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir); - fs.delete(dir, true); + //Delete the subDir in /userlogs + File subDirPath = new File(dir.toString()); + FileUtil.fullyDelete( subDirPath ); + + //Delete the subDir in all good /userlogs + String [] localDirs = localStorage.getGoodLocalDirs(); + for(String localdir : localDirs) { + String dirPath = localdir + File.separatorChar + + TaskLog.USERLOGS_DIR_NAME + File.separatorChar + + subDir; + try { + FileUtil.fullyDelete( new File(dirPath) ); + } catch(Exception e){ + //Skip bad dir for later deletion + LOG.warn("Could not delete dir: " + dirPath + + " , Reason : " + e.getMessage()); + } + } } @Override Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Apr 22 23:13:23 2011 @@ -71,6 +71,10 @@ public abstract class TaskController imp return conf; } + public LocalStorage getLocalStorage() { + return localStorage; + } + public void setConf(Configuration conf) { this.conf = conf; } @@ -143,6 +147,17 @@ public abstract class TaskController 
imp */ public abstract void deleteAsUser(String user, String subDir) throws IOException; + + /** + * Creates task log dir + * @param taskID ID of the task + * @param isCleanup If the task is cleanup task or not + * @throws IOException + */ + public void createLogDir(TaskAttemptID taskID, + boolean isCleanup) throws IOException { + + } /** * Delete the user's files under the userlogs directory. Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Apr 22 23:13:23 2011 @@ -33,6 +33,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,8 +44,10 @@ import org.apache.hadoop.fs.LocalFileSys import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; import org.apache.hadoop.util.ProcessTree; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; @@ -75,7 +78,103 @@ public class TaskLog { LOG_DIR.mkdirs(); } } + + static AtomicInteger rotor = new AtomicInteger(0); + + /** + * Create log directory for the given attempt. 
This involves creating the + * following and setting proper permissions for the new directories + *
{hadoop.log.dir}/userlogs/ + *
{hadoop.log.dir}/userlogs// + *
{one of the mapred-local-dirs}/userlogs/ + *
{one of the mapred-local-dirs}/userlogs// + * + * @param taskID attempt-id for which log dir is to be created + * @param isCleanup Is this attempt a cleanup attempt ? + * @param localDirs mapred local directories + * @return true if attempt log directory creation is succeeded + * @throws IOException + */ + public static boolean createTaskAttemptLogDir(TaskAttemptID taskID, + boolean isCleanup, String[] localDirs) throws IOException{ + String cleanupSuffix = isCleanup ? ".cleanup" : ""; + String strAttemptLogDir = getTaskAttemptLogDir(taskID, + cleanupSuffix, localDirs); + File attemptLogDir = new File(strAttemptLogDir); + boolean isSucceeded = attemptLogDir.mkdirs(); + if(isSucceeded) { + String strLinkAttemptLogDir = getJobDir( + taskID.getJobID()).getAbsolutePath() + File.separatorChar + + taskID.toString() + cleanupSuffix; + if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) { + LOG.warn("Creation of symlink to attempt log dir failed."); + isSucceeded = false; + } + + File linkAttemptLogDir = new File(strLinkAttemptLogDir); + // Set permissions for job dir in userlogs + if (!Localizer.PermissionsHandler.setPermissions( + linkAttemptLogDir.getParentFile(), + Localizer.PermissionsHandler.sevenZeroZero)) { + LOG.warn("Setting permissions to " + + linkAttemptLogDir.getParentFile() + " failed."); + isSucceeded = false; + } + //Set permissions for target attempt log dir + if (!Localizer.PermissionsHandler.setPermissions(attemptLogDir, + Localizer.PermissionsHandler.sevenZeroZero)) { + LOG.warn("Setting permissions to the real attempt log dir " + + attemptLogDir + " failed."); + isSucceeded = false; + } + //Set permissions for target job log dir + if (!Localizer.PermissionsHandler.setPermissions( + attemptLogDir.getParentFile(), + Localizer.PermissionsHandler.sevenZeroZero)) { + LOG.warn("Setting permissions to the real job log dir " + + attemptLogDir.getParentFile() + " failed."); + isSucceeded = false; + } + } + return isSucceeded; + } + + /** + * Get 
one of the mapred local directory in a round-robin-way. + * @param localDirs mapred local directories + * @return the next chosen mapred local directory + * @throws IOException + */ + private static String getNextLocalDir(String[] localDirs) throws IOException{ + if(localDirs.length == 0) { + throw new IOException ("Not enough mapred.local.dirs (" + + localDirs.length + ")"); + } + return localDirs[Math.abs(rotor.getAndIncrement()) % localDirs.length]; + } + /** + * Get attempt log directory path for the given attempt-id under randomly + * selected mapred local directory. + * @param taskID attempt-id for which log dir path is needed + * @param cleanupSuffix ".cleanup" if this attempt is a cleanup attempt + * @param localDirs mapred local directories + * @return target task attempt log directory + * @throws IOException + */ + public static String getTaskAttemptLogDir(TaskAttemptID taskID, + String cleanupSuffix, String[] localDirs) throws IOException { + StringBuilder taskLogDirLocation = new StringBuilder(); + taskLogDirLocation.append(getNextLocalDir(localDirs)); + taskLogDirLocation.append(File.separatorChar); + taskLogDirLocation.append(USERLOGS_DIR_NAME); + taskLogDirLocation.append(File.separatorChar); + taskLogDirLocation.append(taskID.getJobID().toString()); + taskLogDirLocation.append(File.separatorChar); + taskLogDirLocation.append(taskID.toString()+cleanupSuffix); + return taskLogDirLocation.toString(); + } + public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup, LogName filter) { return new File(getAttemptDir(taskid, isCleanup), filter.toString()); Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Apr 22 23:13:23 2011 @@ -313,15 +313,7 @@ abstract class TaskRunner extends Thread TaskLog.LogName.STDOUT); logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup, TaskLog.LogName.STDERR); - File logDir = logFiles[0].getParentFile(); - boolean b = logDir.mkdirs(); - if (!b) { - LOG.warn("mkdirs failed. Ignoring"); - } else { - Localizer.PermissionsHandler.setPermissions(logDir, - Localizer.PermissionsHandler.sevenZeroZero); - } - + getTracker().getTaskController().createLogDir(taskid, isCleanup); return logFiles; } Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Apr 22 23:13:23 2011 @@ -745,7 +745,13 @@ public class TaskTracker implements MRCo for (String s : localStorage.getGoodLocalDirs()) { localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub); } - + // Create userlogs directory under all good mapred-local-dirs + for (String s : localStorage.getGoodLocalDirs()) { + Path userLogsDir = new Path(s, TaskLog.USERLOGS_DIR_NAME); + if (!localFs.exists(userLogsDir)) { + localFs.mkdirs(userLogsDir, pub); + } + } // Clear out state tables this.tasks.clear(); this.runningTasks = new LinkedHashMap(); @@ -905,7 +911,9 @@ public class TaskTracker implements MRCo * startup, to remove any leftovers from previous run. 
*/ public void cleanupStorage() throws IOException { - this.fConf.deleteLocalFiles(); + this.fConf.deleteLocalFiles(SUBDIR); + this.fConf.deleteLocalFiles(TT_PRIVATE_DIR); + this.fConf.deleteLocalFiles(TT_LOG_TMP_DIR); } // Object on wait which MapEventsFetcherThread is going to wait. @@ -1396,6 +1404,14 @@ public class TaskTracker implements MRCo fConf = conf; } + void setLocalStorage(LocalStorage in) { + localStorage = in; + } + + void setLocalDirAllocator(LocalDirAllocator in) { + localDirAllocator = in; + } + /** * Start with the local machine name, and the default JobTracker */ Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1096081&r1=1096080&r2=1096081&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java Fri Apr 22 23:13:23 2011 @@ -123,36 +123,52 @@ public class UserLogCleaner extends Thre } /** - * Clears all the logs in userlog directory. + * Adds the job log directories for deletion with default retain hours. + * Deletes all other directories, if any. * - * Adds the job directories for deletion with default retain hours. Deletes - * all other directories, if any. This is usually called on reinit/restart of - * the TaskTracker + * @param loc location of log directory + * @param conf + * @throws IOException + */ + public void addOldUserLogsForDeletion(File loc, Configuration conf) + throws IOException { + if (loc.exists()) { + long now = clock.getTime(); + for(String logDir: loc.list()) { + // add all the log dirs to taskLogsMnonitor. 
+        JobID jobid = null;
+        try {
+          jobid = JobID.forName(logDir);
+        } catch (IllegalArgumentException ie) {
+          deleteLogPath(logDir);
+          continue;
+        }
+        // add the job log directory for deletion with
+        // default retain hours, if it is not already added
+        if (!completedJobs.containsKey(jobid)) {
+          JobCompletedEvent jce =
+            new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
+          userLogManager.addLogEvent(jce);
+        }
+      }
+    }
+  }
+
+  /**
+   * Clears all the logs in userlogs directory. This is usually called on
+   * reinit/restart of the TaskTracker.
    *
    * @param conf
    * @throws IOException
    */
   public void clearOldUserLogs(Configuration conf) throws IOException {
     File userLogDir = TaskLog.getUserLogDir();
-    if (userLogDir.exists()) {
-      long now = clock.getTime();
-      for(String logDir: userLogDir.list()) {
-        // add all the log dirs to taskLogsMnonitor.
-        JobID jobid = null;
-        try {
-          jobid = JobID.forName(logDir);
-        } catch (IllegalArgumentException ie) {
-          deleteLogPath(logDir);
-          continue;
-        }
-        // add the job log directory for deletion with default retain hours,
-        // if it is not already added
-        if (!completedJobs.containsKey(jobid)) {
-          JobCompletedEvent jce =
-            new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
-          userLogManager.addLogEvent(jce);
-        }
-      }
+    addOldUserLogsForDeletion(userLogDir, conf);
+    String[] localDirs = conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+    for(String localDir : localDirs) {
+      File mapredLocalUserLogDir = new File(localDir +
+        File.separatorChar + TaskLog.USERLOGS_DIR_NAME);
+      addOldUserLogsForDeletion(mapredLocalUserLogDir, conf);
     }
   }

@@ -196,6 +212,48 @@ public class UserLogCleaner extends Thre
   }

   /**
+   * Gets the user for the log path.
+   *
+   * @param logPath
+   * @throws IOException
+   */
+  private String getLogUser(String logPath) throws IOException{
+    //Get user from /userlogs/jobid path
+    String logRoot = TaskLog.getUserLogDir().toString();
+    String user = null;
+    try{
+      user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+    }catch(Exception e){
+      //Ignore this exception since this path might have been deleted.
+    }
+
+    //If we found the user for this logPath, then return this user
+    if(user != null) return user;
+
+    //If /userlogs/jobid not found, then get user from
+    //any one of existing /userlogs/jobid path(s)
+    String[] localDirs =
+      userLogManager.getTaskController().getLocalStorage().getGoodLocalDirs();
+    for(String localDir : localDirs) {
+      try{
+        logRoot = localDir + File.separator + TaskLog.USERLOGS_DIR_NAME;
+        user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+        //If we found the user for this logPath, then break this loop
+        if(user != null) break;
+
+      }catch(Exception e){
+        //Ignore this exception since this path might have been deleted.
+      }
+    }
+
+    if(user == null) {
+      throw new IOException("Userlog path not found for " + logPath);
+    }
+
+    return user;
+  }
+
+  /**
    * Deletes the log path.
    *
    * This path will be removed through {@link CleanupQueue}
@@ -205,8 +263,7 @@ public class UserLogCleaner extends Thre
    */
   private void deleteLogPath(String logPath) throws IOException {
     LOG.info("Deleting user log path " + logPath);
-    String logRoot = TaskLog.getUserLogDir().toString();
-    String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+    String user = getLogUser(logPath);
     TaskController controller = userLogManager.getTaskController();
     PathDeletionContext item =
       new TaskController.DeletionContext(controller, true, user, logPath);

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Fri Apr 22 23:13:23 2011
@@ -23,11 +23,14 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;

 import static org.junit.Assert.*;

@@ -48,15 +51,18 @@ public class TestUserLogCleanup {
   private JobID jobid4 = new JobID(jtid, 4);
   private File foo = new File(TaskLog.getUserLogDir(), "foo");
   private File bar = new File(TaskLog.getUserLogDir(), "bar");
+  private static String TEST_ROOT_DIR =
+    System.getProperty("test.build.data", "/tmp");

-  public TestUserLogCleanup() throws IOException {
-    Configuration conf = new Configuration();
+  public TestUserLogCleanup() throws IOException, InterruptedException {
+    JobConf conf= new JobConf();
     startTT(conf);
   }

   @After
   public void tearDown() throws IOException {
     FileUtil.fullyDelete(TaskLog.getUserLogDir());
+    FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
   }

   private File localizeJob(JobID jobid) throws IOException {
@@ -77,14 +83,26 @@ public class TestUserLogCleanup {
     userLogManager.addLogEvent(jce);
   }

-  private void startTT(Configuration conf) throws IOException {
+  private void startTT(JobConf conf) throws IOException, InterruptedException {
     myClock = new FakeClock(); // clock is reset.
+    String localdirs = TEST_ROOT_DIR + "/userlogs/local/0," +
+      TEST_ROOT_DIR + "/userlogs/local/1";
+    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localdirs);
     tt = new TaskTracker();
     tt.setConf(new JobConf(conf));
+    LocalDirAllocator localDirAllocator =
+      new LocalDirAllocator("mapred.local.dir");
+    tt.setLocalDirAllocator(localDirAllocator);
+    LocalStorage localStorage = new LocalStorage(conf.getLocalDirs());
+    localStorage.checkLocalDirs();
+    tt.setLocalStorage(localStorage);
     localizer = new Localizer(FileSystem.get(conf), conf
         .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
     tt.setLocalizer(localizer);
     userLogManager = new UtilsForTests.InLineUserLogManager(conf);
+    TaskController taskController = userLogManager.getTaskController();
+    taskController.setup(localDirAllocator, localStorage);
+    tt.setTaskController(taskController);
     userLogCleaner = userLogManager.getUserLogCleaner();
     userLogCleaner.setClock(myClock);
     tt.setUserLogManager(userLogManager);
@@ -92,13 +110,13 @@ public class TestUserLogCleanup {
   }

   private void ttReinited() throws IOException {
-    Configuration conf = new Configuration();
+    JobConf conf=new JobConf();
     conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
     userLogManager.clearOldUserLogs(conf);
   }

-  private void ttRestarted() throws IOException {
-    Configuration conf = new Configuration();
+  private void ttRestarted() throws IOException, InterruptedException {
+    JobConf conf=new JobConf();
     conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
     startTT(conf);
   }
@@ -228,9 +246,11 @@ public class TestUserLogCleanup {
    * restart.
    *
    * @throws IOException
+   * @throws InterruptedException
    */
   @Test
-  public void testUserLogCleanupAfterRestart() throws IOException {
+  public void testUserLogCleanupAfterRestart()
+  throws IOException, InterruptedException {
     File jobUserlog1 = localizeJob(jobid1);
     File jobUserlog2 = localizeJob(jobid2);
     File jobUserlog3 = localizeJob(jobid3);