hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077129 - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ mapred/org/apache/hadoop/filecache/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/filecache/ test/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 03:43:57 GMT
Author: omalley
Date: Fri Mar  4 03:43:57 2011
New Revision: 1077129

URL: http://svn.apache.org/viewvc?rev=1077129&view=rev
Log:
commit 395429ee28f7bb21626c792df83811c4310f9b6c
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Wed Jan 27 21:58:11 2010 +0530

    MAPREDUCE:1186 from https://issues.apache.org/jira/secure/attachment/12431573/1186.20S-6.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1186. Modified code in distributed cache to set
    +    permissions only on required set of localized paths.
    +    (Amareshwari Sriramadasu via yhemanth)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar  4 03:43:57 2011
@@ -51,6 +51,7 @@ int main(int argc, char **argv) {
   const char * task_id = NULL;
   const char * tt_root = NULL;
   const char *log_dir = NULL;
+  const char * unique_string = NULL;
   int exit_code = 0;
   const char * task_pid = NULL;
   const char* const short_options = "l:";
@@ -113,8 +114,11 @@ int main(int argc, char **argv) {
     job_id = argv[optind++];
     exit_code = initialize_job(job_id, user_detail->pw_name);
     break;
-  case INITIALIZE_DISTRIBUTEDCACHE:
-    exit_code = initialize_distributed_cache(user_detail->pw_name);
+  case INITIALIZE_DISTRIBUTEDCACHE_FILE:
+    tt_root = argv[optind++];
+    unique_string = argv[optind++];
+    exit_code = initialize_distributed_cache_file(tt_root, unique_string,
+        user_detail->pw_name);
     break;
   case LAUNCH_TASK_JVM:
     tt_root = argv[optind++];

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar  4 03:43:57 2011
@@ -172,9 +172,10 @@ char *get_user_directory(const char *tt_
 /**
  * Get the distributed cache directory for a particular user
  */
-char *get_distributed_cache_directory(const char *tt_root, const char *user) {
-  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, "dist_cache_path", 2,
-      tt_root, user);
+char *get_distributed_cache_directory(const char *tt_root, const char *user,
+    const char* unique_string) {
+  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, 
+      "dist_cache_unique_path", 3, tt_root, user, unique_string);
 }
 
 char *get_job_work_directory(const char *job_dir) {
@@ -795,22 +796,25 @@ int initialize_job(const char *jobid, co
 }
 
 /**
- * Function to initialize the distributed cache files of a user.
+ * Function to initialize the distributed cache file for a user.
  * It does the following:
- *     *  sudo chown user:mapred -R taskTracker/$user/distcache
- *     *  sudo chmod 2570 -R taskTracker/$user/distcache
- * This is done once per every JVM launch. Tasks reusing JVMs just create
+ *     *  sudo chown user:mapred -R taskTracker/$user/distcache/<randomdir>
+ *     *  sudo chmod 2570 -R taskTracker/$user/distcache/<randomdir>
+ * This is done once per localization. Tasks reusing JVMs just create
  * symbolic links themselves and so there isn't anything specific to do in
  * that case.
- * Sometimes, it happens that a task uses the whole or part of a directory
- * structure in taskTracker/$user/distcache. In this case, some paths are
- * already set proper private permissions by this same function called during
- * a previous JVM launch. In the current invocation, we only do the
- * chown/chmod operation of files/directories that are newly created by the
- * TaskTracker (i.e. those that still are not owned by user:mapred)
  */
-int initialize_distributed_cache(const char *user) {
-
+int initialize_distributed_cache_file(const char *tt_root, 
+    const char *unique_string, const char *user) {
+  if (tt_root == NULL) {
+    fprintf(LOGFILE, "tt_root passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+  if (unique_string == NULL) {
+    fprintf(LOGFILE, "unique_string passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+ 
   if (user == NULL) {
     fprintf(LOGFILE, "user passed is null.\n");
     return INVALID_ARGUMENT_NUMBER;
@@ -820,69 +824,41 @@ int initialize_distributed_cache(const c
     fprintf(LOGFILE, "Couldn't get the user details of %s", user);
     return INVALID_USER_NAME;
   }
-
-  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-
-  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
-  if (local_dir == NULL) {
-    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+  //Check tt_root
+  if (check_tt_root(tt_root) < 0) {
+    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
     cleanup();
     return INVALID_TT_ROOT;
   }
 
-  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
-#ifdef DEBUG
-  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
-      full_local_dir_str);
-#endif
+  // set permission on the unique directory
+  char *localized_unique_dir = get_distributed_cache_directory(tt_root, user,
+      unique_string);
+  if (localized_unique_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't get unique distcache directory for %s.\n", user);
+    cleanup();
+    return INITIALIZE_DISTCACHEFILE_FAILED;
+  }
 
-  char *distcache_dir;
-  char **local_dir_ptr = local_dir;
+  gid_t binary_gid = getegid(); // the group permissions of the binary.
   int failed = 0;
-  while (*local_dir_ptr != NULL) {
-    distcache_dir = get_distributed_cache_directory(*local_dir_ptr, user);
-    if (distcache_dir == NULL) {
-      fprintf(LOGFILE, "Couldn't get distcache directory for %s.\n", user);
-      failed = 1;
-      break;
-    }
-
-    struct stat filestat;
-    if (stat(distcache_dir, &filestat) != 0) {
-      if (errno == ENOENT) {
-#ifdef DEBUG
-        fprintf(LOGFILE, "distcache_dir %s doesn't exist. Not doing anything.\n",
-            distcache_dir);
-#endif
-      } else {
-        // stat failed because of something else!
-        fprintf(LOGFILE, "Failed to stat the distcache_dir %s\n",
-            distcache_dir);
-        failed = 1;
-        free(distcache_dir);
-        break;
-      }
-    } else if (secure_path(distcache_dir, user_detail->pw_uid,
-        tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
+  struct stat filestat;
+  if (stat(localized_unique_dir, &filestat) != 0) {
+    // stat on the localized unique directory failed; any errno is an error
+    fprintf(LOGFILE, "Failed to stat the localized_unique_dir %s\n",
+        localized_unique_dir);
+    failed = INITIALIZE_DISTCACHEFILE_FAILED;
+  } else if (secure_path(localized_unique_dir, user_detail->pw_uid,
+        binary_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
             | S_IXUSR | S_IRWXG, 1) != 0) {
-      // No setgid on files and setgid on dirs, 570
-      fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
-          distcache_dir);
-      failed = 1;
-      free(distcache_dir);
-      break;
-    }
-
-    local_dir_ptr++;
-    free(distcache_dir);
+    // No setgid on files and setgid on dirs, 570
+    fprintf(LOGFILE, "Failed to secure the localized_unique_dir %s\n",
+        localized_unique_dir);
+    failed = INITIALIZE_DISTCACHEFILE_FAILED;
   }
-  free(local_dir);
-  free(full_local_dir_str);
+  free(localized_unique_dir);
   cleanup();
-  if (failed) {
-    return INITIALIZE_DISTCACHE_FAILED;
-  }
-  return 0;
+  return failed;
 }
 
 /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar  4 03:43:57 2011
@@ -39,7 +39,7 @@
 enum command {
   INITIALIZE_USER,
   INITIALIZE_JOB,
-  INITIALIZE_DISTRIBUTEDCACHE,
+  INITIALIZE_DISTRIBUTEDCACHE_FILE,
   LAUNCH_TASK_JVM,
   INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
@@ -66,7 +66,7 @@ enum errorcodes {
   PREPARE_TASK_LOGS_FAILED, //16
   INVALID_TT_LOG_DIR, //17
   OUT_OF_MEMORY, //18
-  INITIALIZE_DISTCACHE_FAILED, //19
+  INITIALIZE_DISTCACHEFILE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
   UNABLE_TO_BUILD_PATH //21
 };
@@ -75,7 +75,7 @@ enum errorcodes {
 
 #define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"
 
-#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache"
+#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache/%s"
 
 #define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
 
@@ -109,7 +109,8 @@ int initialize_task(const char *jobid, c
 
 int initialize_job(const char *jobid, const char *user);
 
-int initialize_distributed_cache(const char *user);
+int initialize_distributed_cache_file(const char *tt_root, 
+    const char* unique_string, const char *user);
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar  4 03:43:57 2011
@@ -24,6 +24,9 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 
 import java.net.URI;
 
@@ -196,9 +199,9 @@ public class DistributedCache {
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
-    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
-        baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
-        honorSymLinkConf);
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
+            confFileStamp, currentWorkDir, honorSymLinkConf, false);
   }
 
   /**
@@ -275,8 +278,8 @@ public class DistributedCache {
     if (timestamp == null) {
       throw new IOException("TimeStamp of the uri couldnot be found");
     }
-    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
-          Long.parseLong(timestamp));
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .releaseCache(cache, conf, Long.parseLong(timestamp));
   }
   
   /**
@@ -289,10 +292,11 @@ public class DistributedCache {
    * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
    * instead.
    */
+  @Deprecated
   public static String makeRelative(URI cache, Configuration conf)
       throws IOException {
-    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
-
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .makeRelative(cache, conf);
   }
 
   /**
@@ -656,6 +660,7 @@ public class DistributedCache {
    * instead.
    */
   public static void purgeCache(Configuration conf) throws IOException {
-    new TrackerDistributedCacheManager(conf).purgeCache();
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .purgeCache();
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Fri Mar  4 03:43:57 2011
@@ -172,7 +172,7 @@ public class TaskDistributedCacheManager
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
           cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
-          cacheFile.timestamp, workdirPath, false);
+          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
       cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 03:43:57 2011
@@ -30,6 +30,8 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -67,14 +69,18 @@ public class TrackerDistributedCacheMana
   
   private LocalDirAllocator lDirAllocator;
   
+  private TaskController taskController;
+  
   private Configuration trackerConf;
   
   private Random random = new Random();
 
-  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+  public TrackerDistributedCacheManager(Configuration conf,
+      TaskController taskController) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator("mapred.local.dir");
+    this.taskController = taskController;
   }
 
   /**
@@ -102,6 +108,7 @@ public class TrackerDistributedCacheMana
    * launches
    * NOTE: This is effectively always on since r696957, since there is no code
    * path that does not use this.
+   * @param isPublic true if the cache file is public, false if it is private
    * @return the path to directory where the archives are unjarred in case of
    * archives, the path to the file where the file is copied locally
    * @throws IOException
@@ -109,7 +116,7 @@ public class TrackerDistributedCacheMana
   Path getLocalCache(URI cache, Configuration conf,
       String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf)
+      Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
       throws IOException {
     String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
@@ -118,13 +125,13 @@ public class TrackerDistributedCacheMana
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
+        String uniqueString = String.valueOf(random.nextLong());
         String cachePath = new Path (subDir, 
-          new Path(String.valueOf(random.nextLong()),
-            makeRelative(cache, conf))).toString();
+          new Path(uniqueString, makeRelative(cache, conf))).toString();
         Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
           fileStatus.getLen(), trackerConf);
-        lcacheStatus = new CacheStatus(
-          new Path(localPath.toString().replace(cachePath, "")), localPath); 
+        lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
+          cachePath, "")), localPath, new Path(subDir), uniqueString);
         cachedArchives.put(key, lcacheStatus);
       }
 
@@ -138,7 +145,7 @@ public class TrackerDistributedCacheMana
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
           localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, fileStatus, isArchive);
+              lcacheStatus, fileStatus, isArchive, isPublic);
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -242,17 +249,17 @@ public class TrackerDistributedCacheMana
     // do the deletion, after releasing the global lock
     for (CacheStatus lcacheStatus : deleteSet) {
       synchronized (lcacheStatus) {
-        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
         // decrement the size of the cache from baseDirSize
         synchronized (baseDirSize) {
-          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
           if ( dirSize != null ) {
             dirSize -= lcacheStatus.size;
-            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+            baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
           } else {
             LOG.warn("Cannot find record of the baseDir: " + 
-                     lcacheStatus.baseDir + " during delete!");
+                     lcacheStatus.localizedBaseDir + " during delete!");
           }
         }
       }
@@ -295,12 +302,12 @@ public class TrackerDistributedCacheMana
     // Has to be
     if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
         cacheStatus, fileStatus)) {
-      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
-          " for cache-file: " + cache);
+      throw new IOException("Stale cache file: " + cacheStatus.localizedLoadPath + 
+                            " for cache-file: " + cache);
     }
     LOG.info(String.format("Using existing cache of %s->%s",
-             cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+             cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
   }
   
   /**
@@ -361,7 +368,7 @@ public class TrackerDistributedCacheMana
     File flink = new File(link);
     if (doSymlink){
       if (!flink.exists()) {
-        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+        FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
       }
     }
   }
@@ -372,21 +379,23 @@ public class TrackerDistributedCacheMana
                                       URI cache, long confFileStamp,
                                       CacheStatus cacheStatus,
                                       FileStatus fileStatus,
-                                      boolean isArchive)
+                                      boolean isArchive, boolean isPublic)
       throws IOException {
     FileSystem fs = FileSystem.get(cache, conf);
     FileSystem localFs = FileSystem.getLocal(conf);
     Path parchive = null;
     if (isArchive) {
-      parchive = new Path(cacheStatus.localLoadPath,
-        new Path(cacheStatus.localLoadPath.getName()));
-     } else {
-      parchive = cacheStatus.localLoadPath;
+      parchive = new Path(cacheStatus.localizedLoadPath,
+        new Path(cacheStatus.localizedLoadPath.getName()));
+    } else {
+      parchive = cacheStatus.localizedLoadPath;
     }
+
     if (!localFs.mkdirs(parchive.getParent())) {
       throw new IOException("Mkdirs failed to create directory " +
-          cacheStatus.localLoadPath.toString());
+          cacheStatus.localizedLoadPath.toString());
     }
+
     String cacheId = cache.getPath();
     fs.copyToLocalFile(new Path(cacheId), parchive);
     if (isArchive) {
@@ -413,27 +422,45 @@ public class TrackerDistributedCacheMana
       FileUtil.getDU(new File(parchive.getParent().toString()));
     cacheStatus.size = cacheSize;
     synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
       if( dirSize == null ) {
         dirSize = Long.valueOf(cacheSize);
       } else {
         dirSize += cacheSize;
       }
-      baseDirSize.put(cacheStatus.baseDir, dirSize);
-    }
-    // do chmod here
-    try {
-      //Setting recursive permission to grant everyone read and execute
-      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-    } catch(InterruptedException e) {
-      LOG.warn("Exception in chmod" + e.toString());
+      baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
     }
+
+    // set proper permissions for the localized directory
+    setPermissions(conf, cacheStatus, isPublic);
+
     // update cacheStatus to reflect the newly cached file
     cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
 
     LOG.info(String.format("Cached %s as %s",
-        cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+             cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
+  }
+
+  private void setPermissions(Configuration conf, CacheStatus cacheStatus,
+      boolean isPublic) throws IOException {
+    if (isPublic) {
+      Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
+      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+      try {
+        FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in chmod" + e.toString());
+        throw new IOException(e);
+      }
+    } else {
+      // invoke taskcontroller to set permissions
+      DistributedCacheFileContext context = new DistributedCacheFileContext(
+          conf.get("user.name"), new File(cacheStatus.localizedBaseDir
+              .toString()), cacheStatus.localizedBaseDir,
+          cacheStatus.uniqueString);
+      taskController.initializeDistributedCacheFile(context);
+    }
   }
 
   private static boolean isTarFile(String filename) {
@@ -508,10 +535,10 @@ public class TrackerDistributedCacheMana
 
   static class CacheStatus {
     // the local load path of this cache
-    Path localLoadPath;
+    Path localizedLoadPath;
 
     //the base dir where the cache lies
-    Path baseDir;
+    Path localizedBaseDir;
 
     //the size of this cache
     long size;
@@ -524,18 +551,28 @@ public class TrackerDistributedCacheMana
 
     // is it initialized ?
     boolean inited = false;
+
+    // The sub directory (tasktracker/archive or tasktracker/user/archive),
+    // under which the file will be localized
+    Path subDir;
     
-    public CacheStatus(Path baseDir, Path localLoadPath) {
+    // unique string used in the construction of local load path
+    String uniqueString;
+
+    public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
+        String uniqueString) {
       super();
-      this.localLoadPath = localLoadPath;
+      this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
-      this.baseDir = baseDir;
+      this.localizedBaseDir = baseDir;
       this.size = 0;
+      this.subDir = subDir;
+      this.uniqueString = uniqueString;
     }
     
     Path getBaseDir(){
-      return this.baseDir;
+      return this.localizedBaseDir;
     }
     
     // mark it as initialized
@@ -547,6 +584,10 @@ public class TrackerDistributedCacheMana
     boolean isInited() {
       return inited;
     }
+    
+    Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(subDir, uniqueString));
+    }
   }
 
   /**
@@ -558,7 +599,7 @@ public class TrackerDistributedCacheMana
     synchronized (cachedArchives) {
       for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
         try {
-          localFs.delete(f.getValue().localLoadPath, true);
+          localFs.delete(f.getValue().localizedLoadPath, true);
         } catch (IOException ie) {
           LOG.debug("Error cleaning up cache", ie);
         }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 03:43:57 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.util.Shell.Shel
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 
 /**
  * The default implementation for controlling tasks.
@@ -153,8 +154,17 @@ public class DefaultTaskController exten
   }
 
   @Override
-  public void initializeDistributedCache(InitializationContext context) {
-    // Do nothing.
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+      throws IOException {
+    Path localizedUniqueDir = context.getLocalizedUniqueDir();
+    try {
+      // Setting recursive execute permission on localized dir
+      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+      FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
+    } catch (InterruptedException ie) {
+      LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
+      throw new IOException(ie);
+    }
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 03:43:57 2011
@@ -85,7 +85,7 @@ class LinuxTaskController extends TaskCo
   enum TaskCommands {
     INITIALIZE_USER,
     INITIALIZE_JOB,
-    INITIALIZE_DISTRIBUTEDCACHE,
+    INITIALIZE_DISTRIBUTEDCACHE_FILE,
     LAUNCH_TASK_JVM,
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
@@ -342,12 +342,21 @@ class LinuxTaskController extends TaskCo
   }
 
   @Override
-  public void initializeDistributedCache(InitializationContext context)
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException {
-    LOG.debug("Going to initialize distributed cache for " + context.user
-        + " on the TT");
-    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE, context.user,
-        new ArrayList<String>(), context.workDir, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to initialize distributed cache for " + context.user
+          + " with localizedBaseDir " + context.localizedBaseDir + 
+          " and uniqueString " + context.uniqueString);
+    }
+    List<String> args = new ArrayList<String>();
+    // Here, uniqueString might start with '-'. Adding -- in front of the 
+    // arguments indicates that they are non-option parameters.
+    args.add("--");
+    args.add(context.localizedBaseDir.toString());
+    args.add(context.uniqueString);
+    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
+        args, context.workDir, null);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:43:57 2011
@@ -112,7 +112,7 @@ class LocalJobRunner implements JobSubmi
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
       this.trackerDistributerdCacheManager =
-        new TrackerDistributedCacheManager(conf);
+        new TrackerDistributedCacheManager(conf, new DefaultTaskController());
       this.taskDistributedCacheManager =
         trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar  4 03:43:57 2011
@@ -70,12 +70,10 @@ public abstract class TaskController imp
    * disks:
    * <ul>
    * <li>mapred-local directories</li>
-   * <li>Job cache directories</li>
-   * <li>Archive directories</li>
    * <li>Hadoop log directories</li>
    * </ul>
    */
-  void setup() {
+  public void setup() {
     for (String localDir : this.mapredLocalDirs) {
       // Set up the mapred-local directories.
       File mapredlocalDir = new File(localDir);
@@ -109,13 +107,13 @@ public abstract class TaskController imp
 
   /**
    * Take task-controller specific actions to initialize the distributed cache
-   * files. This involves setting appropriate permissions for these files so as
+   * file. This involves setting appropriate permissions for these files so as
    * to secure them to be accessible only their owners.
    * 
    * @param context
    * @throws IOException
    */
-  public abstract void initializeDistributedCache(InitializationContext context)
+  public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
       throws IOException;
 
   /**
@@ -257,6 +255,37 @@ public abstract class TaskController imp
   public static class InitializationContext {
     public File workDir;
     public String user;
+    
+    public InitializationContext() {
+    }
+    
+    public InitializationContext(String user, File workDir) {
+      this.user = user;
+      this.workDir = workDir;
+    }
+  }
+  
+  /**
+   * This is used for initializing the private localized files in distributed
+   * cache. Initialization involves changing permissions, ownership, etc.
+   */
+  public static class DistributedCacheFileContext extends InitializationContext {
+    // base directory under which file has been localized
+    Path localizedBaseDir;
+    // the unique string used to construct the localized path
+    String uniqueString;
+
+    public DistributedCacheFileContext(String user, File workDir,
+        Path localizedBaseDir, String uniqueString) {
+      super(user, workDir);
+      this.localizedBaseDir = localizedBaseDir;
+      this.uniqueString = uniqueString;
+    }
+
+    public Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(TaskTracker
+          .getPrivateDistributedCacheDir(user), uniqueString));
+    }
   }
 
   static class JobInitializationContext extends InitializationContext {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:43:57 2011
@@ -42,7 +42,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -169,11 +168,6 @@ abstract class TaskRunner extends Thread
       // of files should happen in the TaskTracker's process space. Any changes to
       // the conf object after this will NOT be reflected to the child.
       setupChildTaskConfiguration(lDirAlloc);
-      
-      InitializationContext context = new InitializationContext();
-      context.user = conf.getUser();
-      context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
-      tracker.getTaskController().initializeDistributedCache(context);
 
       if (!prepare()) {
         return;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:43:57 2011
@@ -648,12 +648,17 @@ public class TaskTracker 
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    // Initialize DistributedCache and
-    // clear out temporary files that might be lying around
-    this.distributedCacheManager = 
-        new TrackerDistributedCacheManager(this.fConf);
-    this.distributedCacheManager.purgeCache();
-    cleanupStorage();
+    Class<? extends TaskController> taskControllerClass = fConf.getClass(
+        "mapred.task.tracker.task-controller", DefaultTaskController.class, TaskController.class);
+    taskController = (TaskController) ReflectionUtils.newInstance(
+        taskControllerClass, fConf);
+
+    // setup and create jobcache directory with appropriate permissions
+    taskController.setup();
+
+    // Initialize DistributedCache
+    this.distributedCacheManager = new TrackerDistributedCacheManager(
+        this.fConf, taskController);
 
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
@@ -681,15 +686,6 @@ public class TaskTracker 
     reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
     mapLauncher.start();
     reduceLauncher.start();
-    Class<? extends TaskController> taskControllerClass 
-                          = fConf.getClass("mapred.task.tracker.task-controller",
-                                            DefaultTaskController.class, 
-                                            TaskController.class); 
-    taskController = (TaskController)ReflectionUtils.newInstance(
-                                                      taskControllerClass, fConf);
-    
-    //setup and create jobcache directory with appropriate permissions
-    taskController.setup();
 
     // create a localizer instance
     setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar  4 03:43:57 2011
@@ -34,10 +34,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
@@ -46,10 +44,12 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.security.UserGroupInformation;
-
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
   private static final Log LOG =
@@ -61,7 +61,6 @@ public class TestTrackerDistributedCache
           .getAbsolutePath();
 
   protected File ROOT_MAPRED_LOCAL_DIR;
-  private static String TEST_CACHE_BASE_DIR = "cachebasedir";
   protected int numLocalDirs = 6;
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
@@ -72,7 +71,8 @@ public class TestTrackerDistributedCache
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
-    new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+    new LocalDirAllocator("mapred.local.dir");
+  protected TaskController taskController;
 
   @Override
   protected void setUp() throws IOException,InterruptedException {
@@ -87,12 +87,25 @@ public class TestTrackerDistributedCache
     ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
     ROOT_MAPRED_LOCAL_DIR.mkdirs();
 
+    String []localDirs = new String[numLocalDirs];
+    for (int i = 0; i < numLocalDirs; i++) {
+      File localDir = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i);
+      localDirs[i] = localDir.getPath();
+      localDir.mkdir();
+    }
+
     conf = new Configuration();
-    conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
-    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
-             ROOT_MAPRED_LOCAL_DIR.toString());
+    conf.setStrings("mapred.local.dir", localDirs);
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     fs = FileSystem.get(conf);
+    Class<? extends TaskController> taskControllerClass = conf.getClass(
+        "mapred.task.tracker.task-controller", DefaultTaskController.class,
+        TaskController.class);
+    taskController = (TaskController) ReflectionUtils.newInstance(
+        taskControllerClass, conf);
+
+    // setup permissions for mapred local dir
+    taskController.setup();
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -100,6 +113,11 @@ public class TestTrackerDistributedCache
     createPrivateTempFile(firstCacheFile);
     createPrivateTempFile(secondCacheFile);
   }
+  
+  protected void refreshConf(Configuration conf) throws IOException {
+    taskController.setConf(conf);
+    taskController.setup();
+  }
 
   /**
    * Whether the test can run on the machine
@@ -124,6 +142,8 @@ public class TestTrackerDistributedCache
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(conf);
+    String userName = getJobOwnerName();
+    subConf.set("user.name", userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
@@ -135,11 +155,9 @@ public class TestTrackerDistributedCache
     subConf.writeXml(os);
     os.close();
 
-    String userName = getJobOwnerName();
-
     // ****** Imitate TaskRunner code.
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     TaskDistributedCacheManager handle =
       manager.newTaskDistributedCacheManager(subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
@@ -147,11 +165,6 @@ public class TestTrackerDistributedCache
     handle.setup(localDirAllocator, workDir, TaskTracker
         .getPrivateDistributedCacheDir(userName), 
         TaskTracker.getPublicDistributedCacheDir());
-
-    InitializationContext context = new InitializationContext();
-    context.user = userName;
-    context.workDir = workDir;
-    getTaskController().initializeDistributedCache(context);
     // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -181,18 +194,18 @@ public class TestTrackerDistributedCache
       TrackerDistributedCacheManager {
     public FakeTrackerDistributedCacheManager(Configuration conf)
         throws IOException {
-      super(conf);
+      super(conf, taskController);
     }
 
     @Override
     Path localizeCache(Configuration conf, URI cache, long confFileStamp,
-        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
-        throws IOException {
+        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
+        boolean isPublic) throws IOException {
       if (cache.equals(firstCacheFile.toUri())) {
         throw new IOException("fake fail");
       }
       return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
-          fileStatus, isArchive);
+          fileStatus, isArchive, isPublic);
     }
   }
 
@@ -201,8 +214,6 @@ public class TestTrackerDistributedCache
     if (!canRun()) {
       return;
     }
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     TrackerDistributedCacheManager manager = 
       new FakeTrackerDistributedCacheManager(conf);
 
@@ -212,6 +223,7 @@ public class TestTrackerDistributedCache
     // Configures a job with a regular file
     Job job1 = new Job(conf);
     Configuration conf1 = job1.getConfiguration();
+    conf1.set("user.name", userName);
     DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
     
     TrackerDistributedCacheManager.determineTimestamps(conf1);
@@ -234,6 +246,7 @@ public class TestTrackerDistributedCache
     // Configures another job with three regular files.
     Job job2 = new Job(conf);
     Configuration conf2 = job2.getConfiguration();
+    conf2.set("user.name", userName);
     // add a file that would get failed to localize
     DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
     // add a file that is already localized by different job
@@ -291,7 +304,7 @@ public class TestTrackerDistributedCache
   private void checkLocalizedPath(String visibility) 
   throws IOException, LoginException, InterruptedException {
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     String userName = getJobOwnerName();
     File workDir = new File(TEST_ROOT_DIR, "workdir");
     Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
@@ -302,6 +315,7 @@ public class TestTrackerDistributedCache
     }
 
     Configuration conf1 = new Configuration(conf);
+    conf1.set("user.name", userName);
     DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
     TrackerDistributedCacheManager.determineTimestamps(conf1);
     TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
@@ -322,12 +336,18 @@ public class TestTrackerDistributedCache
     Path localizedPath =
       manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
           fs.getFileStatus(cacheFile), false,
-          c.timestamp, new Path(TEST_ROOT_DIR), false);
+          c.timestamp, new Path(TEST_ROOT_DIR), false,
+          Boolean.parseBoolean(visibility));
     assertTrue("Cache file didn't get localized in the expected directory. " +
         "Expected localization to happen within " + 
         ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
         ", but was localized at " + 
         localizedPath, localizedPath.toString().contains(distCacheDir));
+    if ("true".equals(visibility)) {
+      checkPublicFilePermissions(new Path[]{localizedPath});
+    } else {
+      checkFilePermissions(new Path[]{localizedPath});
+    }
   }
   
   /**
@@ -338,17 +358,29 @@ public class TestTrackerDistributedCache
    */
   protected void checkFilePermissions(Path[] localCacheFiles)
       throws IOException {
-    Path cachedFirstFile = localCacheFiles[0];
-    Path cachedSecondFile = localCacheFiles[1];
-    // Both the files should have executable permissions on them.
-    assertTrue("First cache file is not executable!", new File(cachedFirstFile
-        .toUri().getPath()).canExecute());
-    assertTrue("Second cache file is not executable!", new File(
-        cachedSecondFile.toUri().getPath()).canExecute());
+    // All the files should have executable permissions on them.
+    for (Path p : localCacheFiles) {
+      assertTrue("Cache file is not executable!", new File(p
+          .toUri().getPath()).canExecute());
+    }
   }
 
-  protected TaskController getTaskController() {
-    return new DefaultTaskController();
+  /**
+   * Check permissions on the public cache files
+   * 
+   * @param localCacheFiles
+   * @throws IOException
+   */
+  private void checkPublicFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    // All the files should have read and executable permissions for others
+    for (Path p : localCacheFiles) {
+      FsPermission perm = fs.getFileStatus(p).getPermission();
+      assertTrue("cache file is not readable by others", perm.getOtherAction()
+          .implies(FsAction.READ));
+      assertTrue("cache file is not executable by others", perm
+          .getOtherAction().implies(FsAction.EXECUTE));
+    }
   }
 
   protected String getJobOwnerName() throws LoginException {
@@ -361,27 +393,39 @@ public class TestTrackerDistributedCache
     if (!canRun()) {
       return;
     }
+    // This test needs mapred.local.dir to be a single directory
+    // instead of several, because it assumes that both
+    // firstcachefile and secondcachefile will be localized in the same directory
+    // so that the second localization triggers deleteCache.
+    // If mapred.local.dir lists multiple directories, the second localization
+    // might not trigger deleteCache if it lands in a different directory.
+    Configuration conf2 = new Configuration(conf);
+    conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+    conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf);
-    FileSystem localfs = FileSystem.getLocal(conf);
+        new TrackerDistributedCacheManager(conf2, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf2);
     long now = System.currentTimeMillis();
+    String userName = getJobOwnerName();
+    conf2.set("user.name", userName);
 
-    manager.getLocalCache(firstCacheFile.toUri(), conf, 
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false);
-    manager.releaseCache(firstCacheFile.toUri(), conf, now);
+    Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    manager.releaseCache(firstCacheFile.toUri(), conf2, now);
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
     //sweep away the first cache.
-    manager.getLocalCache(secondCacheFile.toUri(), conf, 
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
-    FileStatus[] dirStatuses = localfs.listStatus(
-      new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
-    assertTrue("DistributedCache failed deleting old" + 
+    manager.getLocalCache(secondCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(secondCacheFile), false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+    assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
-        dirStatuses.length == 1);
+        localfs.exists(localCache));
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
@@ -389,14 +433,17 @@ public class TestTrackerDistributedCache
       return;
     }
     TrackerDistributedCacheManager manager =
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    String userName = getJobOwnerName();
+    conf.set("user.name", userName);
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
         System.currentTimeMillis(),
-        new Path(TEST_ROOT_DIR), false);
+        new Path(TEST_ROOT_DIR), false, false);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
   }
@@ -464,18 +511,19 @@ public class TestTrackerDistributedCache
     Configuration myConf = new Configuration(conf);
     myConf.set("fs.default.name", "refresh:///");
     myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
+    String userName = getJobOwnerName();
+
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(myConf);
+      new TrackerDistributedCacheManager(myConf, taskController);
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(myConf);
+    subConf.set("user.name", userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
     TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
     // ****** End of imitating JobClient code
 
-    String userName = getJobOwnerName();
-
     // ****** Imitate TaskRunner code.
     TaskDistributedCacheManager handle =
       manager.newTaskDistributedCacheManager(subConf);
@@ -516,6 +564,7 @@ public class TestTrackerDistributedCache
     
     // submit another job
     Configuration subConf2 = new Configuration(myConf);
+    subConf2.set("user.name", userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
     TrackerDistributedCacheManager.determineTimestamps(subConf2);
     TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
@@ -539,4 +588,46 @@ public class TestTrackerDistributedCache
     handle.release();
   }
 
+  /**
+   * Localize a file. After localization is complete, create a file, "myFile",
+   * under the directory where the file is localized and ensure that it has
+   * permissions different from what is set by default. Then, localize another
+   * file. Verify that "myFile" has the right permissions.
+   * @throws Exception
+   */
+  public void testCustomPermissions() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    String userName = getJobOwnerName();
+    conf.set("user.name", userName);
+    TrackerDistributedCacheManager manager = 
+        new TrackerDistributedCacheManager(conf, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
+
+    Path[] localCache = new Path[2];
+    localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    FsPermission myPermission = new FsPermission((short)0600);
+    Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
+    if (FileSystem.create(localfs, myFile, myPermission) == null) {
+      throw new IOException("Could not create " + myFile);
+    }
+    try {
+      localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(secondCacheFile), false, 
+          System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+      FileStatus stat = localfs.getFileStatus(myFile);
+      assertTrue(stat.getPermission().equals(myPermission));
+      // validate permissions of localized files.
+      checkFilePermissions(localCache);
+    } finally {
+      localfs.delete(myFile, false);
+    }
+  }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar  4 03:43:57 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
 import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
@@ -36,7 +37,6 @@ public class TestTrackerDistributedCache
     TestTrackerDistributedCacheManager {
 
   private File configFile;
-  private MyLinuxTaskController taskController;
   private String taskTrackerSpecialGroup;
 
   private static final Log LOG =
@@ -65,7 +65,7 @@ public class TestTrackerDistributedCache
         ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
             .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
     String execPath = path + "/task-controller";
-    taskController.setTaskControllerExe(execPath);
+    ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
     taskController.setup();
 
@@ -74,6 +74,17 @@ public class TestTrackerDistributedCache
   }
 
   @Override
+  protected void refreshConf(Configuration conf) throws IOException {
+    super.refreshConf(conf);
+    String path =
+      System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+      ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+          .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+   
+  }
+
+  @Override
   protected void tearDown()
       throws IOException {
     if (!ClusterWithLinuxTaskController.shouldRun()) {
@@ -99,27 +110,19 @@ public class TestTrackerDistributedCache
   }
 
   @Override
-  protected TaskController getTaskController() {
-    return taskController;
-  }
-
-  @Override
   protected void checkFilePermissions(Path[] localCacheFiles)
       throws IOException {
-    String cachedFirstFile = localCacheFiles[0].toUri().getPath();
-    String cachedSecondFile = localCacheFiles[1].toUri().getPath();
     String userName = getJobOwnerName();
 
-    // First make sure that the cache files have proper permissions.
-    TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile,
-        "-r-xrwx---", userName, taskTrackerSpecialGroup);
-    TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile,
-        "-r-xrwx---", userName, taskTrackerSpecialGroup);
-
-    // Now. make sure that all the path components also have proper
-    // permissions.
-    checkPermissionOnPathComponents(cachedFirstFile, userName);
-    checkPermissionOnPathComponents(cachedSecondFile, userName);
+    for (Path p : localCacheFiles) {
+      // First make sure that the cache file has proper permissions.
+      TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(),
+          "-r-xrwx---", userName, taskTrackerSpecialGroup);
+      // Now make sure that all the path components also have proper
+      // permissions.
+      checkPermissionOnPathComponents(p.toUri().getPath(), userName);
+    }
+
   }
 
   /**
@@ -136,7 +139,7 @@ public class TestTrackerDistributedCache
             + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
             + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
             "");
-    LOG.info("Leading path for cacheFirstFile is : "
+    LOG.info("Trailing path for cacheFirstFile is : "
         + trailingStringForFirstFile);
     // The leading mapred.local.dir/0_[0-n]/taskTracker/$user string.
     String leadingStringForFirstFile =



Mime
View raw message