hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r896781 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Date Thu, 07 Jan 2010 08:00:42 GMT
Author: yhemanth
Date: Thu Jan  7 07:59:44 2010
New Revision: 896781

URL: http://svn.apache.org/viewvc?rev=896781&view=rev
Log:
MAPREDUCE-1186. Modified code in distributed cache to set permissions only on required set of localized paths. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/main.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan  7 07:59:44 2010
@@ -110,6 +110,10 @@
     MAPREDUCE-1224. Calling "SELECT t.* from <table> AS t" to get meta
     information is too expensive for big tables. (Spencer Ho via tomwhite)
 
+    MAPREDUCE-1186. Modified code in distributed cache to set permissions
+    only on required set of localized paths.
+    (Amareshwari Sriramadasu via yhemanth)
+
   BUG FIXES
 
     MAPREDUCE-1258. Fix fair scheduler event log not logging job info.

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/main.c?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/main.c Thu Jan  7 07:59:44 2010
@@ -51,6 +51,7 @@
   const char * task_id = NULL;
   const char * tt_root = NULL;
   const char *log_dir = NULL;
+  const char * unique_string = NULL;
   int exit_code = 0;
   const char * task_pid = NULL;
   const char* const short_options = "l:";
@@ -121,8 +122,11 @@
     job_id = argv[optind++];
     exit_code = initialize_job(job_id, user_detail->pw_name);
     break;
-  case INITIALIZE_DISTRIBUTEDCACHE:
-    exit_code = initialize_distributed_cache(user_detail->pw_name);
+  case INITIALIZE_DISTRIBUTEDCACHE_FILE:
+    tt_root = argv[optind++];
+    unique_string = argv[optind++];
+    exit_code = initialize_distributed_cache_file(tt_root, unique_string,
+        user_detail->pw_name);
     break;
   case LAUNCH_TASK_JVM:
     tt_root = argv[optind++];

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Thu Jan  7 07:59:44 2010
@@ -173,9 +173,10 @@
 /**
  * Get the distributed cache directory for a particular user
  */
-char *get_distributed_cache_directory(const char *tt_root, const char *user) {
-  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, "dist_cache_path", 2,
-      tt_root, user);
+char *get_distributed_cache_directory(const char *tt_root, const char *user,
+    const char* unique_string) {
+  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, 
+      "dist_cache_unique_path", 3, tt_root, user, unique_string);
 }
 
 char *get_job_work_directory(const char *job_dir) {
@@ -795,22 +796,25 @@
 }
 
 /**
- * Function to initialize the distributed cache files of a user.
+ * Function to initialize the distributed cache file for a user.
  * It does the following:
- *     *  sudo chown user:mapred -R taskTracker/$user/distcache
- *     *  sudo chmod 2570 -R taskTracker/$user/distcache
- * This is done once per every JVM launch. Tasks reusing JVMs just create
+ *     *  sudo chown user:mapred -R taskTracker/$user/distcache/<randomdir>
+ *     *  sudo chmod 2570 -R taskTracker/$user/distcache/<randomdir>
+ * This is done once per localization. Tasks reusing JVMs just create
  * symbolic links themselves and so there isn't anything specific to do in
  * that case.
- * Sometimes, it happens that a task uses the whole or part of a directory
- * structure in taskTracker/$user/distcache. In this case, some paths are
- * already set proper private permissions by this same function called during
- * a previous JVM launch. In the current invocation, we only do the
- * chown/chmod operation of files/directories that are newly created by the
- * TaskTracker (i.e. those that still are not owned by user:mapred)
  */
-int initialize_distributed_cache(const char *user) {
-
+int initialize_distributed_cache_file(const char *tt_root, 
+    const char *unique_string, const char *user) {
+  if (tt_root == NULL) {
+    fprintf(LOGFILE, "tt_root passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+  if (unique_string == NULL) {
+    fprintf(LOGFILE, "unique_string passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+ 
   if (user == NULL) {
     fprintf(LOGFILE, "user passed is null.\n");
     return INVALID_ARGUMENT_NUMBER;
@@ -820,69 +824,41 @@
     fprintf(LOGFILE, "Couldn't get the user details of %s", user);
     return INVALID_USER_NAME;
   }
-
-  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-
-  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
-  if (local_dir == NULL) {
-    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+  //Check tt_root
+  if (check_tt_root(tt_root) < 0) {
+    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
     cleanup();
     return INVALID_TT_ROOT;
   }
 
-  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
-#ifdef DEBUG
-  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
-      full_local_dir_str);
-#endif
+  // set permission on the unique directory
+  char *localized_unique_dir = get_distributed_cache_directory(tt_root, user,
+      unique_string);
+  if (localized_unique_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't get unique distcache directory for %s.\n", user);
+    cleanup();
+    return INITIALIZE_DISTCACHEFILE_FAILED;
+  }
 
-  char *distcache_dir;
-  char **local_dir_ptr = local_dir;
+  gid_t binary_gid = getegid(); // the group permissions of the binary.
   int failed = 0;
-  while (*local_dir_ptr != NULL) {
-    distcache_dir = get_distributed_cache_directory(*local_dir_ptr, user);
-    if (distcache_dir == NULL) {
-      fprintf(LOGFILE, "Couldn't get distcache directory for %s.\n", user);
-      failed = 1;
-      break;
-    }
-
-    struct stat filestat;
-    if (stat(distcache_dir, &filestat) != 0) {
-      if (errno == ENOENT) {
-#ifdef DEBUG
-        fprintf(LOGFILE, "distcache_dir %s doesn't exist. Not doing anything.\n",
-            distcache_dir);
-#endif
-      } else {
-        // stat failed because of something else!
-        fprintf(LOGFILE, "Failed to stat the distcache_dir %s\n",
-            distcache_dir);
-        failed = 1;
-        free(distcache_dir);
-        break;
-      }
-    } else if (secure_path(distcache_dir, user_detail->pw_uid,
-        tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
+  struct stat filestat;
+  if (stat(localized_unique_dir, &filestat) != 0) {
+    // stat on distcache failed because of something
+    fprintf(LOGFILE, "Failed to stat the localized_unique_dir %s\n",
+        localized_unique_dir);
+    failed = INITIALIZE_DISTCACHEFILE_FAILED;
+  } else if (secure_path(localized_unique_dir, user_detail->pw_uid,
+        binary_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
             | S_IXUSR | S_IRWXG, 1) != 0) {
-      // No setgid on files and setgid on dirs, 570
-      fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
-          distcache_dir);
-      failed = 1;
-      free(distcache_dir);
-      break;
-    }
-
-    local_dir_ptr++;
-    free(distcache_dir);
+    // No setgid on files and setgid on dirs, 570
+    fprintf(LOGFILE, "Failed to secure the localized_unique_dir %s\n",
+        localized_unique_dir);
+    failed = INITIALIZE_DISTCACHEFILE_FAILED;
   }
-  free(local_dir);
-  free(full_local_dir_str);
+  free(localized_unique_dir);
   cleanup();
-  if (failed) {
-    return INITIALIZE_DISTCACHE_FAILED;
-  }
-  return 0;
+  return failed;
 }
 
 /**

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Thu Jan  7 07:59:44 2010
@@ -39,7 +39,7 @@
 enum command {
   INITIALIZE_USER,
   INITIALIZE_JOB,
-  INITIALIZE_DISTRIBUTEDCACHE,
+  INITIALIZE_DISTRIBUTEDCACHE_FILE,
   LAUNCH_TASK_JVM,
   INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
@@ -68,7 +68,7 @@
   PREPARE_TASK_LOGS_FAILED, //16
   INVALID_TT_LOG_DIR, //17
   OUT_OF_MEMORY, //18
-  INITIALIZE_DISTCACHE_FAILED, //19
+  INITIALIZE_DISTCACHEFILE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
   UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
   INVALID_CONF_DIR, //22
@@ -79,7 +79,7 @@
 
 #define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"
 
-#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache"
+#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache/%s"
 
 #define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
 
@@ -116,7 +116,8 @@
 
 int initialize_job(const char *jobid, const char *user);
 
-int initialize_distributed_cache(const char *user);
+int initialize_distributed_cache_file(const char *tt_root, 
+    const char* unique_string, const char *user);
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Thu Jan  7 07:59:44 2010
@@ -31,6 +31,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 
 /**
  * The default implementation for controlling tasks.
@@ -156,8 +158,17 @@
   }
 
   @Override
-  public void initializeDistributedCache(InitializationContext context) {
-    // Do nothing.
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+      throws IOException {
+    Path localizedUniqueDir = context.getLocalizedUniqueDir();
+    try {
+      // Setting recursive execute permission on localized dir
+      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+      FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
+    } catch (InterruptedException ie) {
+      LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
+      throw new IOException(ie);
+    }
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Thu Jan  7 07:59:44 2010
@@ -85,7 +85,7 @@
   enum TaskCommands {
     INITIALIZE_USER,
     INITIALIZE_JOB,
-    INITIALIZE_DISTRIBUTEDCACHE,
+    INITIALIZE_DISTRIBUTEDCACHE_FILE,
     LAUNCH_TASK_JVM,
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
@@ -475,12 +475,21 @@
   }
 
   @Override
-  public void initializeDistributedCache(InitializationContext context)
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException {
-    LOG.debug("Going to initialize distributed cache for " + context.user
-        + " on the TT");
-    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE, context.user,
-        new ArrayList<String>(), context.workDir, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to initialize distributed cache for " + context.user
+          + " with localizedBaseDir " + context.localizedBaseDir + 
+          " and uniqueString " + context.uniqueString);
+    }
+    List<String> args = new ArrayList<String>();
+    // Here, uniqueString might start with '-'. Adding -- in front of the 
+    // arguments indicates that they are non-option parameters.
+    args.add("--");
+    args.add(context.localizedBaseDir.toString());
+    args.add(context.uniqueString);
+    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
+        args, context.workDir, null);
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Jan  7 07:59:44 2010
@@ -115,7 +115,7 @@
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
       this.trackerDistributerdCacheManager =
-          new TrackerDistributedCacheManager(conf);
+          new TrackerDistributedCacheManager(conf, new DefaultTaskController());
       this.taskDistributedCacheManager = 
           trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Thu Jan  7 07:59:44 2010
@@ -72,12 +72,10 @@
    * disks:
    * <ul>
    * <li>mapreduce.cluster.local.directories</li>
-   * <li>Job cache directories</li>
-   * <li>Archive directories</li>
    * <li>Hadoop log directories</li>
    * </ul>
    */
-  void setup() {
+  public void setup() {
     for (String localDir : this.mapredLocalDirs) {
       // Set up the mapreduce.cluster.local.directories.
       File mapredlocalDir = new File(localDir);
@@ -111,13 +109,13 @@
 
   /**
    * Take task-controller specific actions to initialize the distributed cache
-   * files. This involves setting appropriate permissions for these files so as
+   * file. This involves setting appropriate permissions for these files so as
    * to secure them to be accessible only their owners.
    * 
    * @param context
    * @throws IOException
    */
-  public abstract void initializeDistributedCache(InitializationContext context)
+  public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException;
 
   /**
@@ -261,6 +259,37 @@
   public static class InitializationContext {
     public File workDir;
     public String user;
+    
+    public InitializationContext() {
+    }
+    
+    public InitializationContext(String user, File workDir) {
+      this.user = user;
+      this.workDir = workDir;
+    }
+  }
+  
+  /**
+   * This is used for initializing the private localized files in distributed
+   * cache. Initialization involves changing permissions, ownership, etc.
+   */
+  public static class DistributedCacheFileContext extends InitializationContext {
+    // base directory under which file has been localized
+    Path localizedBaseDir;
+    // the unique string used to construct the localized path
+    String uniqueString;
+
+    public DistributedCacheFileContext(String user, File workDir,
+        Path localizedBaseDir, String uniqueString) {
+      super(user, workDir);
+      this.localizedBaseDir = localizedBaseDir;
+      this.uniqueString = uniqueString;
+    }
+
+    public Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(TaskTracker
+          .getPrivateDistributedCacheDir(user), uniqueString));
+    }
   }
 
   static class JobInitializationContext extends InitializationContext {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Jan  7 07:59:44 2010
@@ -43,7 +43,6 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
@@ -182,11 +181,6 @@
       // the conf object after this will NOT be reflected to the child.
       setupChildTaskConfiguration(lDirAlloc);
 
-      InitializationContext context = new InitializationContext();
-      context.user = conf.getUser();
-      context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
-      tracker.getTaskController().initializeDistributedCache(context);
-
       if (!prepare()) {
         return;
       }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jan  7 07:59:44 2010
@@ -635,12 +635,17 @@
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    // Initialize DistributedCache and
-    // clear out temporary files that might be lying around
-    this.distributedCacheManager = 
-        new TrackerDistributedCacheManager(this.fConf);
-    this.distributedCacheManager.purgeCache();
-    cleanupStorage();
+    Class<? extends TaskController> taskControllerClass = fConf.getClass(
+        TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class);
+    taskController = (TaskController) ReflectionUtils.newInstance(
+        taskControllerClass, fConf);
+
+    // setup and create jobcache directory with appropriate permissions
+    taskController.setup();
+
+    // Initialize DistributedCache
+    this.distributedCacheManager = new TrackerDistributedCacheManager(
+        this.fConf, taskController);
 
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
@@ -664,15 +669,6 @@
     reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
     mapLauncher.start();
     reduceLauncher.start();
-    Class<? extends TaskController> taskControllerClass 
-                          = fConf.getClass(TT_TASK_CONTROLLER,
-                                            DefaultTaskController.class, 
-                                            TaskController.class); 
-    taskController = (TaskController)ReflectionUtils.newInstance(
-                                                      taskControllerClass, fConf);
-    
-    //setup and create jobcache directory with appropriate permissions
-    taskController.setup();
 
     // create a localizer instance
     setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Thu Jan  7 07:59:44 2010
@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.DefaultTaskController;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 
@@ -198,9 +199,9 @@
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
-    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
-        baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
-        honorSymLinkConf);
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
+            confFileStamp, currentWorkDir, honorSymLinkConf, false);
   }
 
   /**
@@ -277,8 +278,8 @@
     if (timestamp == null) {
       throw new IOException("TimeStamp of the uri couldnot be found");
     }
-    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
-          Long.parseLong(timestamp));
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .releaseCache(cache, conf, Long.parseLong(timestamp));
   }
   
   /**
@@ -294,7 +295,8 @@
   @Deprecated
   public static String makeRelative(URI cache, Configuration conf)
       throws IOException {
-    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .makeRelative(cache, conf);
   }
 
   /**
@@ -657,6 +659,7 @@
    */
   @Deprecated
   public static void purgeCache(Configuration conf) throws IOException {
-    new TrackerDistributedCacheManager(conf).purgeCache();
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .purgeCache();
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Thu Jan  7 07:59:44 2010
@@ -171,7 +171,7 @@
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
           cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
-          cacheFile.timestamp, workdirPath, false);
+          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
       cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jan  7 07:59:44 2010
@@ -30,6 +30,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,14 +71,18 @@
   
   private LocalDirAllocator lDirAllocator;
   
+  private TaskController taskController;
+  
   private Configuration trackerConf;
   
   private Random random = new Random();
 
-  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+  public TrackerDistributedCacheManager(Configuration conf,
+      TaskController taskController) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
+    this.taskController = taskController;
   }
 
   /**
@@ -103,6 +110,7 @@
    * launches
    * NOTE: This is effectively always on since r696957, since there is no code
    * path that does not use this.
+   * @param isPublic whether the cache file is publicly accessible or private
    * @return the path to directory where the archives are unjarred in case of
    * archives, the path to the file where the file is copied locally
    * @throws IOException
@@ -110,7 +118,7 @@
   Path getLocalCache(URI cache, Configuration conf,
       String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf)
+      Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
       throws IOException {
     String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
@@ -119,13 +127,13 @@
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
+        String uniqueString = String.valueOf(random.nextLong());
         String cachePath = new Path (subDir, 
-          new Path(String.valueOf(random.nextLong()),
-            makeRelative(cache, conf))).toString();
+          new Path(uniqueString, makeRelative(cache, conf))).toString();
         Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
           fileStatus.getLen(), trackerConf);
-        lcacheStatus = new CacheStatus(
-          new Path(localPath.toString().replace(cachePath, "")), localPath); 
+        lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
+          cachePath, "")), localPath, new Path(subDir), uniqueString);
         cachedArchives.put(key, lcacheStatus);
       }
 
@@ -139,7 +147,7 @@
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
           localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, fileStatus, isArchive);
+              lcacheStatus, fileStatus, isArchive, isPublic);
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -244,17 +252,17 @@
     // do the deletion, after releasing the global lock
     for (CacheStatus lcacheStatus : deleteSet) {
       synchronized (lcacheStatus) {
-        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
         // decrement the size of the cache from baseDirSize
         synchronized (baseDirSize) {
-          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
           if ( dirSize != null ) {
             dirSize -= lcacheStatus.size;
-            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+            baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
           } else {
             LOG.warn("Cannot find record of the baseDir: " + 
-                     lcacheStatus.baseDir + " during delete!");
+                     lcacheStatus.localizedBaseDir + " during delete!");
           }
         }
       }
@@ -363,13 +371,13 @@
     // Has to be 
     if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
                           cacheStatus, fileStatus)) {
-      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath + 
+      throw new IOException("Stale cache file: " + cacheStatus.localizedLoadPath + 
                             " for cache-file: " + cache);
     }
 
     LOG.info(String.format("Using existing cache of %s->%s",
-        cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+        cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
   }
   
   private void createSymlink(Configuration conf, URI cache,
@@ -384,7 +392,7 @@
     File flink = new File(link);
     if (doSymlink){
       if (!flink.exists()) {
-        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+        FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
       }
     }
   }
@@ -395,21 +403,21 @@
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
                                     FileStatus fileStatus,
-                                    boolean isArchive)
+                                    boolean isArchive, boolean isPublic)
   throws IOException {
     FileSystem fs = FileSystem.get(cache, conf);
     FileSystem localFs = FileSystem.getLocal(conf);
     Path parchive = null;
     if (isArchive) {
-      parchive = new Path(cacheStatus.localLoadPath,
-        new Path(cacheStatus.localLoadPath.getName()));
+      parchive = new Path(cacheStatus.localizedLoadPath,
+        new Path(cacheStatus.localizedLoadPath.getName()));
     } else {
-      parchive = cacheStatus.localLoadPath;
+      parchive = cacheStatus.localizedLoadPath;
     }
 
     if (!localFs.mkdirs(parchive.getParent())) {
       throw new IOException("Mkdirs failed to create directory " +
-          cacheStatus.localLoadPath.toString());
+          cacheStatus.localizedLoadPath.toString());
     }
 
     String cacheId = cache.getPath();
@@ -439,29 +447,45 @@
       FileUtil.getDU(new File(parchive.getParent().toString()));
     cacheStatus.size = cacheSize;
     synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
       if( dirSize == null ) {
         dirSize = Long.valueOf(cacheSize);
       } else {
         dirSize += cacheSize;
       }
-      baseDirSize.put(cacheStatus.baseDir, dirSize);
+      baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
     }
 
-    // do chmod here
-    try {
-      //Setting recursive permission to grant everyone read and execute
-      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-    } catch(InterruptedException e) {
-      LOG.warn("Exception in chmod" + e.toString());
-    }
+    // set proper permissions for the localized directory
+    setPermissions(conf, cacheStatus, isPublic);
 
     // update cacheStatus to reflect the newly cached file
     cacheStatus.mtime = getTimestamp(conf, cache);
 
     LOG.info(String.format("Cached %s as %s",
-             cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+             cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
+  }
+
+  private void setPermissions(Configuration conf, CacheStatus cacheStatus,
+      boolean isPublic) throws IOException {
+    if (isPublic) {
+      Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
+      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+      try {
+        FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in chmod" + e.toString());
+        throw new IOException(e);
+      }
+    } else {
+      // invoke taskcontroller to set permissions
+      DistributedCacheFileContext context = new DistributedCacheFileContext(
+          conf.get(JobContext.USER_NAME), new File(cacheStatus.localizedBaseDir
+              .toString()), cacheStatus.localizedBaseDir,
+          cacheStatus.uniqueString);
+      taskController.initializeDistributedCacheFile(context);
+    }
   }
 
   private static boolean isTarFile(String filename) {
@@ -532,10 +556,10 @@
 
   static class CacheStatus {
     // the local load path of this cache
-    Path localLoadPath;
+    Path localizedLoadPath;
 
     //the base dir where the cache lies
-    Path baseDir;
+    Path localizedBaseDir;
 
     //the size of this cache
     long size;
@@ -548,18 +572,28 @@
 
     // is it initialized ?
     boolean inited = false;
+
+    // The sub directory (tasktracker/archive or tasktracker/user/archive),
+    // under which the file will be localized
+    Path subDir;
     
-    public CacheStatus(Path baseDir, Path localLoadPath) {
+    // unique string used in the construction of local load path
+    String uniqueString;
+
+    public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
+        String uniqueString) {
       super();
-      this.localLoadPath = localLoadPath;
+      this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
-      this.baseDir = baseDir;
+      this.localizedBaseDir = baseDir;
       this.size = 0;
+      this.subDir = subDir;
+      this.uniqueString = uniqueString;
     }
     
     Path getBaseDir(){
-      return this.baseDir;
+      return this.localizedBaseDir;
     }
     
     // mark it as initialized
@@ -571,6 +605,10 @@
     boolean isInited() {
       return inited;
     }
+    
+    Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(subDir, uniqueString));
+    }
   }
 
   /**
@@ -582,7 +620,7 @@
     synchronized (cachedArchives) {
       for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
         try {
-          localFs.delete(f.getValue().localLoadPath, true);
+          localFs.delete(f.getValue().localizedLoadPath, true);
         } catch (IOException ie) {
           LOG.debug("Error cleaning up cache", ie);
         }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Thu Jan  7 07:59:44 2010
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
 import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
@@ -36,7 +37,6 @@
     TestTrackerDistributedCacheManager {
 
   private File configFile;
-  private MyLinuxTaskController taskController;
   private String taskTrackerSpecialGroup;
 
   private static final Log LOG =
@@ -65,7 +65,7 @@
         ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
             .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
     String execPath = path + "/task-controller";
-    taskController.setTaskControllerExe(execPath);
+    ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
     taskController.setup();
 
@@ -74,6 +74,17 @@
   }
 
   @Override
+  protected void refreshConf(Configuration conf) throws IOException {
+    super.refreshConf(conf);
+    String path =
+      System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+      ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+          .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+   
+  }
+
+  @Override
   protected void tearDown()
       throws IOException {
     if (!ClusterWithLinuxTaskController.shouldRun()) {
@@ -99,27 +110,19 @@
   }
 
   @Override
-  protected TaskController getTaskController() {
-    return taskController;
-  }
-
-  @Override
   protected void checkFilePermissions(Path[] localCacheFiles)
       throws IOException {
-    String cachedFirstFile = localCacheFiles[0].toUri().getPath();
-    String cachedSecondFile = localCacheFiles[1].toUri().getPath();
     String userName = getJobOwnerName();
 
-    // First make sure that the cache files have proper permissions.
-    TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile,
-        "-r-xrwx---", userName, taskTrackerSpecialGroup);
-    TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile,
-        "-r-xrwx---", userName, taskTrackerSpecialGroup);
-
-    // Now. make sure that all the path components also have proper
-    // permissions.
-    checkPermissionOnPathComponents(cachedFirstFile, userName);
-    checkPermissionOnPathComponents(cachedSecondFile, userName);
+    for (Path p : localCacheFiles) {
+      // First make sure that the cache file has proper permissions.
+      TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(),
+          "-r-xrwx---", userName, taskTrackerSpecialGroup);
+      // Now, make sure that all the path components also have proper
+      // permissions.
+      checkPermissionOnPathComponents(p.toUri().getPath(), userName);
+    }
+
   }
 
   /**
@@ -136,7 +139,7 @@
             + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
             + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
             "");
-    LOG.info("Leading path for cacheFirstFile is : "
+    LOG.info("Trailing path for cacheFirstFile is : "
         + trailingStringForFirstFile);
     // The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string.
     String leadingStringForFirstFile =

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=896781&r1=896780&r2=896781&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jan  7 07:59:44 2010
@@ -32,12 +32,12 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,10 +45,13 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.mortbay.log.Log;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
@@ -59,7 +62,6 @@
           .getAbsolutePath();
 
   protected File ROOT_MAPRED_LOCAL_DIR;
-  private static String TEST_CACHE_BASE_DIR = "cachebasedir";
   protected int numLocalDirs = 6;
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
@@ -70,7 +72,8 @@
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
-    new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+  protected TaskController taskController;
 
   @Override
   protected void setUp() throws IOException,InterruptedException {
@@ -85,11 +88,25 @@
     ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
     ROOT_MAPRED_LOCAL_DIR.mkdirs();
 
+    String []localDirs = new String[numLocalDirs];
+    for (int i = 0; i < numLocalDirs; i++) {
+      File localDir = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i);
+      localDirs[i] = localDir.getPath();
+      localDir.mkdir();
+    }
+
     conf = new Configuration();
-    conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
-    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ROOT_MAPRED_LOCAL_DIR.toString());
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     fs = FileSystem.get(conf);
+    Class<? extends TaskController> taskControllerClass = conf.getClass(
+        TTConfig.TT_TASK_CONTROLLER, DefaultTaskController.class,
+        TaskController.class);
+    taskController = (TaskController) ReflectionUtils.newInstance(
+        taskControllerClass, conf);
+
+    // setup permissions for mapred local dir
+    taskController.setup();
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -97,6 +114,11 @@
     createPrivateTempFile(firstCacheFile);
     createPrivateTempFile(secondCacheFile);
   }
+  
+  protected void refreshConf(Configuration conf) throws IOException {
+    taskController.setConf(conf);
+    taskController.setup();
+  }
 
   /**
    * Whether the test can run on the machine
@@ -121,6 +143,8 @@
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(conf);
+    String userName = getJobOwnerName();
+    subConf.set(JobContext.USER_NAME, userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
@@ -132,11 +156,9 @@
     subConf.writeXml(os);
     os.close();
 
-    String userName = getJobOwnerName();
-
     // ****** Imitate TaskRunner code.
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     TaskDistributedCacheManager handle =
       manager.newTaskDistributedCacheManager(subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
@@ -144,11 +166,6 @@
     handle.setup(localDirAllocator, workDir, TaskTracker
         .getPrivateDistributedCacheDir(userName), 
         TaskTracker.getPublicDistributedCacheDir());
-
-    InitializationContext context = new InitializationContext();
-    context.user = userName;
-    context.workDir = workDir;
-    getTaskController().initializeDistributedCache(context);
     // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -178,18 +195,18 @@
       TrackerDistributedCacheManager {
     public FakeTrackerDistributedCacheManager(Configuration conf)
         throws IOException {
-      super(conf);
+      super(conf, taskController);
     }
 
     @Override
     Path localizeCache(Configuration conf, URI cache, long confFileStamp,
-        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
-        throws IOException {
+        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
+        boolean isPublic) throws IOException {
       if (cache.equals(firstCacheFile.toUri())) {
         throw new IOException("fake fail");
       }
       return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
-          fileStatus, isArchive);
+          fileStatus, isArchive, isPublic);
     }
   }
 
@@ -198,8 +215,6 @@
     if (!canRun()) {
       return;
     }
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     TrackerDistributedCacheManager manager = 
       new FakeTrackerDistributedCacheManager(conf);
     Cluster cluster = new Cluster(conf);
@@ -208,6 +223,7 @@
 
     // Configures a job with a regular file
     Job job1 = Job.getInstance(cluster, conf);
+    job1.setUser(userName);
     job1.addCacheFile(secondCacheFile.toUri());
     Configuration conf1 = job1.getConfiguration();
     TrackerDistributedCacheManager.determineTimestamps(conf1);
@@ -229,6 +245,7 @@
     
     // Configures another job with three regular files.
     Job job2 = Job.getInstance(cluster, conf);
+    job2.setUser(userName);
     // add a file that would get failed to localize
     job2.addCacheFile(firstCacheFile.toUri());
     // add a file that is already localized by different job
@@ -286,7 +303,7 @@
   private void checkLocalizedPath(String visibility) 
   throws IOException, LoginException, InterruptedException {
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     Cluster cluster = new Cluster(conf);
     String userName = getJobOwnerName();
     File workDir = new File(TEST_ROOT_DIR, "workdir");
@@ -298,6 +315,7 @@
     }
     
     Job job1 = Job.getInstance(cluster, conf);
+    job1.setUser(userName);
     job1.addCacheFile(cacheFile.toUri());
     Configuration conf1 = job1.getConfiguration();
     TrackerDistributedCacheManager.determineTimestamps(conf1);
@@ -319,12 +337,18 @@
     Path localizedPath =
       manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
           fs.getFileStatus(cacheFile), false,
-          c.timestamp, new Path(TEST_ROOT_DIR), false);
+          c.timestamp, new Path(TEST_ROOT_DIR), false,
+          Boolean.parseBoolean(visibility));
     assertTrue("Cache file didn't get localized in the expected directory. " +
         "Expected localization to happen within " + 
         ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
         ", but was localized at " + 
         localizedPath, localizedPath.toString().contains(distCacheDir));
+    if ("true".equals(visibility)) {
+      checkPublicFilePermissions(new Path[]{localizedPath});
+    } else {
+      checkFilePermissions(new Path[]{localizedPath});
+    }
   }
   
   /**
@@ -335,17 +359,29 @@
    */
   protected void checkFilePermissions(Path[] localCacheFiles)
       throws IOException {
-    Path cachedFirstFile = localCacheFiles[0];
-    Path cachedSecondFile = localCacheFiles[1];
-    // Both the files should have executable permissions on them.
-    assertTrue("First cache file is not executable!", new File(cachedFirstFile
-        .toUri().getPath()).canExecute());
-    assertTrue("Second cache file is not executable!", new File(
-        cachedSecondFile.toUri().getPath()).canExecute());
+    // All the files should have executable permissions on them.
+    for (Path p : localCacheFiles) {
+      assertTrue("Cache file is not executable!", new File(p
+          .toUri().getPath()).canExecute());
+    }
   }
 
-  protected TaskController getTaskController() {
-    return new DefaultTaskController();
+  /**
+   * Check permissions on the public cache files
+   * 
+   * @param localCacheFiles
+   * @throws IOException
+   */
+  private void checkPublicFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    // All the files should have read and executable permissions for others
+    for (Path p : localCacheFiles) {
+      FsPermission perm = fs.getFileStatus(p).getPermission();
+      assertTrue("cache file is not readable by others", perm.getOtherAction()
+          .implies(FsAction.READ));
+      assertTrue("cache file is not executable by others", perm
+          .getOtherAction().implies(FsAction.EXECUTE));
+    }
   }
 
   protected String getJobOwnerName() throws LoginException {
@@ -358,27 +394,39 @@
     if (!canRun()) {
       return;
     }
+    // This test needs MRConfig.LOCAL_DIR to be a single directory instead of
+    // the numLocalDirs directories configured in setUp, because it assumes
+    // that firstcachefile and secondcachefile are localized in the same
+    // directory, so that the second localization triggers deleteCache.
+    // With several local directories, the second file might be localized in
+    // a different directory and deleteCache would not be triggered.
+    Configuration conf2 = new Configuration(conf);
+    conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
+    refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf);
-    FileSystem localfs = FileSystem.getLocal(conf);
+        new TrackerDistributedCacheManager(conf2, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf2);
     long now = System.currentTimeMillis();
+    String userName = getJobOwnerName();
+    conf2.set(JobContext.USER_NAME, userName);
 
-    manager.getLocalCache(firstCacheFile.toUri(), conf, 
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false);
-    manager.releaseCache(firstCacheFile.toUri(), conf, now);
+    Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    manager.releaseCache(firstCacheFile.toUri(), conf2, now);
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
     //sweep away the first cache.
-    manager.getLocalCache(secondCacheFile.toUri(), conf, 
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
-    FileStatus[] dirStatuses = localfs.listStatus(
-      new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
-    assertTrue("DistributedCache failed deleting old" + 
+    manager.getLocalCache(secondCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(secondCacheFile), false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+    assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
-        dirStatuses.length == 1);
+        localfs.exists(localCache));
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
@@ -386,14 +434,17 @@
       return;
     }
     TrackerDistributedCacheManager manager =
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    String userName = getJobOwnerName();
+    conf.set(JobContext.USER_NAME, userName);
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
         System.currentTimeMillis(),
-        new Path(TEST_ROOT_DIR), false);
+        new Path(TEST_ROOT_DIR), false, false);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
   }
@@ -459,20 +510,21 @@
       return;
     }
     Configuration myConf = new Configuration(conf);
-    myConf.set("fs.default.name", "refresh:///");
+    myConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "refresh:///");
     myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
+    String userName = getJobOwnerName();
+
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(myConf);
+      new TrackerDistributedCacheManager(myConf, taskController);
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(myConf);
+    subConf.set(JobContext.USER_NAME, userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
     TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
     // ****** End of imitating JobClient code
 
-    String userName = getJobOwnerName();
-
     // ****** Imitate TaskRunner code.
     TaskDistributedCacheManager handle =
       manager.newTaskDistributedCacheManager(subConf);
@@ -513,6 +565,7 @@
     
     // submit another job
     Configuration subConf2 = new Configuration(myConf);
+    subConf2.set(JobContext.USER_NAME, userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
     TrackerDistributedCacheManager.determineTimestamps(subConf2);
     TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
@@ -536,4 +589,46 @@
     handle.release();
   }
 
+  /**
+   * Localize a file. After localization is complete, create a file, "myFile",
+   * under the directory where the file is localized and ensure that it has
+   * permissions different from what is set by default. Then, localize another
+   * file. Verify that "myFile" has the right permissions.
+   * @throws Exception
+   */
+  public void testCustomPermissions() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    String userName = getJobOwnerName();
+    conf.set(JobContext.USER_NAME, userName);
+    TrackerDistributedCacheManager manager = 
+        new TrackerDistributedCacheManager(conf, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
+
+    Path[] localCache = new Path[2];
+    localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    FsPermission myPermission = new FsPermission((short)0600);
+    Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
+    if (FileSystem.create(localfs, myFile, myPermission) == null) {
+      throw new IOException("Could not create " + myFile);
+    }
+    try {
+      localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(secondCacheFile), false, 
+          System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+      FileStatus stat = localfs.getFileStatus(myFile);
+      assertTrue(stat.getPermission().equals(myPermission));
+      // validate permissions of localized files.
+      checkFilePermissions(localCache);
+    } finally {
+      localfs.delete(myFile, false);
+    }
+  }
+
 }



Mime
View raw message