hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r938779 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Wed, 28 Apr 2010 04:25:18 GMT
Author: vinodkv
Date: Wed Apr 28 04:25:17 2010
New Revision: 938779

URL: http://svn.apache.org/viewvc?rev=938779&view=rev
Log:
MAPREDUCE-1609. TaskTracker.localizeJob should not set permissions on job log directory recursively.
Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=938779&r1=938778&r2=938779&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Apr 28 04:25:17 2010
@@ -602,6 +602,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1728. Oracle timezone strings do not match Java.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-1609. TaskTracker.localizeJob should not set permissions on 
+    job log directory recursively. (Amareshwari Sriramadasu via vinodkv)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=938779&r1=938778&r2=938779&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Wed Apr 28 04:25:17 2010
@@ -387,21 +387,9 @@ static int secure_path(const char *path,
     if (!process_path) {
       continue;
     }
+    error_code = secure_single_path(entry->fts_path, uid, gid,
+      (dir ? dir_mode : file_mode), should_check_ownership);
 
-    if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
-      fprintf(LOGFILE,
-          "Invalid file path. %s not user/group owned by the tasktracker.\n",
-          entry->fts_path);
-      error_code = -1;
-    } else if (change_owner(entry->fts_path, uid, gid) != 0) {
-      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
-          entry->fts_path);
-      error_code = -3;
-    } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
-      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
-          entry->fts_path);
-      error_code = -3;
-    }
   }
   if (fts_close(tree) != 0) {
     fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
@@ -411,6 +399,28 @@ static int secure_path(const char *path,
 }
 
 /**
+ * Function to change ownership and permissions of the given path. 
+ * This call sets ownership and permissions just for the path, not recursive.  
+ */
+int secure_single_path(char *path, uid_t uid, gid_t gid,
+    mode_t perm, int should_check_ownership) {
+  int error_code = 0;
+  if (should_check_ownership && 
+      (check_ownership(path, uid, gid) != 0)) {
+    fprintf(LOGFILE,
+      "Invalid file path. %s not user/group owned by the tasktracker.\n", path);
+    error_code = -1;
+  } else if (change_owner(path, uid, gid) != 0) {
+    fprintf(LOGFILE, "couldn't change the ownership of %s\n", path);
+    error_code = -3;
+  } else if (change_mode(path, perm) != 0) {
+    fprintf(LOGFILE, "couldn't change the permissions of %s\n", path);
+    error_code = -3;
+  }
+  return error_code;
+}
+
+/**
  * Function to prepare the attempt directories for the task JVM.
  * This is done by changing the ownership of the attempt directory recursively
  * to the job owner. We do the following:
@@ -541,8 +551,10 @@ int prepare_job_logs(const char *log_dir
   }
 
   gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-  if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
-      permissions, S_ISGID | permissions, 1) != 0) {
+  // job log directory should not be set permissions recursively
+  // because, on tt restart/reinit, it would contain directories of earlier run
+  if (secure_single_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+      S_ISGID | permissions, 1) != 0) {
     fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
     free(job_log_dir);
     return -1;
@@ -615,8 +627,9 @@ int get_user_details(const char *user) {
 
 /*
  * Function to check if the TaskTracker actually owns the file.
-  */
-int check_ownership(char *path) {
+ * Or it has right ownership already. 
+ */
+int check_ownership(char *path, uid_t uid, gid_t gid) {
   struct stat filestat;
   if (stat(path, &filestat) != 0) {
     return UNABLE_TO_STAT_FILE;
@@ -624,8 +637,10 @@ int check_ownership(char *path) {
   // check user/group. User should be TaskTracker user, group can either be
   // TaskTracker's primary group or the special group to which binary's
   // permissions are set.
-  if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
-      != filestat.st_gid)) {
+  // Or it can be the user/group owned by uid and gid passed. 
+  if ((getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
+      != filestat.st_gid)) &&
+      ((uid != filestat.st_uid) || (gid != filestat.st_gid))) {
     return FILE_NOT_OWNED_BY_TASKTRACKER;
   }
   return 0;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=938779&r1=938778&r2=938779&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Apr 28 04:25:17 2010
@@ -286,6 +286,14 @@ public class TaskTracker 
 
   private MRAsyncDiskService asyncDiskService;
   
+  MRAsyncDiskService getAsyncDiskService() {
+    return asyncDiskService;
+  }
+
+  void setAsyncDiskService(MRAsyncDiskService asyncDiskService) {
+    this.asyncDiskService = asyncDiskService;
+  }
+
   /**
   * Handle to the specific instance of the {@link TaskController} class
   */
@@ -603,8 +611,8 @@ public class TaskTracker 
     // Check local disk, start async disk service, and clean up all 
     // local directories.
     checkLocalDirs(this.fConf.getLocalDirs());
-    asyncDiskService = new MRAsyncDiskService(fConf);
-    asyncDiskService.cleanupAllVolumes();
+    setAsyncDiskService(new MRAsyncDiskService(fConf));
+    getAsyncDiskService().cleanupAllVolumes();
 
     // Clear out state tables
     this.tasks.clear();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=938779&r1=938778&r2=938779&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Wed Apr 28 04:25:17 2010
@@ -59,27 +59,6 @@ public class TestLocalizationWithLinuxTa
 
     super.setUp();
 
-    taskController = new MyLinuxTaskController();
-    String path =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
-    String execPath = path + "/task-controller";
-    ((MyLinuxTaskController) taskController).setTaskControllerExe(execPath);
-    taskController.setConf(trackerFConf);
-    taskController.setup();
-
-    tracker.setTaskController(taskController);
-    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
-        taskController));
-
-    // Rewrite conf so as to reflect task's correct user name.
-    String ugi =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
-    JobConf jobConf = new JobConf(task.getConf());
-    String user = ugi.split(",")[0];
-    jobConf.setUser(user);
-    uploadJobConf(jobConf);
-    task.setConf(jobConf);
-    task.setUser(user);
     taskTrackerUserName = UserGroupInformation.getLoginUser()
                           .getShortUserName();
   }
@@ -96,6 +75,18 @@ public class TestLocalizationWithLinuxTa
     }
   }
 
+  protected TaskController createTaskController() {
+    return new MyLinuxTaskController();
+  }
+
+  protected UserGroupInformation getJobOwner() {
+    String ugi = System
+        .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    String[] splits = ugi.split(",");
+    return UserGroupInformation.createUserForTesting(splits[0],
+        new String[] { splits[1] });
+  }
+
   /** @InheritDoc */
   @Override
   public void testTaskControllerSetup() {
@@ -219,7 +210,8 @@ public class TestLocalizationWithLinuxTa
     // check the private permissions of various directories
     List<Path> dirs = new ArrayList<Path>();
     dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
-        .getUser(), jobId.toString(), taskId.toString()), trackerFConf));
+        .getUser(), jobId.toString(), taskId.toString(),
+        task.isTaskCleanupTask()), trackerFConf));
     dirs.add(attemptWorkDir);
     dirs.add(new Path(attemptWorkDir, "tmp"));
     dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=938779&r1=938778&r2=938779&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Wed Apr 28 04:25:17 2010
@@ -22,6 +22,8 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.LinkedHashMap;
+import java.util.TreeMap;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
@@ -39,12 +41,14 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 
@@ -85,6 +89,8 @@ public class TestTaskTrackerLocalization
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
   private TaskInProgress tip;
+  private JobConf jobConf;
+  private File jobConfFile;
 
   /**
    * Dummy method in this base class. Only derived classes will define this
@@ -123,13 +129,14 @@ public class TestTaskTrackerLocalization
     trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
 
     // Create the job configuration file. Same as trackerConf in this test.
-    Configuration jobConf = new Configuration(trackerFConf);
+    jobConf = new JobConf(trackerFConf);
     // Set job view ACLs in conf sothat validation of contents of jobACLsFile
     // can be done against this value. Have both users and groups
     String jobViewACLs = "user1,user2, group1,group2";
     jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
 
     jobConf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 0);
+    jobConf.setUser(getJobOwner().getShortUserName());
 
     Job job = new Job(jobConf);
     String jtIdentifier = "200907202331";
@@ -140,44 +147,68 @@ public class TestTaskTrackerLocalization
     uploadJobJar(job);
 
     // JobClient uploads the jobConf to the file system.
-    File jobConfFile = uploadJobConf(job.getConfiguration());
+    jobConfFile = uploadJobConf(job.getConfiguration());
     
     // create jobTokens file
     uploadJobTokensFile(); 
     
-        // Set up the TaskTracker
+    taskTrackerUGI = UserGroupInformation.getCurrentUser();
+    startTracker();
+
+    // Set up the task to be localized
+    taskId =
+        new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
+    createTask();
+
+    // mimic register task
+    // create the tip
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+  }
+
+  private void startTracker() throws IOException {
+    // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
-    tracker.setIndexCache(new IndexCache(trackerFConf));
     tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf));
+    initializeTracker();
+  }
+
+  private void initializeTracker() throws IOException {
+    tracker.setIndexCache(new IndexCache(trackerFConf));
     tracker.setTaskMemoryManagerEnabledFlag();
 
     // for test case system FS is the local FS
     tracker.systemFS = FileSystem.getLocal(trackerFConf);
     tracker.setLocalFileSystem(tracker.systemFS);
     tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
-    
-    taskTrackerUGI = UserGroupInformation.getCurrentUser();
 
-    // Set up the task to be localized
-    taskId =
-        new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
-    task =
-        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
-    task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
-    task.setUser(UserGroupInformation.getCurrentUser().getUserName());
+    tracker.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    tracker.runningJobs = new TreeMap<JobID, RunningJob>();
+    tracker.setAsyncDiskService(new MRAsyncDiskService(trackerFConf));
+    tracker.getAsyncDiskService().cleanupAllVolumes();
 
-    taskController = new DefaultTaskController();
+    // setup task controller
+    taskController = createTaskController();
     taskController.setConf(trackerFConf);
     taskController.setup();
-
     tracker.setTaskController(taskController);
     tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
         taskController));
+  }
 
-    // mimic register task
-    // create the tip
-    tip = tracker.new TaskInProgress(task, trackerFConf);
+  protected TaskController createTaskController() {
+    return new DefaultTaskController();
+  }
+
+  private void createTask()
+      throws IOException {
+    task = new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+    task.setConf(jobConf); // Set conf. Set user name in particular.
+    task.setUser(jobConf.getUser());
+  }
+
+  protected UserGroupInformation getJobOwner() throws IOException {
+    return UserGroupInformation.getCurrentUser();
   }
 
   /**
@@ -509,7 +540,12 @@ public class TestTaskTrackerLocalization
     }
     TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
     localizedJobConf = rjob.getJobConf();
+    initializeTask();
 
+    checkTaskLocalization();
+  }
+
+  private void initializeTask() throws IOException {
     tip.setJobConf(localizedJobConf);
 
     // ////////// The central method being tested
@@ -520,7 +556,7 @@ public class TestTaskTrackerLocalization
     for (String dir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
       File attemptDir =
           new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
-              .toString(), taskId.toString()));
+              .toString(), taskId.toString(), task.isTaskCleanupTask()));
       assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
           + " is not created!!", attemptDir.exists());
     }
@@ -534,6 +570,7 @@ public class TestTaskTrackerLocalization
         attemptWorkDir != null);
 
     TaskRunner runner = task.createRunner(tracker, tip);
+    tip.setTaskRunner(runner);
 
     // /////// Few more methods being tested
     runner.setupChildTaskConfiguration(lDirAlloc);
@@ -565,8 +602,6 @@ public class TestTaskTrackerLocalization
     // /////////// The method being tested
     taskController.initializeTask(taskContext);
     // ///////////
-
-    checkTaskLocalization();
   }
 
   protected void checkTaskLocalization()
@@ -576,13 +611,14 @@ public class TestTaskTrackerLocalization
         .getStrings(MRConfig.LOCAL_DIR)) {
       assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
           childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
-              .getUser(), jobId.toString(), taskId.toString(), false)));
+              .getUser(), jobId.toString(), taskId.toString(),
+              task.isTaskCleanupTask())));
     }
 
     // Make sure task task.getJobFile is changed and pointed correctly.
     assertTrue(task.getJobFile().endsWith(
         TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
-            .toString(), false)));
+            .toString(), task.isTaskCleanupTask())));
 
     // Make sure that the tmp directories are created
     assertTrue("tmp dir is not created in workDir "
@@ -745,38 +781,12 @@ public class TestTaskTrackerLocalization
   private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
       throws Exception {
     // Localize job and localize task.
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
     localizedJobConf = rjob.getJobConf();
     if (jvmReuse) {
       localizedJobConf.setNumTasksToExecutePerJvm(2);
     }
-    tip.setJobConf(localizedJobConf);
-    tip.localizeTask(task);
-    Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
-            task.getUser(), task.getJobID().toString(), task.getTaskID()
-                .toString(), task.isTaskCleanupTask()), trackerFConf);
-    TaskRunner runner = task.createRunner(tracker, tip);
-    tip.setTaskRunner(runner);
-    runner.setupChildTaskConfiguration(lDirAlloc);
-    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
-        localizedJobConf);
-    runner.prepareLogFiles(task.getTaskID());
-    Path localTaskFile =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
-            .getUser(), task.getJobID().toString(), task.getTaskID()
-            .toString(), task.isTaskCleanupTask()), trackerFConf);
-    JobConf localizedTaskConf = new JobConf(localTaskFile);
-    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
-    TaskControllerContext taskContext =
-        new TaskController.TaskControllerContext();
-    taskContext.env =
-        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
-            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
-    taskContext.task = task;
-    // /////////// The method being tested
-    taskController.initializeTask(taskContext);
+    initializeTask();
 
     // TODO: Let the task run and create files.
 
@@ -890,4 +900,87 @@ public class TestTaskTrackerLocalization
     assertFalse("Job " + task.getJobID() + " work dir exists after cleanup", 
                 jWorkDirExists);
   }
+  
+  /**
+   * Tests TaskTracker restart after the localization.
+   * 
+   * This tests the following steps:
+   * 
+   * Localize Job, initialize a task.
+   * Then restart the Tracker.
+   * launch a cleanup attempt for the task.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void testTrackerRestart() throws IOException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    initializeTask();
+    
+    // imitate tracker restart
+    startTracker();
+    
+    // create a task cleanup attempt
+    createTask();
+    task.setTaskCleanupTask();
+    // register task
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+
+    // localize the job again.
+    rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    checkJobLocalization();
+    
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+  }
+  
+  /**
+   * Tests TaskTracker re-init after the localization.
+   * 
+   * This tests the following steps:
+   * 
+   * Localize Job, initialize a task.
+   * Then reinit the Tracker.
+   * launch a cleanup attempt for the task.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void testTrackerReinit() throws IOException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    initializeTask();
+    
+    // imitate tracker reinit
+    initializeTracker();
+    
+    // create a task cleanup attempt
+    createTask();
+    task.setTaskCleanupTask();
+    // register task
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+
+    // localize the job again.
+    rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    checkJobLocalization();
+    
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+  }
+
 }



Mime
View raw message