hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r893828 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/filecache/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src/test/mapred/org/a...
Date Fri, 25 Dec 2009 00:55:24 GMT
Author: ddas
Date: Fri Dec 25 00:55:23 2009
New Revision: 893828

URL: http://svn.apache.org/viewvc?rev=893828&view=rev
Log:
MAPREDUCE-744. Introduces the notion of a public distributed cache. Contributed by Devaraj
Das.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 25 00:55:23 2009
@@ -19,6 +19,9 @@
     MAPREDUCE-1168. Export data to databases via Sqoop. (Aaron Kimball via
     tomwhite)
 
+    MAPREDUCE-744. Introduces the notion of a public distributed cache.
+    (Devaraj Das)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Dec 25
00:55:23 2009
@@ -121,7 +121,7 @@
       taskDistributedCacheManager.setup(
           new LocalDirAllocator(MRConfig.LOCAL_DIR), 
           new File(systemJobDir.toString()),
-          "archive");
+          "archive", "archive");
       
       if (DistributedCache.getSymlink(conf)) {
         // This is not supported largely because, 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Dec 25 00:55:23
2009
@@ -174,7 +174,8 @@
       taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
           .newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
-          .getDistributedCacheDir(conf.getUser()));
+          .getPrivateDistributedCacheDir(conf.getUser()), 
+          TaskTracker.getPublicDistributedCacheDir());
 
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 25 00:55:23
2009
@@ -447,9 +447,13 @@
     return TaskTracker.SUBDIR + Path.SEPARATOR + user;
   }
 
-  public static String getDistributedCacheDir(String user) {
+  public static String getPrivateDistributedCacheDir(String user) {
     return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
   }
+  
+  public static String getPublicDistributedCacheDir() {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+  }
 
   public static String getJobCacheSubdir(String user) {
     return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Dec 25
00:55:23 2009
@@ -107,6 +107,10 @@
     "mapreduce.job.cache.files.timestamps";
   public static final String CACHE_ARCHIVES_TIMESTAMPS = 
     "mapreduce.job.cache.archives.timestamps";
+  public static final String CACHE_FILE_VISIBILITIES = 
+    "mapreduce.job.cache.files.visibilities";
+  public static final String CACHE_ARCHIVES_VISIBILITIES = 
+    "mapreduce.job.cache.archives.visibilities";
   public static final String CACHE_SYMLINK = 
     "mapreduce.job.cache.symlink.create";
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Dec
25 00:55:23 2009
@@ -224,6 +224,8 @@
 
     //  set the timestamps of the archives and files
     TrackerDistributedCacheManager.determineTimestamps(conf);
+    //  set the public/private visibility of the archives and files
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf);
   }
   
   private URI getPathURI(Path destPath, String fragment) 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
Fri Dec 25 00:55:23 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Helper class of {@link TrackerDistributedCacheManager} that represents
@@ -43,9 +44,8 @@
  * by TaskRunner/LocalJobRunner to parse out the job configuration
  * and setup the local caches.
  * 
- * <b>This class is internal to Hadoop, and should not be treated as a public
- * interface.</b>
  */
+@InterfaceAudience.Private
 public class TaskDistributedCacheManager {
   private final TrackerDistributedCacheManager distributedCacheManager;
   private final Configuration taskConf;
@@ -66,6 +66,7 @@
       REGULAR,
       ARCHIVE
     }
+    boolean isPublic = true;
     /** Whether to decompress */
     final FileType type;
     final long timestamp;
@@ -73,10 +74,11 @@
     final boolean shouldBeAddedToClassPath;
     boolean localized = false;
 
-    private CacheFile(URI uri, FileType type, long timestamp, 
+    private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
         boolean classPath) {
       this.uri = uri;
       this.type = type;
+      this.isPublic = isPublic;
       this.timestamp = timestamp;
       this.shouldBeAddedToClassPath = classPath;
     }
@@ -87,7 +89,7 @@
      * files.
      */
     private static List<CacheFile> makeCacheFiles(URI[] uris, 
-        String[] timestamps, Path[] paths, FileType type) {
+        String[] timestamps, String cacheVisibilities[], Path[] paths, FileType type) {
       List<CacheFile> ret = new ArrayList<CacheFile>();
       if (uris != null) {
         if (uris.length != timestamps.length) {
@@ -103,7 +105,8 @@
           URI u = uris[i];
           boolean isClassPath = (null != classPaths.get(u.getPath()));
           long t = Long.parseLong(timestamps[i]);
-          ret.add(new CacheFile(u, type, t, isClassPath));
+          ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
+              t, isClassPath));
         }
       }
       return ret;
@@ -127,11 +130,13 @@
     this.cacheFiles.addAll(
         CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
             DistributedCache.getFileTimestamps(taskConf),
+            TrackerDistributedCacheManager.getFileVisibilities(taskConf),
             DistributedCache.getFileClassPaths(taskConf),
             CacheFile.FileType.REGULAR));
     this.cacheFiles.addAll(
         CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
           DistributedCache.getArchiveTimestamps(taskConf),
+          TrackerDistributedCacheManager.getArchiveVisibilities(taskConf),
           DistributedCache.getArchiveClassPaths(taskConf), 
           CacheFile.FileType.ARCHIVE));
   }
@@ -144,7 +149,7 @@
    * file, if necessary.
    */
   public void setup(LocalDirAllocator lDirAlloc, File workDir, 
-      String cacheSubdir) throws IOException {
+      String privateCacheSubdir, String publicCacheSubDir) throws IOException {
     setupCalled = true;
       
     if (cacheFiles.isEmpty()) {
@@ -159,7 +164,10 @@
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-
+      String cacheSubdir = publicCacheSubDir;
+      if (!cacheFile.isPublic) {
+        cacheSubdir = privateCacheSubdir;
+      }
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
           cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
Fri Dec 25 00:55:23 2009
@@ -38,7 +38,10 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Manages a single machine's instance of a cross-job
@@ -46,9 +49,8 @@
  * by a TaskTracker (or something that emulates it,
  * like LocalJobRunner).
  * 
- * <b>This class is internal to Hadoop, and should not be treated as a public
- * interface.</b>
  */
+@InterfaceAudience.Private
 public class TrackerDistributedCacheManager {
   // cacheID to cacheStatus mapping
   private TreeMap<String, CacheStatus> cachedArchives = 
@@ -305,6 +307,51 @@
 
     return fileSystem.getFileStatus(filePath).getModificationTime();
   }
+  
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all(public)
+   * or not
+   * @param conf
+   * @param uri
+   * @return true if the path in the uri is visible to all, false otherwise
+   * @throws IOException
+   */
+  static boolean isPublic(Configuration conf, URI uri) throws IOException {
+    FileSystem fs = FileSystem.get(uri, conf);
+    Path current = new Path(uri.getPath());
+    //the leaf level file should be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ)) {
+      return false;
+    }
+    current = current.getParent();
+    while (current != null) {
+      //the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+        return false;
+      }
+      current = current.getParent();
+    }
+    return true;
+  }
+  /**
+   * Checks for a given path whether the Other permissions on it 
+   * imply the permission in the passed FsAction
+   * @param fs
+   * @param path
+   * @param action
+   * @return true if the path in the uri is visible to all, false otherwise
+   * @throws IOException
+   */
+  private static boolean checkPermissionOfOther(FileSystem fs, Path path,
+      FsAction action) throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    FsPermission perms = status.getPermission();
+    FsAction otherAction = perms.getOtherAction();
+    if (otherAction.implies(action)) {
+      return true;
+    }
+    return false;
+  }
 
   private Path checkCacheStatusValidity(Configuration conf,
       URI cache, long confFileStamp,
@@ -585,8 +632,60 @@
       setFileTimestamps(job, fileTimestamps.toString());
     }
   }
+  /**
+   * Determines the visibilities of the distributed cache files and 
+   * archives. The visibility of a cache path is "public" if the leaf component
+   * has READ permissions for others, and the parent subdirs have 
+   * EXECUTE permissions for others
+   * @param job
+   * @throws IOException
+   */
+  public static void determineCacheVisibilities(Configuration job) 
+  throws IOException {
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    if (tarchives != null) {
+      StringBuffer archiveVisibilities = 
+        new StringBuffer(String.valueOf(isPublic(job, tarchives[0])));
+      for (int i = 1; i < tarchives.length; i++) {
+        archiveVisibilities.append(",");
+        archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i])));
+      }
+      setArchiveVisibilities(job, archiveVisibilities.toString());
+    }
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    if (tfiles != null) {
+      StringBuffer fileVisibilities = 
+        new StringBuffer(String.valueOf(isPublic(job, tfiles[0])));
+      for (int i = 1; i < tfiles.length; i++) {
+        fileVisibilities.append(",");
+        fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));
+      }
+      setFileVisibilities(job, fileVisibilities.toString());
+    }
+  }
   
   /**
+   * Get the booleans on whether the files are public or not.  Used by 
+   * internal DistributedCache and MapReduce code.
+   * @param conf The configuration which stored the timestamps
+   * @return a string array of booleans 
+   * @throws IOException
+   */
+  static String[] getFileVisibilities(Configuration conf) {
+    return conf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+  }
+
+  /**
+   * Get the booleans on whether the archives are public or not.  Used by 
+   * internal DistributedCache and MapReduce code.
+   * @param conf The configuration which stored the timestamps
+   * @return a string array of booleans 
+   */
+  static String[] getArchiveVisibilities(Configuration conf) {
+    return conf.getStrings(JobContext.CACHE_ARCHIVES_VISIBILITIES);
+  }
+
+  /**
    * This method checks if there is a conflict in the fragment names 
    * of the uris. Also makes sure that each uri has a fragment. It 
    * is only to be called if you want to create symlinks for 
@@ -631,6 +730,28 @@
     }
     return true;
   }
+  /**
+   * This is to check the public/private visibility of the archives to be
+   * localized.
+   * 
+   * @param conf Configuration which stores the timestamp's
+   * @param booleans comma separated list of booleans (true - public)
+   * The order should be the same as the order in which the archives are added.
+   */
+  static void setArchiveVisibilities(Configuration conf, String booleans) {
+    conf.set(JobContext.CACHE_ARCHIVES_VISIBILITIES, booleans);
+  }
+
+  /**
+   * This is to check the public/private visibility of the files to be localized
+   * 
+   * @param conf Configuration which stores the timestamp's
+   * @param booleans comma separated list of booleans (true - public)
+   * The order should be the same as the order in which the files are added.
+   */
+  static void setFileVisibilities(Configuration conf, String booleans) {
+    conf.set(JobContext.CACHE_FILE_VISIBILITIES, booleans);
+  }
 
   /**
    * This is to check the timestamp of the archives to be localized.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
Fri Dec 25 00:55:23 2009
@@ -233,7 +233,7 @@
 
           // Set up the cache directory used for distributed cache files
           File distributedCacheDir =
-              new File(localDir, TaskTracker.getDistributedCacheDir(user));
+              new File(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
           if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
             // Set permissions on the distcache-directory
             PermissionsHandler.setPermissions(distributedCacheDir,

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
Fri Dec 25 00:55:23 2009
@@ -140,7 +140,7 @@
       // Verify the distributed cache dir.
       File distributedCacheDir =
           new File(localDir, TaskTracker
-              .getDistributedCacheDir(task.getUser()));
+              .getPrivateDistributedCacheDir(task.getUser()));
       assertTrue("distributed cache dir " + distributedCacheDir
           + " doesn't exists!", distributedCacheDir.exists());
       checkFilePermissions(distributedCacheDir.getAbsolutePath(),

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Fri Dec 25 00:55:23 2009
@@ -390,7 +390,7 @@
       // Verify the distributed cache dir.
       File distributedCacheDir =
           new File(localDir, TaskTracker
-              .getDistributedCacheDir(task.getUser()));
+              .getPrivateDistributedCacheDir(task.getUser()));
       assertTrue("distributed cache dir " + distributedCacheDir
           + " doesn't exists!", distributedCacheDir.exists());
       checkFilePermissions(distributedCacheDir.getAbsolutePath(),

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
Fri Dec 25 00:55:23 2009
@@ -45,7 +45,7 @@
 
   @Override
   protected void setUp()
-      throws IOException {
+      throws IOException, InterruptedException {
 
     if (!ClusterWithLinuxTaskController.shouldRun()) {
       return;
@@ -134,7 +134,7 @@
     String trailingStringForFirstFile =
         cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
             + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
-            + Path.SEPARATOR + TaskTracker.getDistributedCacheDir(userName),
+            + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
             "");
     LOG.info("Leading path for cacheFirstFile is : "
         + trailingStringForFirstFile);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=893828&r1=893827&r2=893828&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Fri Dec 25 00:55:23 2009
@@ -73,7 +73,7 @@
     new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
 
   @Override
-  protected void setUp() throws IOException {
+  protected void setUp() throws IOException,InterruptedException {
 
     // Prepare the tests' root dir
     File TEST_ROOT = new File(TEST_ROOT_DIR);
@@ -94,8 +94,8 @@
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
-    createTempFile(firstCacheFile);
-    createTempFile(secondCacheFile);
+    createPrivateTempFile(firstCacheFile);
+    createPrivateTempFile(secondCacheFile);
   }
 
   /**
@@ -124,6 +124,7 @@
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
     // ****** End of imitating JobClient code
 
     Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
@@ -141,7 +142,8 @@
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
     handle.setup(localDirAllocator, workDir, TaskTracker
-        .getDistributedCacheDir(userName));
+        .getPrivateDistributedCacheDir(userName), 
+        TaskTracker.getPublicDistributedCacheDir());
 
     InitializationContext context = new InitializationContext();
     context.user = userName;
@@ -192,7 +194,7 @@
   }
 
   public void testReferenceCount() throws IOException, LoginException,
-      URISyntaxException {
+      URISyntaxException, InterruptedException {
     if (!canRun()) {
       return;
     }
@@ -209,19 +211,21 @@
     job1.addCacheFile(secondCacheFile.toUri());
     Configuration conf1 = job1.getConfiguration();
     TrackerDistributedCacheManager.determineTimestamps(conf1);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
     // Task localizing for first job
     TaskDistributedCacheManager handle = manager
         .newTaskDistributedCacheManager(conf1);
     handle.setup(localDirAllocator, workDir, TaskTracker
-          .getDistributedCacheDir(userName));
+          .getPrivateDistributedCacheDir(userName), 
+          TaskTracker.getPublicDistributedCacheDir());
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
-    createTempFile(thirdCacheFile);
+    createPrivateTempFile(thirdCacheFile);
     
     // Configures another job with three regular files.
     Job job2 = Job.getInstance(cluster, conf);
@@ -233,6 +237,7 @@
     job2.addCacheFile(thirdCacheFile.toUri());
     Configuration conf2 = job2.getConfiguration();
     TrackerDistributedCacheManager.determineTimestamps(conf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
 
     // Task localizing for second job
     // localization for the "firstCacheFile" will fail.
@@ -240,7 +245,8 @@
     Throwable th = null;
     try {
       handle.setup(localDirAllocator, workDir, TaskTracker
-          .getDistributedCacheDir(userName));
+          .getPrivateDistributedCacheDir(userName), 
+          TaskTracker.getPublicDistributedCacheDir());
     } catch (IOException e) {
       th = e;
       Log.info("Exception during setup", e);
@@ -261,7 +267,66 @@
     assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
     fs.delete(thirdCacheFile, false);
   }
+  
+  /**
+   * Tests that localization of distributed cache file happens in the desired
+   * directory
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testPublicPrivateCache() 
+  throws IOException, LoginException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+    checkLocalizedPath("true");
+    checkLocalizedPath("false");
+  }
+  
+  private void checkLocalizedPath(String visibility) 
+  throws IOException, LoginException, InterruptedException {
+    TrackerDistributedCacheManager manager = 
+      new TrackerDistributedCacheManager(conf);
+    Cluster cluster = new Cluster(conf);
+    String userName = getJobOwnerName();
+    File workDir = new File(TEST_ROOT_DIR, "workdir");
+    Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+    if ("true".equals(visibility)) {
+      createPublicTempFile(cacheFile);
+    } else {
+      createPrivateTempFile(cacheFile);
+    }
+    
+    Job job1 = Job.getInstance(cluster, conf);
+    job1.addCacheFile(cacheFile.toUri());
+    Configuration conf1 = job1.getConfiguration();
+    TrackerDistributedCacheManager.determineTimestamps(conf1);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
+    // Task localizing for job
+    TaskDistributedCacheManager handle = manager
+        .newTaskDistributedCacheManager(conf1);
+    handle.setup(localDirAllocator, workDir, TaskTracker
+          .getPrivateDistributedCacheDir(userName), 
+          TaskTracker.getPublicDistributedCacheDir());
+    TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
+    String distCacheDir;
+    if ("true".equals(visibility)) {
+      distCacheDir = TaskTracker.getPublicDistributedCacheDir(); 
+    } else {
+      distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
+    }
+    Path localizedPath =
+      manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
+          fs.getFileStatus(cacheFile), false,
+          c.timestamp, new Path(TEST_ROOT_DIR), false);
+    assertTrue("Cache file didn't get localized in the expected directory. " +
+        "Expected localization to happen within " + 
+        ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
+        ", but was localized at " + 
+        localizedPath, localizedPath.toString().contains(distCacheDir));
+  }
+  
   /**
    * Check proper permissions on the cache files
    * 
@@ -342,6 +407,18 @@
     os.close();
     FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
   }
+  
+  static void createPublicTempFile(Path p) 
+  throws IOException, InterruptedException {
+    createTempFile(p);
+    FileUtil.chmod(p.toString(), "0777",true);
+  }
+  
+  static void createPrivateTempFile(Path p) 
+  throws IOException, InterruptedException {
+    createTempFile(p);
+    FileUtil.chmod(p.toString(), "0770",true);
+  }
 
   @Override
   protected void tearDown() throws IOException {
@@ -391,6 +468,7 @@
     Configuration subConf = new Configuration(myConf);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
     // ****** End of imitating JobClient code
 
     String userName = getJobOwnerName();
@@ -401,7 +479,8 @@
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
     handle.setup(localDirAllocator, workDir, TaskTracker
-        .getDistributedCacheDir(userName));
+        .getPrivateDistributedCacheDir(userName), 
+        TaskTracker.getPublicDistributedCacheDir());
     // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -422,7 +501,7 @@
     Throwable th = null;
     try {
       handle.setup(localDirAllocator, workDir, TaskTracker
-          .getDistributedCacheDir(userName));
+          .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
     } catch (IOException ie) {
       th = ie;
     }
@@ -436,11 +515,12 @@
     Configuration subConf2 = new Configuration(myConf);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
     TrackerDistributedCacheManager.determineTimestamps(subConf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
     
     handle =
       manager.newTaskDistributedCacheManager(subConf2);
     handle.setup(localDirAllocator, workDir, TaskTracker
-        .getDistributedCacheDir(userName));
+        .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
     Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
     assertNotNull(null, localCacheFiles2);
     assertEquals(1, localCacheFiles2.length);



Mime
View raw message