hadoop-mapreduce-commits mailing list archives

From yhema...@apache.org
Subject svn commit: r919268 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/map...
Date Fri, 05 Mar 2010 02:10:25 GMT
Author: yhemanth
Date: Fri Mar  5 02:10:24 2010
New Revision: 919268

URL: http://svn.apache.org/viewvc?rev=919268&view=rev
Log:
MAPREDUCE-1435. Fix symlink handling in task work directory when cleaning up, essentially
to avoid following links. Contributed by Ravi Gummadi.
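
The failure mode being fixed: a task's work directory contains symlinks into the shared distributed cache, and a cleanup that recurses through those links chmods and deletes the cache files themselves. A minimal symlink-safe contents deletion in plain Java, shown only to illustrate the intended semantics (an illustrative sketch, not the Hadoop implementation):

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class SymlinkSafeCleanup {
      // Empty a directory without following symlinks: a link is unlinked in
      // place, so whatever it points to (e.g. a dist cache file) survives.
      static void deleteContents(Path dir) throws IOException {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
          for (Path entry : entries) {
            if (Files.isSymbolicLink(entry)) {
              Files.delete(entry);          // removes the link, not the target
            } else if (Files.isDirectory(entry)) {
              deleteContents(entry);        // recurse only into real dirs
              Files.delete(entry);
            } else {
              Files.delete(entry);
            }
          }
        }
      }
    }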

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Mar  5 02:10:24 2010
@@ -412,6 +412,10 @@
     MAPREDUCE-1512. RAID uses HarFileSystem directly instead of
     FileSystem.get (Rodrigo Schmidt via dhruba)
 
+    MAPREDUCE-1435. Fix symlink handling in task work directory when
+    cleaning up, essentially to avoid following links.
+    (Ravi Gummadi via yhemanth)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Fri Mar  5 02:10:24 2010
@@ -344,11 +344,13 @@
       break;
     case FTS_SL:
       // A symbolic link
-      process_path = 1;
+      // We don't want to change ownership (or set permissions) on the file/dir
+      // pointed to by any symlink.
+      process_path = 0;
       break;
     case FTS_SLNONE:
       // A symbolic link with a nonexistent target
-      process_path = 1;
+      process_path = 0;
       break;
     case FTS_NS:
       // A  file for which no stat(2) information was available
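
In fts(3) terms, FTS_SL is a symbolic link and FTS_SLNONE a symlink whose target does not exist; with process_path forced to 0 for both, the recursive ownership/permission pass in the task-controller now leaves links untouched instead of chowning/chmodding whatever they point to. A hedged Java analogue of the same skip-symlinks rule (a hypothetical helper, not part of this patch):

    import java.io.IOException;
    import java.nio.file.FileVisitResult;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.SimpleFileVisitor;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.nio.file.attribute.PosixFilePermissions;

    public class SkipSymlinksOnChmod {
      static void restrictToOwner(Path root) throws IOException {
        // walkFileTree does not follow symlinks by default; a link is
        // reported to visitFile() with attrs.isSymbolicLink() == true,
        // much like fts(3) reporting FTS_SL/FTS_SLNONE.
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
          @Override
          public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
              throws IOException {
            if (attrs.isSymbolicLink()) {
              return FileVisitResult.CONTINUE;  // process_path = 0: skip it
            }
            Files.setPosixFilePermissions(file,
                PosixFilePermissions.fromString("rwx------"));
            return FileVisitResult.CONTINUE;
          }
        });
      }
    }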

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java Fri Mar  5 02:10:24 2010
@@ -19,13 +19,20 @@
 package org.apache.hadoop.streaming;
 
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Test Streaming with LinuxTaskController running the jobs as a user different
@@ -76,4 +83,95 @@
       }
     });
   }
+  
+  /**
+   * Verify that the permissions of distcache dir contents are valid once the
+   * job is finished.
+   */
+  public void testStreamingWithDistCache()
+  throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    final String[] localDirs = mrCluster.getTaskTrackerLocalDirs(0);
+    final JobConf myConf = getClusterConf();
+
+    // create file that will go into public distributed cache
+    File publicFile = new File(System.getProperty(
+        "test.build.data", "/tmp"), "publicFile");
+    FileOutputStream fstream = new FileOutputStream(publicFile);
+    fstream.write("public file contents".getBytes());
+    fstream.close();
+
+    // put the file (that should go into the public dist cache) in DFS and set
+    // read and execute permissions for others
+    FileSystem dfs = dfsCluster.getFileSystem();
+    dfs.setPermission(new Path(dfs.getDefaultUri(myConf).toString() + "/tmp"),
+        new FsPermission((short)0755));
+    final String publicCacheFile = dfs.getDefaultUri(myConf).toString()
+                             + "/tmp/publicFile";
+    dfs.copyFromLocalFile(new Path(publicFile.getAbsolutePath()),
+        new Path(publicCacheFile));
+    dfs.setPermission(new Path(publicCacheFile), new FsPermission((short)0755));
+    final String taskTrackerUser 
+      = UserGroupInformation.getCurrentUser().getShortUserName();
+    
+    taskControllerUser.doAs(new PrivilegedExceptionAction<Void>() {
+      public Void run() throws Exception{
+
+        FileSystem inFs = inputPath.getFileSystem(myConf);
+        FileSystem outFs = outputPath.getFileSystem(myConf);
+        outFs.delete(outputPath, true);
+        if (!inFs.mkdirs(inputPath)) {
+          throw new IOException("Mkdirs failed to create " + inFs.toString());
+        }
+
+        // create input file
+        DataOutputStream file = inFs.create(new Path(inputPath, "part-0"));
+        file.writeBytes(input);
+        file.close();
+
+        // Create file that will be passed using -files option.
+        // This is private dist cache file
+        File privateFile = new File(System.getProperty(
+            "test.build.data", "/tmp"), "test.sh");
+        privateFile.createNewFile();
+
+        String[] args =
+          new String[] {
+            "-files", privateFile.toString() + "," + publicCacheFile,
+            "-Dmapreduce.task.files.preserve.failedtasks=true",
+            "-Dstream.tmpdir=" + System.getProperty("test.build.data", "/tmp"),
+            "-input", inputPath.makeQualified(inFs).toString(),
+            "-output", outputPath.makeQualified(outFs).toString(),
+            "-mapper", "pwd",
+            "-reducer", StreamJob.REDUCE_NONE
+          };
+        StreamJob streamJob = new StreamJob();
+        streamJob.setConf(myConf);
+
+        assertTrue("Job failed", ToolRunner.run(streamJob, args)==0);
+
+        // validate private cache files' permissions
+        checkPermissionsOnPrivateDistCache(localDirs,
+            taskControllerUser.getShortUserName(), taskTrackerSpecialGroup);
+        
+        // check the file is present even after the job is over.
+        // work directory symlink cleanup should not have removed the target 
+        // files.
+        checkPresenceOfPrivateDistCacheFiles(localDirs, 
+            taskControllerUser.getShortUserName(), new String[] {"test.sh"});
+
+        // validate public cache files' permissions and ownership
+        checkPermissionsOnPublicDistCache(FileSystem.getLocal(myConf),
+            localDirs, taskTrackerUser, taskTrackerPrimaryGroup);
+
+        checkPresenceOfPublicDistCacheFiles(localDirs, 
+            new String[] {"publicFile"});
+        assertOwnerShip(outputPath);
+        return null;
+      }
+    });
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  5 02:10:24 2010
@@ -199,7 +199,7 @@
   void enableTaskForCleanup(PathDeletionContext context)
          throws IOException {
     try {
-      FileUtil.chmod(context.fullPath, "ug+rwx", true);
+      FileUtil.chmod(context.fullPath, "u+rwx", true);
     } catch(InterruptedException e) {
       LOG.warn("Interrupted while setting permissions for " + context.fullPath +
           " for deletion.");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  5 02:10:24 2010
@@ -689,39 +689,6 @@
   }
   
   /**
-   * Sets permissions recursively and then deletes the contents of dir.
-   * Makes dir empty directory(does not delete dir itself).
-   */
-  static void deleteDirContents(JobConf conf, File dir) throws IOException {
-    FileSystem fs = FileSystem.getLocal(conf);
-    if (fs.exists(new Path(dir.getAbsolutePath()))) {
-      File contents[] = dir.listFiles();
-      if (contents != null) {
-        for (int i = 0; i < contents.length; i++) {
-          try {
-            int ret = 0;
-            if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
-                                      "ug+rwx", true)) != 0) {
-              LOG.warn("Unable to chmod for " + contents[i] + 
-                  "; chmod exit status = " + ret);
-            }
-          } catch(InterruptedException e) {
-            LOG.warn("Interrupted while setting permissions for contents of " +
-                "workDir. Not deleting the remaining contents of workDir.");
-            return;
-          }
-          if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
-            LOG.warn("Unable to delete "+ contents[i]);
-          }
-        }
-      }
-    }
-    else {
-      LOG.warn(dir + " does not exist.");
-    }
-  }
-  
-  /**
    * Creates distributed cache symlinks and tmp directory, as appropriate.
    * Note that when we setup the distributed
    * cache, we didn't create the symlinks. This is done on a per task basis
@@ -735,10 +702,10 @@
       LOG.debug("Fully deleting contents of " + workDir);
     }
 
-    /** delete only the contents of workDir leaving the directory empty. We
+    /** deletes only the contents of workDir leaving the directory empty. We
      * can't delete the workDir as it is the current working directory.
      */
-    deleteDirContents(conf, workDir);
+    FileUtil.fullyDeleteContents(workDir);
     
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
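
FileUtil.fullyDeleteContents(File) from Hadoop Common empties the directory in place, leaving the directory itself intact (required here because workDir is the task's current working directory), and, in line with this fix, deletes a symlink itself rather than recursing through it. A minimal usage sketch with a hypothetical path:

    import java.io.File;
    import org.apache.hadoop.fs.FileUtil;

    public class WorkDirCleanupSketch {
      public static void main(String[] args) {
        File workDir = new File("/tmp/taskWorkDir");   // hypothetical
        // Deletes everything under workDir but not workDir itself;
        // returns false if some content could not be removed.
        if (!FileUtil.fullyDeleteContents(workDir)) {
          System.err.println("Could not fully delete contents of " + workDir);
        }
      }
    }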

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar  5 02:10:24 2010
@@ -22,6 +22,8 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,8 +31,10 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -135,6 +139,11 @@
   protected UserGroupInformation taskControllerUser;
   
   protected static String taskTrackerSpecialGroup = null;
+  /**
+   * Primary group of the tasktracker - i.e. the user running the
+   * test.
+   */
+  protected static String taskTrackerPrimaryGroup = null;
   static {
     if (isTaskExecPathPassed()) {
       try {
@@ -146,6 +155,13 @@
         LOG.warn("Could not get group of the binary", e);
         fail("Could not get group of the binary");
       }
+      try {
+        taskTrackerPrimaryGroup = 
+          UserGroupInformation.getCurrentUser().getGroupNames()[0];
+      } catch (IOException ioe) {
+        LOG.warn("Could not get primary group of the current user", ioe);
+        fail("Could not get primary group of the current user");
+      }
     }
   }
 
@@ -325,4 +341,160 @@
           .equals(taskControllerUser.getGroupNames()[0]));
     }
   }
+  
+  /**
+   * Validates permissions of private distcache dir and its contents fully
+   */
+  public static void checkPermissionsOnPrivateDistCache(String[] localDirs,
+      String user, String groupOwner) throws IOException {
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPrivateDistributedCacheDir(user));
+      if (distCacheDir.exists()) {
+        checkPermissionsOnDir(distCacheDir, user, groupOwner, "dr-xrws---",
+            "-r-xrwx---");
+      }
+    }
+  }
+ 
+  /**
+   * Check that files expected to be localized in distributed cache for a user
+   * are present.
+   * @param localDirs List of mapred local directories.
+   * @param user User against which localization is happening
+   * @param expectedFileNames List of files expected to be localized
+   * @throws IOException
+   */
+  public static void checkPresenceOfPrivateDistCacheFiles(String[] localDirs,
+      String user, String[] expectedFileNames) throws IOException {
+    FileGatherer gatherer = new FileGatherer();
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPrivateDistributedCacheDir(user));
+      findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
+    }
+    assertEquals("Files expected in private distributed cache were not found",
+        expectedFileNames.length, gatherer.getCount());
+  }
+
+  /**
+   * Validates permissions and ownership of public distcache dir and its 
+   * contents fully in all local dirs
+   */
+  public static void checkPermissionsOnPublicDistCache(FileSystem localFS,
+      String[] localDirs, String owner, String group) throws IOException {
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPublicDistributedCacheDir());
+
+      if (distCacheDir.exists()) {
+        checkPublicFilePermissions(localFS, distCacheDir, owner, group);
+      }
+    }
+  }
+
+  /**
+   * Checks that files expected to be localized in the public distributed
+   * cache are present
+   * @param localDirs List of mapred local directories
+   * @param expectedFileNames List of expected file names.
+   * @throws IOException
+   */
+  public static void checkPresenceOfPublicDistCacheFiles(String[] localDirs,
+      String[] expectedFileNames) throws IOException {
+    FileGatherer gatherer = new FileGatherer();
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPublicDistributedCacheDir());
+      findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
+    }
+    assertEquals("Files expected in public distributed cache were not found",
+        expectedFileNames.length, gatherer.getCount());
+  }
+  
+  /**
+   * Validates permissions and ownership on the public distributed cache files
+   */
+  private static void checkPublicFilePermissions(FileSystem localFS, File dir,
+      String owner, String group)
+      throws IOException {
+    Path dirPath = new Path(dir.getAbsolutePath());
+    TestTrackerDistributedCacheManager.checkPublicFilePermissions(localFS, 
+        new Path[] {dirPath});
+    TestTrackerDistributedCacheManager.checkPublicFileOwnership(localFS,
+        new Path[] {dirPath}, owner, group);
+    if (dir.isDirectory()) {
+      File[] files = dir.listFiles();
+      for (File file : files) {
+        checkPublicFilePermissions(localFS, file, owner, group);
+      }
+    }
+  }
+
+  /**
+   * Validates permissions of the given dir and its contents fully (i.e. recursively)
+   */
+  private static void checkPermissionsOnDir(File dir, String user,
+      String groupOwner, String expectedDirPermissions,
+      String expectedFilePermissions) throws IOException {
+    TestTaskTrackerLocalization.checkFilePermissions(dir.toString(),
+        expectedDirPermissions, user, groupOwner);
+    File[] files = dir.listFiles();
+    for (File file : files) {
+      if (file.isDirectory()) {
+        checkPermissionsOnDir(file, user, groupOwner, expectedDirPermissions,
+            expectedFilePermissions);
+      } else {
+        TestTaskTrackerLocalization.checkFilePermissions(file.toString(),
+            expectedFilePermissions, user, groupOwner);
+      }
+    }
+  }
+
+  // Check which files among those expected are present in the rootDir
+  // Add those present to the FileGatherer.
+  private static void findExpectedFiles(String[] expectedFileNames,
+      File rootDir, FileGatherer gatherer) {
+    
+    File[] files = rootDir.listFiles();
+    if (files == null) {
+      return;
+    }
+    for (File file : files) {
+      if (file.isDirectory()) {
+        findExpectedFiles(expectedFileNames, file, gatherer);
+      } else {
+        if (isFilePresent(expectedFileNames, file)) {
+          gatherer.addFileName(file.getName());
+        }
+      }
+    }
+    
+  }
+  
+  // Test if the passed file is present in the expected list of files.
+  private static boolean isFilePresent(String[] expectedFileNames, File file) {
+    boolean foundFileName = false;
+    for (String name : expectedFileNames) {
+      if (name.equals(file.getName())) {
+        foundFileName = true;
+        break;
+      }
+    }
+    return foundFileName;
+  }
+  
+  // Helper class to collect a list of file names across multiple
+  // method calls. Wrapper around a collection defined for clarity
+  private static class FileGatherer {
+    List<String> foundFileNames = new ArrayList<String>();
+    
+    void addFileName(String fileName) {
+      foundFileNames.add(fileName);
+    }
+    
+    int getCount() {
+      return foundFileNames.size();
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar  5 02:10:24 2010
@@ -276,6 +276,15 @@
     return (taskTrackerList.get(taskTracker)).getLocalDir();
   }
 
+  /**
+   * Get all the local directories for the Nth task tracker
+   * @param taskTracker the index of the task tracker to check
+   * @return array of local dirs
+   */
+  public String[] getTaskTrackerLocalDirs(int taskTracker) {
+    return (taskTrackerList.get(taskTracker)).getLocalDirs();
+  }
+
   public JobTrackerRunner getJobTrackerRunner() {
     return jobTracker;
   }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java Fri Mar  5 02:10:24 2010
@@ -23,65 +23,94 @@
 
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
 
+/**
+ * Verifies that TaskRunner.setupWorkDir() cleans up files/dirs pointed
+ * to by symlinks under the work dir.
+ */
 public class TestSetupWorkDir extends TestCase {
-  private static final Log LOG =
-    LogFactory.getLog(TestSetupWorkDir.class);
 
   /**
-   * Create a file in the given dir and set permissions r_xr_xr_x sothat no one
-   * can delete it directly(without doing chmod).
-   * Creates dir/subDir and dir/subDir/file
+   * Creates 1 subdirectory and 1 file under dir2. Creates 1 subdir, 1 file,
+   * 1 symlink to a dir and a symlink to a file under dir1.
+   * Creates dir1/subDir, dir1/file, dir2/subDir, dir2/file,
+   * dir1/symlinkSubDir->dir2/subDir, dir1/symlinkFile->dir2/file.
    */
-  static void createFileAndSetPermissions(JobConf jobConf, Path dir)
+  static void createSubDirsAndSymLinks(JobConf jobConf, Path dir1, Path dir2)
        throws IOException {
-    Path subDir = new Path(dir, "subDir");
     FileSystem fs = FileSystem.getLocal(jobConf);
+    createSubDirAndFile(fs, dir1);
+    createSubDirAndFile(fs, dir2);
+    // now create symlinks under dir1 that point to file/dir under dir2
+    FileUtil.symLink(dir2+"/subDir", dir1+"/symlinkSubDir");
+    FileUtil.symLink(dir2+"/file", dir1+"/symlinkFile");
+  }
+
+  static void createSubDirAndFile(FileSystem fs, Path dir) throws IOException {
+    Path subDir = new Path(dir, "subDir");
     fs.mkdirs(subDir);
-    Path p = new Path(subDir, "file");
+    Path p = new Path(dir, "file");
     DataOutputStream out = fs.create(p);
     out.writeBytes("dummy input");
-    out.close();
-    // no write permission for subDir and subDir/file
-    try {
-      int ret = 0;
-      if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
-        LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
-      }
-    } catch(InterruptedException e) {
-      LOG.warn("Interrupted while doing chmod for " + subDir);
+    out.close();
+  }
+
+  void createEmptyDir(FileSystem fs, Path dir) throws IOException {
+    if (fs.exists(dir)) {
+      fs.delete(dir, true);
+    }
+    if (!fs.mkdirs(dir)) {
+      throw new IOException("Unable to create directory " + dir);
     }
   }
 
   /**
-   * Validates if setupWorkDir is properly cleaning up contents of workDir.
+   * Validates if TaskRunner.setupWorkDir() is properly cleaning up the
+   * contents of workDir and creating tmp dir under it (even though workDir
+   * contains symlinks to files/directories).
    */
   public void testSetupWorkDir() throws IOException {
     Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
                             "testSetupWorkDir");
     Path myWorkDir = new Path(rootDir, "./work");
+    Path myTargetDir = new Path(rootDir, "./tmp");
     JobConf jConf = new JobConf();
     FileSystem fs = FileSystem.getLocal(jConf);
-    if (fs.exists(myWorkDir)) {
-      fs.delete(myWorkDir, true);
-    }
-    if (!fs.mkdirs(myWorkDir)) {
-      throw new IOException("Unable to create workDir " + myWorkDir);
-    }
+    createEmptyDir(fs, myWorkDir);
+    createEmptyDir(fs, myTargetDir);
+
+    // create subDirs and symlinks under work dir
+    createSubDirsAndSymLinks(jConf, myWorkDir, myTargetDir);
 
-    // create {myWorkDir}/subDir/file and set 555 perms for subDir and file
-    createFileAndSetPermissions(jConf, myWorkDir);
+    assertTrue("Did not create symlinks/files/dirs properly. Check "
+        + myWorkDir + " and " + myTargetDir,
+        (fs.listStatus(myWorkDir).length == 4) &&
+        (fs.listStatus(myTargetDir).length == 2));
+
+    // let us disable creation of symlinks in setupWorkDir()
+    jConf.set(JobContext.CACHE_SYMLINK, "no");
+
+    // Deletion of myWorkDir should not affect contents of myTargetDir.
+    // myTargetDir is like $user/jobcache/distcache
+    TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));
+
+    // Contents of myWorkDir should be cleaned up and a tmp dir should be
+    // created under myWorkDir
+    assertTrue(myWorkDir + " is not cleaned up properly.",
+        fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 1));
+
+    // Make sure that the dir under myWorkDir is tmp
+    assertTrue(fs.listStatus(myWorkDir)[0].getPath().toUri().getPath()
+               .toString().equals(myWorkDir.toString() + "/tmp"));
+
+    // Make sure that myTargetDir is not changed/deleted
+    assertTrue("Dir " + myTargetDir + " seem to be modified.",
+        fs.exists(myTargetDir) && (fs.listStatus(myTargetDir).length == 2));
 
-    TaskRunner.deleteDirContents(jConf, new File(myWorkDir.toUri().getPath()));
-    
-    assertTrue("Contents of " + myWorkDir + " are not cleaned up properly.",
-        fs.listStatus(myWorkDir).length == 0);
-    
     // cleanup
     fs.delete(rootDir, true);
   }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  5 02:10:24 2010
@@ -643,11 +643,36 @@
   }
 
   /**
+   * Create a file in the given dir and set permissions r_xr_xr_x so that no one
+   * can delete it directly (without doing chmod).
+   * Creates dir/subDir and dir/subDir/file
+   */
+  static void createFileAndSetPermissions(JobConf jobConf, Path dir)
+       throws IOException {
+    Path subDir = new Path(dir, "subDir");
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    fs.mkdirs(subDir);
+    Path p = new Path(subDir, "file");
+    java.io.DataOutputStream out = fs.create(p);
+    out.writeBytes("dummy input");
+    out.close();
+    // no write permission for subDir and subDir/file
+    try {
+      int ret = 0;
+      if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
+        LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
+      }
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted while doing chmod for " + subDir);
+    }
+  }
+
+  /**
   * Validates the removal of $taskid and $taskid/work under mapred-local-dir
    * in cases where those directories cannot be deleted without adding
    * write permission to the newly created directories under $taskid and
    * $taskid/work
-   * Also see TestSetupWorkDir.createFileAndSetPermissions for details
+   * Also see createFileAndSetPermissions for details
    */
   void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
                            TaskInProgress tip) throws IOException {
@@ -662,7 +687,7 @@
     Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
     for (Path p : paths) {
       if (tracker.getLocalFileSystem().exists(p)) {
-        TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p);
+        createFileAndSetPermissions(localizedJobConf, p);
       }
     }
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=919268&r1=919267&r2=919268&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Fri Mar  5 02:10:24 2010
@@ -374,16 +374,53 @@
    */
   private void checkPublicFilePermissions(Path[] localCacheFiles)
       throws IOException {
+    checkPublicFilePermissions(fs, localCacheFiles);
+  }
+
+  /**
+   * Verify the permissions for a file localized as a public distributed
+   * cache file
+   * @param fs The Local FileSystem used to get the permissions
+   * @param localCacheFiles The list of files whose permissions should be 
+   * verified.
+   * @throws IOException
+   */
+  public static void checkPublicFilePermissions(FileSystem fs, 
+      Path[] localCacheFiles) throws IOException {
     // All the files should have read and executable permissions for others
     for (Path p : localCacheFiles) {
       FsPermission perm = fs.getFileStatus(p).getPermission();
-      assertTrue("cache file is not readable by others", perm.getOtherAction()
-          .implies(FsAction.READ));
-      assertTrue("cache file is not executable by others", perm
-          .getOtherAction().implies(FsAction.EXECUTE));
+      assertTrue("cache file is not readable / executable by owner: perm="
+          + perm.getUserAction(), perm.getUserAction()
+          .implies(FsAction.READ_EXECUTE));
+      assertTrue("cache file is not readable / executable by group: perm="
+          + perm.getGroupAction(), perm.getGroupAction()
+          .implies(FsAction.READ_EXECUTE));
+      assertTrue("cache file is not readable / executable by others: perm="
+          + perm.getOtherAction(), perm.getOtherAction()
+          .implies(FsAction.READ_EXECUTE));
     }
   }
-
+  
+  /**
+   * Verify the ownership for files localized as a public distributed cache
+   * file.
+   * @param fs The Local FileSystem used to get the ownership
+   * @param localCacheFiles The list of files whose ownership should be
+   * verified
+   * @param owner The owner of the files
+   * @param group The group owner of the files.
+   * @throws IOException
+   */
+  public static void checkPublicFileOwnership(FileSystem fs,
+      Path[] localCacheFiles, String owner, String group)
+      throws IOException {
+    for (Path p: localCacheFiles) {
+      assertEquals(owner, fs.getFileStatus(p).getOwner());
+      assertEquals(group, fs.getFileStatus(p).getGroup());
+    }
+  }
+  
   protected String getJobOwnerName() throws IOException {
     return UserGroupInformation.getLoginUser().getUserName();
   }
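
The strengthened assertions in checkPublicFilePermissions lean on FsAction.implies(): an action implies READ_EXECUTE only when both the read and execute bits are set, so a 0755 file passes for owner, group, and others. A quick self-contained check, assuming the Hadoop jars are on the classpath:

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PermCheckSketch {
      public static void main(String[] args) {
        FsPermission perm = new FsPermission((short) 0755);  // rwxr-xr-x
        System.out.println(perm.getUserAction().implies(FsAction.READ_EXECUTE));  // true
        System.out.println(perm.getGroupAction().implies(FsAction.READ_EXECUTE)); // true
        System.out.println(perm.getOtherAction().implies(FsAction.READ_EXECUTE)); // true
      }
    }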


