Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r896781 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/filec...
Date: Thu, 07 Jan 2010 08:00:42 -0000
To: mapreduce-commits@hadoop.apache.org
From: yhemanth@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20100107080130.B533C23888DD@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: yhemanth
Date: Thu Jan 7 07:59:44 2010
New Revision: 896781

URL: http://svn.apache.org/viewvc?rev=896781&view=rev
Log:
MAPREDUCE-1186. Modified code in distributed cache to set permissions only on
required set of localized paths. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/main.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan 7 07:59:44 2010
@@ -110,6 +110,10 @@
     MAPREDUCE-1224. Calling "SELECT t.* from AS t" to get meta information is
     too expensive for big tables. (Spencer Ho via tomwhite)

+    MAPREDUCE-1186. Modified code in distributed cache to set permissions
+    only on required set of localized paths.
+    (Amareshwari Sriramadasu via yhemanth)
+
   BUG FIXES

     MAPREDUCE-1258. Fix fair scheduler event log not logging job info.

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/main.c?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/main.c Thu Jan 7 07:59:44 2010
@@ -51,6 +51,7 @@
   const char * task_id = NULL;
   const char * tt_root = NULL;
   const char *log_dir = NULL;
+  const char * unique_string = NULL;
   int exit_code = 0;
   const char * task_pid = NULL;
   const char* const short_options = "l:";
@@ -121,8 +122,11 @@
     job_id = argv[optind++];
     exit_code = initialize_job(job_id, user_detail->pw_name);
     break;
-  case INITIALIZE_DISTRIBUTEDCACHE:
-    exit_code = initialize_distributed_cache(user_detail->pw_name);
+  case INITIALIZE_DISTRIBUTEDCACHE_FILE:
+    tt_root = argv[optind++];
+    unique_string = argv[optind++];
+    exit_code = initialize_distributed_cache_file(tt_root, unique_string,
+        user_detail->pw_name);
     break;
   case LAUNCH_TASK_JVM:
     tt_root = argv[optind++];

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Thu Jan 7 07:59:44 2010
@@ -173,9 +173,10 @@
 /**
  * Get the distributed cache directory for a particular user
  */
-char *get_distributed_cache_directory(const char *tt_root, const char *user) {
-  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, "dist_cache_path", 2,
-      tt_root, user);
+char *get_distributed_cache_directory(const char *tt_root, const char *user,
+    const char* unique_string) {
+  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN,
+      "dist_cache_unique_path", 3, tt_root, user, unique_string);
 }

 char *get_job_work_directory(const char *job_dir) {
@@ -795,22 +796,25 @@
 }

 /**
- * Function to initialize the distributed cache files of a user.
+ * Function to initialize the distributed cache file for a user.
  * It does the following:
- * * sudo chown user:mapred -R taskTracker/$user/distcache
- * * sudo chmod 2570 -R taskTracker/$user/distcache
- * This is done once per every JVM launch. Tasks reusing JVMs just create
+ * * sudo chown user:mapred -R taskTracker/$user/distcache/
+ * * sudo chmod 2570 -R taskTracker/$user/distcache/
+ * This is done once per localization. Tasks reusing JVMs just create
  * symbolic links themselves and so there isn't anything specific to do in
  * that case.
- * Sometimes, it happens that a task uses the whole or part of a directory
- * structure in taskTracker/$user/distcache. In this case, some paths are
- * already set proper private permissions by this same function called during
- * a previous JVM launch. In the current invocation, we only do the
- * chown/chmod operation of files/directories that are newly created by the
- * TaskTracker (i.e. those that still are not owned by user:mapred)
  */
-int initialize_distributed_cache(const char *user) {
-
+int initialize_distributed_cache_file(const char *tt_root,
+    const char *unique_string, const char *user) {
+  if (tt_root == NULL) {
+    fprintf(LOGFILE, "tt_root passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+  if (unique_string == NULL) {
+    fprintf(LOGFILE, "unique_string passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
   if (user == NULL) {
     fprintf(LOGFILE, "user passed is null.\n");
     return INVALID_ARGUMENT_NUMBER;
@@ -820,69 +824,41 @@
     fprintf(LOGFILE, "Couldn't get the user details of %s", user);
     return INVALID_USER_NAME;
   }
-
-  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-
-  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
-  if (local_dir == NULL) {
-    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+  //Check tt_root
+  if (check_tt_root(tt_root) < 0) {
+    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
     cleanup();
     return INVALID_TT_ROOT;
   }

-  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
-#ifdef DEBUG
-  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
-      full_local_dir_str);
-#endif
+  // set permission on the unique directory
+  char *localized_unique_dir = get_distributed_cache_directory(tt_root, user,
+      unique_string);
+  if (localized_unique_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't get unique distcache directory for %s.\n", user);
+    cleanup();
+    return INITIALIZE_DISTCACHEFILE_FAILED;
+  }

-  char *distcache_dir;
-  char **local_dir_ptr = local_dir;
+  gid_t binary_gid = getegid(); // the group permissions of the binary.
   int failed = 0;
-  while (*local_dir_ptr != NULL) {
-    distcache_dir = get_distributed_cache_directory(*local_dir_ptr, user);
-    if (distcache_dir == NULL) {
-      fprintf(LOGFILE, "Couldn't get distcache directory for %s.\n", user);
-      failed = 1;
-      break;
-    }
-
-    struct stat filestat;
-    if (stat(distcache_dir, &filestat) != 0) {
-      if (errno == ENOENT) {
-#ifdef DEBUG
-        fprintf(LOGFILE, "distcache_dir %s doesn't exist. Not doing anything.\n",
-            distcache_dir);
-#endif
-      } else {
-        // stat failed because of something else!
-        fprintf(LOGFILE, "Failed to stat the distcache_dir %s\n",
-            distcache_dir);
-        failed = 1;
-        free(distcache_dir);
-        break;
-      }
-    } else if (secure_path(distcache_dir, user_detail->pw_uid,
-        tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
+  struct stat filestat;
+  if (stat(localized_unique_dir, &filestat) != 0) {
+    // stat on distcache failed because of something
+    fprintf(LOGFILE, "Failed to stat the localized_unique_dir %s\n",
+        localized_unique_dir);
+    failed = INITIALIZE_DISTCACHEFILE_FAILED;
+  } else if (secure_path(localized_unique_dir, user_detail->pw_uid,
+        binary_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
             | S_IXUSR | S_IRWXG, 1) != 0) {
-      // No setgid on files and setgid on dirs, 570
-      fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
-          distcache_dir);
-      failed = 1;
-      free(distcache_dir);
-      break;
-    }
-
-    local_dir_ptr++;
-    free(distcache_dir);
+    // No setgid on files and setgid on dirs, 570
+    fprintf(LOGFILE, "Failed to secure the localized_unique_dir %s\n",
+        localized_unique_dir);
+    failed = INITIALIZE_DISTCACHEFILE_FAILED;
   }
-  free(local_dir);
-  free(full_local_dir_str);
+  free(localized_unique_dir);
   cleanup();
-  if (failed) {
-    return INITIALIZE_DISTCACHE_FAILED;
-  }
-  return 0;
+  return failed;
 }

 /**

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Thu Jan 7 07:59:44 2010
@@ -39,7 +39,7 @@
 enum command {
   INITIALIZE_USER,
   INITIALIZE_JOB,
-  INITIALIZE_DISTRIBUTEDCACHE,
+  INITIALIZE_DISTRIBUTEDCACHE_FILE,
   LAUNCH_TASK_JVM,
   INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
@@ -68,7 +68,7 @@
   PREPARE_TASK_LOGS_FAILED, //16
   INVALID_TT_LOG_DIR, //17
   OUT_OF_MEMORY, //18
-  INITIALIZE_DISTCACHE_FAILED, //19
+  INITIALIZE_DISTCACHEFILE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
   UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
   INVALID_CONF_DIR, //22
@@ -79,7 +79,7 @@

 #define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"

-#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache"
+#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache/%s"

 #define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"

@@ -116,7 +116,8 @@

 int initialize_job(const char *jobid, const char *user);

-int initialize_distributed_cache(const char *user);
+int initialize_distributed_cache_file(const char *tt_root,
+    const char* unique_string, const char *user);

 int kill_user_task(const char *user, const char *task_pid, int sig);


Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Thu Jan 7 07:59:44 2010
@@ -31,6 +31,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;

 /**
  * The default implementation for controlling tasks.
@@ -156,8 +158,17 @@
   }

   @Override
-  public void initializeDistributedCache(InitializationContext context) {
-    // Do nothing.
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+      throws IOException {
+    Path localizedUniqueDir = context.getLocalizedUniqueDir();
+    try {
+      // Setting recursive execute permission on localized dir
+      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+      FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
+    } catch (InterruptedException ie) {
+      LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
+      throw new IOException(ie);
+    }
   }

   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Thu Jan 7 07:59:44 2010
@@ -85,7 +85,7 @@
   enum TaskCommands {
     INITIALIZE_USER,
     INITIALIZE_JOB,
-    INITIALIZE_DISTRIBUTEDCACHE,
+    INITIALIZE_DISTRIBUTEDCACHE_FILE,
     LAUNCH_TASK_JVM,
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
@@ -475,12 +475,21 @@
   }

   @Override
-  public void initializeDistributedCache(InitializationContext context)
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException {
-    LOG.debug("Going to initialize distributed cache for " + context.user
-        + " on the TT");
-    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE, context.user,
-        new ArrayList(), context.workDir, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to initialize distributed cache for " + context.user
+          + " with localizedBaseDir " + context.localizedBaseDir
+          + " and uniqueString " + context.uniqueString);
+    }
+    List args = new ArrayList();
+    // Here, uniqueString might start with '-'. Adding -- in front of the
+    // arguments indicates that they are non-option parameters.
+    args.add("--");
+    args.add(context.localizedBaseDir.toString());
+    args.add(context.uniqueString);
+    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
+        args, context.workDir, null);
   }

   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Jan 7 07:59:44 2010
@@ -115,7 +115,7 @@
       // Manage the distributed cache. If there are files to be copied,
       // this will trigger localFile to be re-written again.
       this.trackerDistributerdCacheManager =
-          new TrackerDistributedCacheManager(conf);
+          new TrackerDistributedCacheManager(conf, new DefaultTaskController());
       this.taskDistributedCacheManager =
           trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Thu Jan 7 07:59:44 2010
@@ -72,12 +72,10 @@
    * disks:
    * <ul>
    * <li>mapreduce.cluster.local.directories</li>
-   * <li>Job cache directories</li>
-   * <li>Archive directories</li>
    * <li>Hadoop log directories</li>
    * </ul>
    */
-  void setup() {
+  public void setup() {
     for (String localDir : this.mapredLocalDirs) {
       // Set up the mapreduce.cluster.local.directories.
       File mapredlocalDir = new File(localDir);
@@ -111,13 +109,13 @@

   /**
    * Take task-controller specific actions to initialize the distributed cache
-   * files. This involves setting appropriate permissions for these files so as
+   * file. This involves setting appropriate permissions for these files so as
    * to secure them to be accessible only their owners.
    *
    * @param context
    * @throws IOException
    */
-  public abstract void initializeDistributedCache(InitializationContext context)
+  public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException;

   /**
@@ -261,6 +259,37 @@
   public static class InitializationContext {
     public File workDir;
     public String user;
+
+    public InitializationContext() {
+    }
+
+    public InitializationContext(String user, File workDir) {
+      this.user = user;
+      this.workDir = workDir;
+    }
+  }
+
+  /**
+   * This is used for initializing the private localized files in distributed
+   * cache. Initialization would involve changing permission, ownership and etc.
+   */
+  public static class DistributedCacheFileContext extends InitializationContext {
+    // base directory under which file has been localized
+    Path localizedBaseDir;
+    // the unique string used to construct the localized path
+    String uniqueString;
+
+    public DistributedCacheFileContext(String user, File workDir,
+        Path localizedBaseDir, String uniqueString) {
+      super(user, workDir);
+      this.localizedBaseDir = localizedBaseDir;
+      this.uniqueString = uniqueString;
+    }
+
+    public Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(TaskTracker
+          .getPrivateDistributedCacheDir(user), uniqueString));
+    }
   }

   static class JobInitializationContext extends InitializationContext {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Jan 7 07:59:44 2010
@@ -43,7 +43,6 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
@@ -182,11 +181,6 @@
       // the conf object after this will NOT be reflected to the child.
       setupChildTaskConfiguration(lDirAlloc);

-      InitializationContext context = new InitializationContext();
-      context.user = conf.getUser();
-      context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
-      tracker.getTaskController().initializeDistributedCache(context);
-
       if (!prepare()) {
         return;
       }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jan 7 07:59:44 2010
@@ -635,12 +635,17 @@
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);

-    // Initialize DistributedCache and
-    // clear out temporary files that might be lying around
-    this.distributedCacheManager =
-        new TrackerDistributedCacheManager(this.fConf);
-    this.distributedCacheManager.purgeCache();
-    cleanupStorage();
+    Class taskControllerClass = fConf.getClass(
+        TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class);
+    taskController = (TaskController) ReflectionUtils.newInstance(
+        taskControllerClass, fConf);
+
+    // setup and create jobcache directory with appropriate permissions
+    taskController.setup();
+
+    // Initialize DistributedCache
+    this.distributedCacheManager = new TrackerDistributedCacheManager(
+        this.fConf, taskController);

     this.jobClient = (InterTrackerProtocol)
         RPC.waitForProxy(InterTrackerProtocol.class,
@@ -664,15 +669,6 @@
     reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
     mapLauncher.start();
     reduceLauncher.start();
-    Class taskControllerClass
-        = fConf.getClass(TT_TASK_CONTROLLER,
-            DefaultTaskController.class,
-            TaskController.class);
-    taskController = (TaskController)ReflectionUtils.newInstance(
-        taskControllerClass, fConf);
-
-    //setup and create jobcache directory with appropriate permissions
-    taskController.setup();

     // create a localizer instance
     setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Thu Jan 7 07:59:44 2010
@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.DefaultTaskController;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;

@@ -198,9 +199,9 @@
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf) throws IOException {

-    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
-        baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
-        honorSymLinkConf);
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
+            confFileStamp, currentWorkDir, honorSymLinkConf, false);
   }

   /**
@@ -277,8 +278,8 @@
     if (timestamp == null) {
       throw new IOException("TimeStamp of the uri couldnot be found");
     }
-    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
-        Long.parseLong(timestamp));
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .releaseCache(cache, conf, Long.parseLong(timestamp));
   }

   /**
@@ -294,7 +295,8 @@
   @Deprecated
   public static String makeRelative(URI cache, Configuration conf)
       throws IOException {
-    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .makeRelative(cache, conf);
   }

   /**
@@ -657,6 +659,7 @@
    */
   @Deprecated
   public static void purgeCache(Configuration conf) throws IOException {
-    new TrackerDistributedCacheManager(conf).purgeCache();
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .purgeCache();
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Thu Jan 7 07:59:44 2010
@@ -171,7 +171,7 @@
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
           cacheSubdir, fileStatus,
           cacheFile.type == CacheFile.FileType.ARCHIVE,
-          cacheFile.timestamp, workdirPath, false);
+          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
       cacheFile.setLocalized(true);

       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jan 7 07:59:44 2010
@@ -30,6 +30,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,14 +71,18 @@

   private LocalDirAllocator lDirAllocator;

+  private TaskController taskController;
+
   private Configuration trackerConf;

   private Random random = new Random();

-  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+  public TrackerDistributedCacheManager(Configuration conf,
+      TaskController taskController) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
+    this.taskController = taskController;
   }

   /**
@@ -103,6 +110,7 @@
    * launches
    * NOTE: This is effectively always on since r696957, since there is no code
    * path that does not use this.
+   * @param isPublic to know the cache file is accessible to public or private
    * @return the path to directory where the archives are unjarred in case of
    * archives, the path to the file where the file is copied locally
    * @throws IOException
@@ -110,7 +118,7 @@
   Path getLocalCache(URI cache, Configuration conf,
       String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf)
+      Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
       throws IOException {
     String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
@@ -119,13 +127,13 @@
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
+        String uniqueString = String.valueOf(random.nextLong());
         String cachePath = new Path (subDir,
-          new Path(String.valueOf(random.nextLong()),
-            makeRelative(cache, conf))).toString();
+          new Path(uniqueString, makeRelative(cache, conf))).toString();
         Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
           fileStatus.getLen(), trackerConf);
-        lcacheStatus = new CacheStatus(
-          new Path(localPath.toString().replace(cachePath, "")), localPath);
+        lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
+          cachePath, "")), localPath, new Path(subDir), uniqueString);
         cachedArchives.put(key, lcacheStatus);
       }

@@ -139,7 +147,7 @@
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
           localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, fileStatus, isArchive);
+              lcacheStatus, fileStatus, isArchive, isPublic);
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -244,17 +252,17 @@
     // do the deletion, after releasing the global lock
     for (CacheStatus lcacheStatus : deleteSet) {
       synchronized (lcacheStatus) {
-        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
         // decrement the size of the cache from baseDirSize
         synchronized (baseDirSize) {
-          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
           if ( dirSize != null ) {
             dirSize -= lcacheStatus.size;
-            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+            baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
           } else {
             LOG.warn("Cannot find record of the baseDir: " +
-                lcacheStatus.baseDir + " during delete!");
+                lcacheStatus.localizedBaseDir + " during delete!");
           }
         }
       }
@@ -363,13 +371,13 @@
     // Has to be
     if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
         cacheStatus, fileStatus)) {
-      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
+      throw new IOException("Stale cache file: " + cacheStatus.localizedLoadPath +
           " for cache-file: " + cache);
     }

     LOG.info(String.format("Using existing cache of %s->%s",
-        cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+        cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
   }

   private void createSymlink(Configuration conf, URI cache,
@@ -384,7 +392,7 @@
     File flink = new File(link);
     if (doSymlink){
       if (!flink.exists()) {
-        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+        FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
       }
     }
   }
@@ -395,21 +403,21 @@
       URI cache, long confFileStamp,
       CacheStatus cacheStatus,
       FileStatus fileStatus,
-      boolean isArchive)
+      boolean isArchive, boolean isPublic)
       throws IOException {
     FileSystem fs = FileSystem.get(cache, conf);
     FileSystem localFs = FileSystem.getLocal(conf);
     Path parchive = null;
     if (isArchive) {
-      parchive = new Path(cacheStatus.localLoadPath,
-        new Path(cacheStatus.localLoadPath.getName()));
+      parchive = new Path(cacheStatus.localizedLoadPath,
+        new Path(cacheStatus.localizedLoadPath.getName()));
     } else {
-      parchive = cacheStatus.localLoadPath;
+      parchive = cacheStatus.localizedLoadPath;
     }

     if (!localFs.mkdirs(parchive.getParent())) {
       throw new IOException("Mkdirs failed to create directory " +
-          cacheStatus.localLoadPath.toString());
+          cacheStatus.localizedLoadPath.toString());
     }

     String cacheId = cache.getPath();
@@ -439,29 +447,45 @@
       FileUtil.getDU(new File(parchive.getParent().toString()));
     cacheStatus.size = cacheSize;
     synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
       if( dirSize == null ) {
         dirSize = Long.valueOf(cacheSize);
       } else {
         dirSize += cacheSize;
       }
-      baseDirSize.put(cacheStatus.baseDir, dirSize);
+      baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
     }

-    // do chmod here
-    try {
-      //Setting recursive permission to grant everyone read and execute
-      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-    } catch(InterruptedException e) {
-      LOG.warn("Exception in chmod" + e.toString());
-    }
+    // set proper permissions for the localized directory
+    setPermissions(conf, cacheStatus, isPublic);

     // update cacheStatus to reflect the newly cached file
     cacheStatus.mtime = getTimestamp(conf, cache);

     LOG.info(String.format("Cached %s as %s",
-        cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+        cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
+  }
+
+  private void setPermissions(Configuration conf, CacheStatus cacheStatus,
+      boolean isPublic) throws IOException {
+    if (isPublic) {
+      Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
+      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+      try {
+        FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in chmod" + e.toString());
+        throw new IOException(e);
+      }
+    } else {
+      // invoke taskcontroller to set permissions
+      DistributedCacheFileContext context = new DistributedCacheFileContext(
+          conf.get(JobContext.USER_NAME), new File(cacheStatus.localizedBaseDir
+          .toString()), cacheStatus.localizedBaseDir,
+          cacheStatus.uniqueString);
+      taskController.initializeDistributedCacheFile(context);
+    }
   }

   private static boolean isTarFile(String filename) {
@@ -532,10 +556,10 @@

   static class CacheStatus {
     // the local load path of this cache
-    Path localLoadPath;
+    Path localizedLoadPath;

     //the base dir where the cache lies
-    Path baseDir;
+    Path localizedBaseDir;

     //the size of this cache
     long size;
@@ -548,18 +572,28 @@

     // is it initialized ?
     boolean inited = false;
+
+    // The sub directory (tasktracker/archive or tasktracker/user/archive),
+    // under which the file will be localized
+    Path subDir;

-    public CacheStatus(Path baseDir, Path localLoadPath) {
+    // unique string used in the construction of local load path
+    String uniqueString;
+
+    public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
+        String uniqueString) {
       super();
-      this.localLoadPath = localLoadPath;
+      this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
-      this.baseDir = baseDir;
+      this.localizedBaseDir = baseDir;
       this.size = 0;
+      this.subDir = subDir;
+      this.uniqueString = uniqueString;
     }

     Path getBaseDir(){
-      return this.baseDir;
+      return this.localizedBaseDir;
     }

     // mark it as initialized
@@ -571,6 +605,10 @@
     boolean isInited() {
       return inited;
     }
+
+    Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(subDir, uniqueString));
+    }
   }

   /**
@@ -582,7 +620,7 @@
     synchronized (cachedArchives) {
       for (Map.Entry f: cachedArchives.entrySet()) {
         try {
-          localFs.delete(f.getValue().localLoadPath, true);
+          localFs.delete(f.getValue().localizedLoadPath, true);
         } catch (IOException ie) {
           LOG.debug("Error cleaning up cache", ie);
         }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Thu Jan 7 07:59:44 2010
@@ -24,6 +24,7 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
 import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
@@ -36,7 +37,6 @@
     TestTrackerDistributedCacheManager {

   private File configFile;
-  private MyLinuxTaskController taskController;
   private String taskTrackerSpecialGroup;

   private static final Log LOG =
@@ -65,7 +65,7 @@
         ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
             .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
     String execPath = path + "/task-controller";
-    taskController.setTaskControllerExe(execPath);
+    ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
     taskController.setup();

@@ -74,6 +74,17 @@
   }

   @Override
+  protected void refreshConf(Configuration conf) throws IOException {
+    super.refreshConf(conf);
+    String path =
+      System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+      ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+          .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+
+  }
+
+  @Override
   protected void
tearDown() throws IOException { if (!ClusterWithLinuxTaskController.shouldRun()) { @@ -99,27 +110,19 @@ } @Override - protected TaskController getTaskController() { - return taskController; - } - - @Override protected void checkFilePermissions(Path[] localCacheFiles) throws IOException { - String cachedFirstFile = localCacheFiles[0].toUri().getPath(); - String cachedSecondFile = localCacheFiles[1].toUri().getPath(); String userName = getJobOwnerName(); - // First make sure that the cache files have proper permissions. - TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile, - "-r-xrwx---", userName, taskTrackerSpecialGroup); - TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile, - "-r-xrwx---", userName, taskTrackerSpecialGroup); - - // Now. make sure that all the path components also have proper - // permissions. - checkPermissionOnPathComponents(cachedFirstFile, userName); - checkPermissionOnPathComponents(cachedSecondFile, userName); + for (Path p : localCacheFiles) { + // First make sure that the cache file has proper permissions. + TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(), + "-r-xrwx---", userName, taskTrackerSpecialGroup); + // Now. make sure that all the path components also have proper + // permissions. + checkPermissionOnPathComponents(p.toUri().getPath(), userName); + } + } /** @@ -136,7 +139,7 @@ + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]" + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName), ""); - LOG.info("Leading path for cacheFirstFile is : " + LOG.info("Trailing path for cacheFirstFile is : " + trailingStringForFirstFile); // The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string. 
String leadingStringForFirstFile = Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=896781&r1=896780&r2=896781&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jan 7 07:59:44 2010 @@ -32,12 +32,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.DefaultTaskController; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskController; import org.apache.hadoop.mapred.TaskTracker; -import org.apache.hadoop.mapred.TaskController.InitializationContext; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,10 +45,13 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; import org.mortbay.log.Log; public class 
TestTrackerDistributedCacheManager extends TestCase { @@ -59,7 +62,6 @@ .getAbsolutePath(); protected File ROOT_MAPRED_LOCAL_DIR; - private static String TEST_CACHE_BASE_DIR = "cachebasedir"; protected int numLocalDirs = 6; private static final int TEST_FILE_SIZE = 4 * 1024; // 4K @@ -70,7 +72,8 @@ private FileSystem fs; protected LocalDirAllocator localDirAllocator = - new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY); + new LocalDirAllocator(MRConfig.LOCAL_DIR); + protected TaskController taskController; @Override protected void setUp() throws IOException,InterruptedException { @@ -85,11 +88,25 @@ ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local"); ROOT_MAPRED_LOCAL_DIR.mkdirs(); + String []localDirs = new String[numLocalDirs]; + for (int i = 0; i < numLocalDirs; i++) { + File localDir = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i); + localDirs[i] = localDir.getPath(); + localDir.mkdir(); + } + conf = new Configuration(); - conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT); - conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ROOT_MAPRED_LOCAL_DIR.toString()); + conf.setStrings(MRConfig.LOCAL_DIR, localDirs); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); fs = FileSystem.get(conf); + Class taskControllerClass = conf.getClass( + TTConfig.TT_TASK_CONTROLLER, DefaultTaskController.class, + TaskController.class); + taskController = (TaskController) ReflectionUtils.newInstance( + taskControllerClass, conf); + + // setup permissions for mapred local dir + taskController.setup(); // Create the temporary cache files to be used in the tests. 
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile"); @@ -97,6 +114,11 @@ createPrivateTempFile(firstCacheFile); createPrivateTempFile(secondCacheFile); } + + protected void refreshConf(Configuration conf) throws IOException { + taskController.setConf(conf); + taskController.setup(); + } /** * Whether the test can run on the machine @@ -121,6 +143,8 @@ // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(conf); + String userName = getJobOwnerName(); + subConf.set(JobContext.USER_NAME, userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); DistributedCache.addFileToClassPath(secondCacheFile, subConf); TrackerDistributedCacheManager.determineTimestamps(subConf); @@ -132,11 +156,9 @@ subConf.writeXml(os); os.close(); - String userName = getJobOwnerName(); - // ****** Imitate TaskRunner code. TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf); + new TrackerDistributedCacheManager(conf, taskController); TaskDistributedCacheManager handle = manager.newTaskDistributedCacheManager(subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); @@ -144,11 +166,6 @@ handle.setup(localDirAllocator, workDir, TaskTracker .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir()); - - InitializationContext context = new InitializationContext(); - context.user = userName; - context.workDir = workDir; - getTaskController().initializeDistributedCache(context); // ****** End of imitating TaskRunner code Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); @@ -178,18 +195,18 @@ TrackerDistributedCacheManager { public FakeTrackerDistributedCacheManager(Configuration conf) throws IOException { - super(conf); + super(conf, taskController); } @Override Path localizeCache(Configuration conf, URI cache, long confFileStamp, - CacheStatus cacheStatus, FileStatus fileStatus, boolean 
isArchive) - throws IOException { + CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive, + boolean isPublic) throws IOException { if (cache.equals(firstCacheFile.toUri())) { throw new IOException("fake fail"); } return super.localizeCache(conf, cache, confFileStamp, cacheStatus, - fileStatus, isArchive); + fileStatus, isArchive, isPublic); } } @@ -198,8 +215,6 @@ if (!canRun()) { return; } - Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); TrackerDistributedCacheManager manager = new FakeTrackerDistributedCacheManager(conf); Cluster cluster = new Cluster(conf); @@ -208,6 +223,7 @@ // Configures a job with a regular file Job job1 = Job.getInstance(cluster, conf); + job1.setUser(userName); job1.addCacheFile(secondCacheFile.toUri()); Configuration conf1 = job1.getConfiguration(); TrackerDistributedCacheManager.determineTimestamps(conf1); @@ -229,6 +245,7 @@ // Configures another job with three regular files. Job job2 = Job.getInstance(cluster, conf); + job2.setUser(userName); // add a file that would get failed to localize job2.addCacheFile(firstCacheFile.toUri()); // add a file that is already localized by different job @@ -286,7 +303,7 @@ private void checkLocalizedPath(String visibility) throws IOException, LoginException, InterruptedException { TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf); + new TrackerDistributedCacheManager(conf, taskController); Cluster cluster = new Cluster(conf); String userName = getJobOwnerName(); File workDir = new File(TEST_ROOT_DIR, "workdir"); @@ -298,6 +315,7 @@ } Job job1 = Job.getInstance(cluster, conf); + job1.setUser(userName); job1.addCacheFile(cacheFile.toUri()); Configuration conf1 = job1.getConfiguration(); TrackerDistributedCacheManager.determineTimestamps(conf1); @@ -319,12 +337,18 @@ Path localizedPath = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, fs.getFileStatus(cacheFile), false, - c.timestamp, new 
Path(TEST_ROOT_DIR), false); + c.timestamp, new Path(TEST_ROOT_DIR), false, + Boolean.parseBoolean(visibility)); assertTrue("Cache file didn't get localized in the expected directory. " + "Expected localization to happen within " + ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir + ", but was localized at " + localizedPath, localizedPath.toString().contains(distCacheDir)); + if ("true".equals(visibility)) { + checkPublicFilePermissions(new Path[]{localizedPath}); + } else { + checkFilePermissions(new Path[]{localizedPath}); + } } /** @@ -335,17 +359,29 @@ */ protected void checkFilePermissions(Path[] localCacheFiles) throws IOException { - Path cachedFirstFile = localCacheFiles[0]; - Path cachedSecondFile = localCacheFiles[1]; - // Both the files should have executable permissions on them. - assertTrue("First cache file is not executable!", new File(cachedFirstFile - .toUri().getPath()).canExecute()); - assertTrue("Second cache file is not executable!", new File( - cachedSecondFile.toUri().getPath()).canExecute()); + // All the files should have executable permissions on them. 
+    for (Path p : localCacheFiles) {
+      assertTrue("Cache file is not executable!", new File(p
+          .toUri().getPath()).canExecute());
+    }
   }
-  protected TaskController getTaskController() {
-    return new DefaultTaskController();
+  /**
+   * Check permissions on the public cache files
+   *
+   * @param localCacheFiles
+   * @throws IOException
+   */
+  private void checkPublicFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    // All the files should have read and executable permissions for others
+    for (Path p : localCacheFiles) {
+      FsPermission perm = fs.getFileStatus(p).getPermission();
+      assertTrue("cache file is not readable by others", perm.getOtherAction()
+          .implies(FsAction.READ));
+      assertTrue("cache file is not executable by others", perm
+          .getOtherAction().implies(FsAction.EXECUTE));
+    }
   }
   protected String getJobOwnerName() throws LoginException {
@@ -358,27 +394,39 @@
     if (!canRun()) {
       return;
     }
+    // This test needs MRConfig.LOCAL_DIR to be single directory
+    // instead of four, because it assumes that both
+    // firstcachefile and secondcachefile will be localized on same directory
+    // so that second localization triggers deleteCache.
+    // If MRConfig.LOCAL_DIR is four directories, second localization might not
+    // trigger deleteCache, if it is localized in different directory.
+    Configuration conf2 = new Configuration(conf);
+    conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
+    refreshConf(conf2);
     TrackerDistributedCacheManager manager =
-        new TrackerDistributedCacheManager(conf);
-    FileSystem localfs = FileSystem.getLocal(conf);
+        new TrackerDistributedCacheManager(conf2, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf2);
     long now = System.currentTimeMillis();
+    String userName = getJobOwnerName();
+    conf2.set(JobContext.USER_NAME, userName);
-    manager.getLocalCache(firstCacheFile.toUri(), conf,
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false);
-    manager.releaseCache(firstCacheFile.toUri(), conf, now);
+    Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    manager.releaseCache(firstCacheFile.toUri(), conf2, now);
     //in above code,localized a file of size 4K and then release the cache
     // which will cause the cache be deleted when the limit goes out.
     // The below code localize another cache which's designed to
     //sweep away the first cache.
- manager.getLocalCache(secondCacheFile.toUri(), conf, - TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false, - System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false); - FileStatus[] dirStatuses = localfs.listStatus( - new Path(ROOT_MAPRED_LOCAL_DIR.toString())); - assertTrue("DistributedCache failed deleting old" + + manager.getLocalCache(secondCacheFile.toUri(), conf2, + TaskTracker.getPrivateDistributedCacheDir(userName), + fs.getFileStatus(secondCacheFile), false, + System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false); + assertFalse("DistributedCache failed deleting old" + " cache when the cache store is full.", - dirStatuses.length == 1); + localfs.exists(localCache)); } public void testFileSystemOtherThanDefault() throws Exception { @@ -386,14 +434,17 @@ return; } TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf); + new TrackerDistributedCacheManager(conf, taskController); conf.set("fs.fakefile.impl", conf.get("fs.file.impl")); + String userName = getJobOwnerName(); + conf.set(JobContext.USER_NAME, userName); Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath()); Path result = manager.getLocalCache(fileToCache.toUri(), conf, - TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false, + TaskTracker.getPrivateDistributedCacheDir(userName), + fs.getFileStatus(firstCacheFile), false, System.currentTimeMillis(), - new Path(TEST_ROOT_DIR), false); + new Path(TEST_ROOT_DIR), false, false); assertNotNull("DistributedCache cached file on non-default filesystem.", result); } @@ -459,20 +510,21 @@ return; } Configuration myConf = new Configuration(conf); - myConf.set("fs.default.name", "refresh:///"); + myConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "refresh:///"); myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class); + String userName = getJobOwnerName(); + TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(myConf); + new 
TrackerDistributedCacheManager(myConf, taskController); // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(myConf); + subConf.set(JobContext.USER_NAME, userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); TrackerDistributedCacheManager.determineTimestamps(subConf); TrackerDistributedCacheManager.determineCacheVisibilities(subConf); // ****** End of imitating JobClient code - String userName = getJobOwnerName(); - // ****** Imitate TaskRunner code. TaskDistributedCacheManager handle = manager.newTaskDistributedCacheManager(subConf); @@ -513,6 +565,7 @@ // submit another job Configuration subConf2 = new Configuration(myConf); + subConf2.set(JobContext.USER_NAME, userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2); TrackerDistributedCacheManager.determineTimestamps(subConf2); TrackerDistributedCacheManager.determineCacheVisibilities(subConf2); @@ -536,4 +589,46 @@ handle.release(); } + /** + * Localize a file. After localization is complete, create a file, "myFile", + * under the directory where the file is localized and ensure that it has + * permissions different from what is set by default. Then, localize another + * file. Verify that "myFile" has the right permissions. 
+   * @throws Exception
+   */
+  public void testCustomPermissions() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    String userName = getJobOwnerName();
+    conf.set(JobContext.USER_NAME, userName);
+    TrackerDistributedCacheManager manager =
+        new TrackerDistributedCacheManager(conf, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
+
+    Path[] localCache = new Path[2];
+    localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    FsPermission myPermission = new FsPermission((short)0600);
+    Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
+    if (FileSystem.create(localfs, myFile, myPermission) == null) {
+      throw new IOException("Could not create " + myFile);
+    }
+    try {
+      localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf,
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(secondCacheFile), false,
+          System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+      FileStatus stat = localfs.getFileStatus(myFile);
+      assertTrue(stat.getPermission().equals(myPermission));
+      // validate permissions of localized files.
+      checkFilePermissions(localCache);
+    } finally {
+      localfs.delete(myFile, false);
+    }
+  }
+
 }
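
The idea that testCustomPermissions exercises (localizing a second file must not alter permissions on unrelated files under the same base directory) can be sketched outside Hadoop. The following is only an illustration under stated assumptions, not the code from this commit: it uses java.nio.file instead of Hadoop's FileUtil and FileSystem, the class name LocalizedPermissionSketch and the directory names "archive", "1111" and "2222" are hypothetical, and it assumes a POSIX file system. It models only the public-file branch of setPermissions (a recursive "ugo+rx" restricted to the unique directory); the task-controller branch for private files is not modeled.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-alone sketch; not Hadoop code. Requires a POSIX file system.
public class LocalizedPermissionSketch {

  // Same shape as CacheStatus.getLocalizedUniqueDir(): base/subDir/uniqueString.
  static Path localizedUniqueDir(Path baseDir, String subDir, String uniqueString) {
    return baseDir.resolve(subDir).resolve(uniqueString);
  }

  // Rough equivalent of a recursive "ugo+rx": adds read and execute for
  // owner, group and others on dir and everything beneath it.
  static void addReadExecute(Path dir) throws IOException {
    List<Path> entries;
    try (Stream<Path> walk = Files.walk(dir)) {
      entries = walk.collect(Collectors.toList());
    }
    Set<PosixFilePermission> rx = PosixFilePermissions.fromString("r-xr-xr-x");
    for (Path p : entries) {
      Set<PosixFilePermission> perms = new HashSet<>(Files.getPosixFilePermissions(p));
      perms.addAll(rx);
      Files.setPosixFilePermissions(p, perms);
    }
  }

  // Returns { permissions of the untouched private file, permissions of the public file }.
  static String[] demo() {
    try {
      Path base = Files.createTempDirectory("localdir");
      Path first = Files.createDirectories(localizedUniqueDir(base, "archive", "1111"));
      Path second = Files.createDirectories(localizedUniqueDir(base, "archive", "2222"));

      // A file with custom permissions next to the first localized entry.
      Path myFile = Files.createFile(first.resolve("myfile.txt"));
      Files.setPosixFilePermissions(myFile, PosixFilePermissions.fromString("rw-------"));
      Path publicFile = Files.createFile(second.resolve("cachefile"));

      // Only the second unique directory is touched, not the whole base directory.
      addReadExecute(second);
      return new String[] {
        PosixFilePermissions.toString(Files.getPosixFilePermissions(myFile)),
        PosixFilePermissions.toString(Files.getPosixFilePermissions(publicFile))
      };
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    String[] perms = demo();
    System.out.println("private file: " + perms[0]);
    System.out.println("public file:  " + perms[1]);
  }
}
```

Had addReadExecute been applied to the base directory instead, which is roughly what the removed FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx", true) call did, myfile.txt would have gained read and execute bits for group and others.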