hadoop-mapreduce-commits mailing list archives

From yhema...@apache.org
Subject svn commit: r803583 [3/3] - in /hadoop/mapreduce/trunk: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apa...
Date Wed, 12 Aug 2009 16:17:49 GMT
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Wed Aug 12 16:17:47 2009
@@ -46,7 +46,7 @@
  * <li>Make the built binary setuid executable</li>
  * <li>Execute following targets:
  * <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em>
- * -Dtaskcontroller-user=<em>user,group</em></code></li>
+ * -Dtaskcontroller-ugi=<em>user,group</em></code></li>
  * </ol>
  * 
  */
@@ -81,6 +81,9 @@
 
   private static final int NUMBER_OF_NODES = 1;
 
+  static final String TASKCONTROLLER_PATH = "taskcontroller-path";
+  static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
+
   private File configurationFile = null;
 
   private UserGroupInformation taskControllerUser;
@@ -97,18 +100,20 @@
         MyLinuxTaskController.class.getName());
     mrCluster =
         new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
-            .toString(), 1, null, null, conf);
+            .toString(), 4, null, null, conf);
 
     // Get the configured taskcontroller-path
-    String path = System.getProperty("taskcontroller-path");
-    createTaskControllerConf(path);
+    String path = System.getProperty(TASKCONTROLLER_PATH);
+    configurationFile =
+        createTaskControllerConf(path, mrCluster.getTaskTrackerRunner(0)
+            .getLocalDirs());
     String execPath = path + "/task-controller";
     TaskTracker tracker = mrCluster.getTaskTrackerRunner(0).tt;
    // TypeCasting the parent to our TaskController instance as we
    // know that it is the instance that should be present in the TT.
     ((MyLinuxTaskController) tracker.getTaskController())
         .setTaskControllerExe(execPath);
-    String ugi = System.getProperty("taskcontroller-user");
+    String ugi = System.getProperty(TASKCONTROLLER_UGI);
     clusterConf = mrCluster.createJobConf();
     String[] splits = ugi.split(",");
     taskControllerUser = new UnixUserGroupInformation(splits);
@@ -133,21 +138,39 @@
         taskControllerUser.getGroupNames()[0]);
   }
 
-  private void createTaskControllerConf(String path)
+  /**
+   * Create taskcontroller.cfg.
+   * 
+   * @param path Path to the taskcontroller binary.
+   * @param localDirs
+   * @return the created conf file
+   * @throws IOException
+   */
+  static File createTaskControllerConf(String path, String[] localDirs)
       throws IOException {
     File confDirectory = new File(path, "../conf");
     if (!confDirectory.exists()) {
       confDirectory.mkdirs();
     }
-    configurationFile = new File(confDirectory, "taskcontroller.cfg");
+    File configurationFile = new File(confDirectory, "taskcontroller.cfg");
     PrintWriter writer =
         new PrintWriter(new FileOutputStream(configurationFile));
 
-    writer.println(String.format("mapred.local.dir=%s", mrCluster
-        .getTaskTrackerLocalDir(0)));
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < localDirs.length; i++) {
+      sb.append(localDirs[i]);
+      if ((i + 1) != localDirs.length) {
+        sb.append(",");
+      }
+    }
+    writer.println(String.format("mapred.local.dir=%s", sb.toString()));
+
+    writer
+        .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
 
     writer.flush();
     writer.close();
+    return configurationFile;
   }
 
   /**
@@ -155,28 +178,35 @@
    * 
    * @return boolean
    */
-  protected boolean shouldRun() {
-    return isTaskExecPathPassed() && isUserPassed();
+  protected static boolean shouldRun() {
+    if (!isTaskExecPathPassed() || !isUserPassed()) {
+      LOG.info("Not running test.");
+      return false;
+    }
+    return true;
   }
 
-  private boolean isTaskExecPathPassed() {
-    String path = System.getProperty("taskcontroller-path");
+  private static boolean isTaskExecPathPassed() {
+    String path = System.getProperty(TASKCONTROLLER_PATH);
     if (path == null || path.isEmpty()
-        || path.equals("${taskcontroller-path}")) {
+        || path.equals("${" + TASKCONTROLLER_PATH + "}")) {
+      LOG.info("Invalid taskcontroller-path : " + path); 
       return false;
     }
     return true;
   }
 
-  private boolean isUserPassed() {
-    String ugi = System.getProperty("taskcontroller-user");
-    if (ugi != null && !(ugi.equals("${taskcontroller-user}"))
+  private static boolean isUserPassed() {
+    String ugi = System.getProperty(TASKCONTROLLER_UGI);
+    if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}"))
         && !ugi.isEmpty()) {
       if (ugi.indexOf(",") > 1) {
         return true;
       }
+      LOG.info("Invalid taskcontroller-ugi : " + ugi); 
       return false;
     }
+    LOG.info("Invalid taskcontroller-ugi : " + ugi);
     return false;
   }
 

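The property rename above also changes how these tests are launched. For reference, a typical invocation against a built task-controller would now look like the following (the binary path below is a placeholder, not taken from this commit):

    ant test -Dcompile.c++=true \
        -Dtaskcontroller-path=/path/to/built/task-controller/dir \
        -Dtaskcontroller-ugi=user,group

With the reworked createTaskControllerConf(), the generated taskcontroller.cfg now carries every configured local directory plus the log directory. Assuming two local dirs and a placeholder log location, the written file would look roughly like:

    mapred.local.dir=/tmp/mapred/local/0_0,/tmp/mapred/local/0_1
    hadoop.log.dir=/path/to/hadoop/logs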
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Wed Aug 12 16:17:47 2009
@@ -171,7 +171,7 @@
       StringBuffer localPath = new StringBuffer();
       for(int i=0; i < numDir; ++i) {
         File ttDir = new File(localDirBase, 
-                              Integer.toString(trackerId) + "_" + 0);
+                              Integer.toString(trackerId) + "_" + i);
         if (!ttDir.mkdirs()) {
           if (!ttDir.isDirectory()) {
             throw new IOException("Mkdirs failed to create " + ttDir);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java Wed Aug 12 16:17:47 2009
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -96,23 +97,20 @@
     });
     return files.length;
   }
-  
+
   private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
       throws IOException {
-    String[] localDirs = conf.getLocalDirs();
-    assertEquals(1, localDirs.length);
-    Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR +
-        "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId);    
-    Path attemptDir = new Path(jobCacheDir,
-        new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString());    
-    return new Path(attemptDir, "job.xml");
+    String taskid =
+        new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
+    return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+        TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
   }
 
   public void testIsolationRunOfMapTask() throws 
       IOException, InterruptedException, ClassNotFoundException {
     MiniMRCluster mr = null;
     try {
-      mr = new MiniMRCluster(1, "file:///", 1);
+      mr = new MiniMRCluster(1, "file:///", 4);
 
       // Run a job successfully; keep task files.
       JobConf conf = mr.createJobConf();
@@ -130,8 +128,10 @@
       // Retrieve successful job's configuration and 
       // run IsolationRunner against the map task.
       FileSystem localFs = FileSystem.getLocal(conf);
-      Path mapJobXml = getAttemptJobXml(conf, jobId,
-          TaskType.MAP).makeQualified(localFs);
+      Path mapJobXml =
+          getAttemptJobXml(
+              mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(), jobId,
+              TaskType.MAP).makeQualified(localFs);
       assertTrue(localFs.exists(mapJobXml));
       
       new IsolationRunner().run(new String[] {

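The TestIsolationRunner change above removes the assumption of a single mapred.local.dir (the old code asserted localDirs.length == 1 and hand-built the "0_0/taskTracker/jobcache/..." path) and instead resolves the task's job.xml through LocalDirAllocator. A minimal sketch of that lookup pattern, with conf, jobId and taskType assumed to come from the surrounding test, is:

    // Ask LocalDirAllocator to find the localized job.xml in whichever of the
    // configured mapred.local.dir entries the TaskTracker placed it.
    String taskid =
        new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
    Path jobXml = new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
        TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);

The test also switches to the TaskTracker's own JobConf for this lookup, so the allocator searches the local directories that the tracker actually uses.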
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Wed Aug 12 16:17:47 2009
@@ -20,8 +20,10 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Test a java-based mapred job with LinuxTaskController running the jobs as a
@@ -39,10 +41,32 @@
     startCluster();
     Path inDir = new Path("input");
     Path outDir = new Path("output");
-    RunningJob job =
-        UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir);
+
+    RunningJob job;
+
+    // Run a job with zero maps/reduces
+    job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
+    job.waitForCompletion();
+    assertTrue("Job failed", job.isSuccessful());
+    assertOwnerShip(outDir);
+
+    // Run a job with 1 map and zero reduces
+    job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
+    job.waitForCompletion();
     assertTrue("Job failed", job.isSuccessful());
     assertOwnerShip(outDir);
+
+    // Run a normal job with maps/reduces
+    job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
+    job.waitForCompletion();
+    assertTrue("Job failed", job.isSuccessful());
+    assertOwnerShip(outDir);
+
+    // Run a job with jvm reuse
+    JobConf myConf = getClusterConf();
+    myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+    String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+    assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
   }
   
   public void testEnvironment() throws IOException {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java Wed Aug 12 16:17:47 2009
@@ -351,15 +351,26 @@
     if (ProcessTree.isSetsidAvailable) {
       FileSystem fs = FileSystem.getLocal(conf);
 
-      if(fs.exists(scriptDir)){
+      if (fs.exists(scriptDir)) {
         fs.delete(scriptDir, true);
       }
-      // create shell script
-      Random rm = new Random();
+
+      // Create the directory and set open permissions so that the TT can
+      // access.
+      fs.mkdirs(scriptDir);
+      fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL,
+          FsAction.ALL));
+
+     // create shell script
+     Random rm = new Random();
       Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt()
         + ".sh");
       String shellScript = scriptPath.toString();
+
+      // Construct the script. Set umask to 0000 so that TT can access all the
+      // files.
       String script =
+        "umask 000\n" + 
         "echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" +
         "echo hello\n" +
         "trap 'echo got SIGTERM' 15 \n" +
@@ -374,7 +385,10 @@
       file.writeBytes(script);
       file.close();
 
-      LOG.info("Calling script from map task of failjob : " + shellScript);
+      // Set executable permissions on the script.
+      new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+      LOG.info("Calling script from map task : " + shellScript);
       Runtime.getRuntime()
           .exec(shellScript + " " + numLevelsOfSubProcesses);
     

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=803583&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Wed Aug 12 16:17:47 2009
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker when {@link LinuxTaskController} is used.
+ * 
+ */
+public class TestLocalizationWithLinuxTaskController extends
+    TestTaskTrackerLocalization {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
+
+  private File configFile;
+  private MyLinuxTaskController taskController;
+
+  @Override
+  protected void setUp()
+      throws Exception {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    super.setUp();
+
+    taskController = new MyLinuxTaskController();
+    String path =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+        ClusterWithLinuxTaskController.createTaskControllerConf(path,
+            localDirs);
+    String execPath = path + "/task-controller";
+    taskController.setTaskControllerExe(execPath);
+    taskController.setConf(trackerFConf);
+    taskController.setup();
+  }
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.tearDown();
+    if (configFile != null) {
+      configFile.delete();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void testTaskControllerSetup() {
+    // Do nothing.
+  }
+
+  /**
+   * Test job localization with {@link LinuxTaskController}. Also check the
+   * permissions and file ownership of the job related files.
+   */
+  @Override
+  public void testJobLocalization()
+      throws IOException,
+      LoginException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    // Do job localization
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    localizedJobConf.setUser(ugi.split(",")[0]);
+
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext context = new JobInitializationContext();
+    context.jobid = jobId;
+    context.user = localizedJobConf.getUser();
+    context.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+    // /////////// The method being tested
+    taskController.initializeJob(context);
+    // ///////////
+
+    UserGroupInformation taskTrackerugi =
+        UserGroupInformation.login(localizedJobConf);
+    for (String localDir : trackerFConf.getStrings("mapred.local.dir")) {
+      File jobDir =
+          new File(localDir, TaskTracker.getLocalJobDir(jobId.toString()));
+      // check the private permissions on the job directory
+      checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+
+    // check the private permissions of various directories
+    List<Path> dirs = new ArrayList<Path>();
+    Path jarsDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId
+            .toString()), trackerFConf);
+    dirs.add(jarsDir);
+    dirs.add(new Path(jarsDir, "lib"));
+    for (Path dir : dirs) {
+      checkFilePermissions(dir.toUri().getPath(), "dr-xrws---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+
+    // job-work dir needs user writable permissions
+    Path jobWorkDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
+            .toString()), trackerFConf);
+    checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---",
+        localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+
+    // check the private permissions of various files
+    List<Path> files = new ArrayList<Path>();
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker
+        .getLocalJobConfFile(jobId.toString()), trackerFConf));
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+        .toString()), trackerFConf));
+    files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
+    files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
+    for (Path file : files) {
+      checkFilePermissions(file.toUri().getPath(), "-r-xrwx---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+  }
+
+  /**
+   * Test task localization with {@link LinuxTaskController}. Also check the
+   * permissions and file ownership of task related files.
+   */
+  @Override
+  public void testTaskLocalization()
+      throws IOException,
+      LoginException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    localizedJobConf.setUser(ugi.split(",")[0]);
+
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext jobContext = new JobInitializationContext();
+    jobContext.jobid = jobId;
+    jobContext.user = localizedJobConf.getUser();
+    jobContext.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+    taskController.initializeJob(jobContext);
+
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    tip.setJobConf(localizedJobConf);
+
+    // localize the task.
+    tip.localizeTask(task);
+    TaskRunner runner = task.createRunner(tracker, tip);
+    runner.setupChildTaskConfiguration(lDirAlloc);
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+            .getJobID().toString(), task.getTaskID().toString(), task
+            .isTaskCleanupTask()), trackerFConf);
+    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+        localizedJobConf);
+    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+
+    // Initialize task
+    TaskControllerContext taskContext =
+        new TaskController.TaskControllerContext();
+    taskContext.env =
+        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+    taskContext.task = task;
+    // /////////// The method being tested
+    taskController.initializeTask(taskContext);
+    // ///////////
+
+    // check the private permissions of various directories
+    List<Path> dirs = new ArrayList<Path>();
+    dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId
+        .toString(), taskId.toString()), trackerFConf));
+    dirs.add(workDir);
+    dirs.add(new Path(workDir, "tmp"));
+    dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath()));
+    UserGroupInformation taskTrackerugi =
+        UserGroupInformation.login(localizedJobConf);
+    for (Path dir : dirs) {
+      checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+
+    // check the private permissions of various files
+    List<Path> files = new ArrayList<Path>();
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+        .getJobID().toString(), task.getTaskID().toString(), task
+        .isTaskCleanupTask()), trackerFConf));
+    for (Path file : files) {
+      checkFilePermissions(file.toUri().getPath(), "-rwxrwx---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Wed Aug 12 16:17:47 2009
@@ -272,14 +272,12 @@
   private static class MyReduce extends IdentityReducer {
     private JobConf conf;
     private boolean compressInput;
-    private TaskAttemptID taskId;
     private boolean first = true;
       
     @Override
     public void configure(JobConf conf) {
       this.conf = conf;
       compressInput = conf.getCompressMapOutput();
-      taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
     }
       
     public void reduce(WritableComparable key, Iterator values,
@@ -287,9 +285,9 @@
                        ) throws IOException {
       if (first) {
         first = false;
-        MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
+        MapOutputFile mapOutputFile = new MapOutputFile();
         mapOutputFile.setConf(conf);
-        Path input = mapOutputFile.getInputFile(0, taskId);
+        Path input = mapOutputFile.getInputFile(0);
         FileSystem fs = FileSystem.get(conf);
         assertTrue("reduce input exists " + input, fs.exists(input));
         SequenceFile.Reader rdr = 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Wed Aug 12 16:17:47 2009
@@ -135,7 +135,7 @@
       int numNotDel = 0;
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
       LOG.debug("Tracker directory: " + localDir);
-      File trackerDir = new File(localDir, "taskTracker");
+      File trackerDir = new File(localDir, TaskTracker.SUBDIR);
       assertTrue("local dir " + localDir + " does not exist.", 
                  localDir.isDirectory());
       assertTrue("task tracker dir " + trackerDir + " does not exist.", 
@@ -150,7 +150,7 @@
       }
       for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
         String name = contents[fileIdx];
-        if (!("taskTracker".equals(contents[fileIdx]))) {
+        if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
           LOG.debug("Looking at " + name);
           assertTrue("Spurious directory " + name + " found in " +
                      localDir, false);
@@ -158,7 +158,7 @@
       }
       for (int idx = 0; idx < neededDirs.size(); ++idx) {
         String name = neededDirs.get(idx);
-        if (new File(new File(new File(trackerDir, "jobcache"),
+        if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
                               jobIds[idx]), name).isDirectory()) {
           found[idx] = true;
           numNotDel++;

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=803583&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Wed Aug 12 16:17:47 2009
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
+import junit.framework.TestCase;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker.
+ * 
+ */
+public class TestTaskTrackerLocalization extends TestCase {
+
+  private File TEST_ROOT_DIR;
+  private File ROOT_MAPRED_LOCAL_DIR;
+  private File HADOOP_LOG_DIR;
+
+  private int numLocalDirs = 6;
+  private static final Log LOG =
+      LogFactory.getLog(TestTaskTrackerLocalization.class);
+
+  protected TaskTracker tracker;
+  protected JobConf trackerFConf;
+  protected JobID jobId;
+  protected TaskAttemptID taskId;
+  protected Task task;
+  protected String[] localDirs;
+  protected static LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator("mapred.local.dir");
+
+  @Override
+  protected void setUp()
+      throws Exception {
+    TEST_ROOT_DIR =
+        new File(System.getProperty("test.build.data", "/tmp"),
+            "testTaskTrackerLocalization");
+    if (!TEST_ROOT_DIR.exists()) {
+      TEST_ROOT_DIR.mkdirs();
+    }
+
+    ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
+    ROOT_MAPRED_LOCAL_DIR.mkdirs();
+
+    HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs");
+    HADOOP_LOG_DIR.mkdir();
+    System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());
+
+    trackerFConf = new JobConf();
+    trackerFConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    localDirs = new String[numLocalDirs];
+    for (int i = 0; i < numLocalDirs; i++) {
+      localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
+    }
+    trackerFConf.setStrings("mapred.local.dir", localDirs);
+
+    // Create the job jar file
+    File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+    JarOutputStream jstream =
+        new JarOutputStream(new FileOutputStream(jobJarFile));
+    ZipEntry ze = new ZipEntry("lib/lib1.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    ze = new ZipEntry("lib/lib2.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    jstream.finish();
+    jstream.close();
+    trackerFConf.setJar(jobJarFile.toURI().toString());
+
+    // Create the job configuration file
+    File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+    FileOutputStream out = new FileOutputStream(jobConfFile);
+    trackerFConf.writeXml(out);
+    out.close();
+
+    // Set up the TaskTracker
+    tracker = new TaskTracker();
+    tracker.setConf(trackerFConf);
+    tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+    // Set up the task to be localized
+    String jtIdentifier = "200907202331";
+    jobId = new JobID(jtIdentifier, 1);
+    taskId =
+        new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
+    task =
+        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(trackerFConf);
+    taskController.setup();
+  }
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    FileUtil.fullyDelete(TEST_ROOT_DIR);
+  }
+
+  private static String[] getFilePermissionAttrs(String path)
+      throws IOException {
+    String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G");
+    return output.split(":|\n");
+  }
+
+  static void checkFilePermissions(String path, String expectedPermissions,
+      String expectedOwnerUser, String expectedOwnerGroup)
+      throws IOException {
+    String[] attrs = getFilePermissionAttrs(path);
+    assertTrue("File attrs length is not 3 but " + attrs.length,
+        attrs.length == 3);
+    assertTrue("Path " + path + " has the permissions " + attrs[0]
+        + " instead of the expected " + expectedPermissions, attrs[0]
+        .equals(expectedPermissions));
+    assertTrue("Path " + path + " is not owned by user "
+        + expectedOwnerUser + " but by " + attrs[1], attrs[1]
+        .equals(expectedOwnerUser));
+    assertTrue("Path " + path + " is not owned by group "
+        + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
+        .equals(expectedOwnerGroup));
+  }
+
+  /**
+   * Verify the task-controller's setup functionality
+   * 
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testTaskControllerSetup()
+      throws IOException,
+      LoginException {
+    // Task-controller is already set up in the test's setup method. Now verify.
+    UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
+    for (String localDir : localDirs) {
+
+      // Verify the local-dir itself.
+      File lDir = new File(localDir);
+      assertTrue("localDir " + lDir + " doesn't exist!", lDir.exists());
+      checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
+          .getUserName(), ugi.getGroupNames()[0]);
+
+      // Verify the distributed cache dir.
+      File distributedCacheDir =
+          new File(localDir, TaskTracker.getDistributedCacheDir());
+      assertTrue("distributed cache dir " + distributedCacheDir
+          + " doesn't exist!", distributedCacheDir.exists());
+      checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+          "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
+
+      // Verify the job cache dir.
+      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+      assertTrue("jobCacheDir " + jobCacheDir + " doesn't exist!",
+          jobCacheDir.exists());
+      checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
+          .getUserName(), ugi.getGroupNames()[0]);
+    }
+
+    // Verify the permissions on the userlogs dir
+    File taskLog = TaskLog.getUserLogDir();
+    checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
+        .getUserName(), ugi.getGroupNames()[0]);
+  }
+
+  /**
+   * Test job localization on a TT. Tests localization of job.xml, job.jar and
+   * corresponding setting of configuration.
+   * 
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testJobLocalization()
+      throws IOException,
+      LoginException {
+
+    // /////////// The main method being tested
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    // ///////////
+
+    // Check the directory structure
+    for (String dir : localDirs) {
+
+      File localDir = new File(dir);
+      assertTrue("mapred.local.dir " + localDir + " isn't created!",
+          localDir.exists());
+
+      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      assertTrue("taskTracker sub-dir in the local-dir " + localDir
+          + " is not created!", taskTrackerSubDir.exists());
+
+      File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
+      assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
+          + " isn't created!", jobCache.exists());
+
+      File jobDir = new File(jobCache, jobId.toString());
+      assertTrue("job-dir in " + jobCache + " isn't created!", jobDir
+          .exists());
+
+      // check the private permissions on the job directory
+      UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+      checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
+          .getUserName(), ugi.getGroupNames()[0]);
+    }
+
+    // check the localization of job.xml
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
+    assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
+        .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
+            trackerFConf) != null);
+
+    // check the localization of job.jar
+    Path jarFileLocalized =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+            .toString()), trackerFConf);
+    assertTrue("job.jar is not localized on this TaskTracker!!",
+        jarFileLocalized != null);
+    assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
+        jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar")
+        .exists());
+    assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File(
+        jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar")
+        .exists());
+
+    // check the creation of job work directory
+    assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
+        .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
+            trackerFConf) != null);
+
+    // Check the setting of job.local.dir and job.jar which will eventually be
+    // used by the user's task
+    boolean jobLocalDirFlag = false, mapredJarFlag = false;
+    String localizedJobLocalDir =
+        localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
+    String localizedJobJar = localizedJobConf.getJar();
+    for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
+      if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
+          + TaskTracker.getJobWorkDir(jobId.toString()))) {
+        jobLocalDirFlag = true;
+      }
+      if (localizedJobJar.equals(localDir + Path.SEPARATOR
+          + TaskTracker.getJobJarFile(jobId.toString()))) {
+        mapredJarFlag = true;
+      }
+    }
+    assertTrue(TaskTracker.JOB_LOCAL_DIR
+        + " is not set properly to the target user's directory : "
+        + localizedJobLocalDir, jobLocalDirFlag);
+    assertTrue(
+        "mapred.jar is not set properly to the target user's directory : "
+            + localizedJobJar, mapredJarFlag);
+  }
+
+  /**
+   * Test task localization on a TT.
+   * 
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testTaskLocalization()
+      throws IOException,
+      LoginException {
+
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    tip.setJobConf(localizedJobConf);
+
+    // ////////// The central method being tested
+    tip.localizeTask(task);
+    // //////////
+
+    // check the functionality of localizeTask
+    for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
+      assertTrue("attempt-dir in localDir " + dir + " is not created!!",
+          new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
+              .toString())).exists());
+    }
+
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+            .getJobID().toString(), task.getTaskID().toString(), task
+            .isTaskCleanupTask()), trackerFConf);
+    assertTrue("attempt work dir for " + taskId.toString()
+        + " is not created in any of the configured dirs!!", workDir != null);
+
+    TaskRunner runner = task.createRunner(tracker, tip);
+
+    // /////// Few more methods being tested
+    runner.setupChildTaskConfiguration(lDirAlloc);
+    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+        localizedJobConf);
+    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+    // ///////
+
+    // Make sure the task-conf file is created
+    Path localTaskFile =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+            .getJobID().toString(), task.getTaskID().toString(), task
+            .isTaskCleanupTask()), trackerFConf);
+    assertTrue("Task conf file " + localTaskFile.toString()
+        + " is not created!!", new File(localTaskFile.toUri().getPath())
+        .exists());
+
+    // /////// One more method being tested. This happens in child space.
+    JobConf localizedTaskConf = new JobConf(localTaskFile);
+    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+    // ///////
+
+    // Make sure that the mapred.local.dir is sandboxed
+    for (String childMapredLocalDir : localizedTaskConf
+        .getStrings("mapred.local.dir")) {
+      assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
+          childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
+              .toString(), taskId.toString(), false)));
+    }
+
+    // Make sure that task.getJobFile is changed and points correctly.
+    assertTrue(task.getJobFile().endsWith(
+        TaskTracker
+            .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+
+    // Make sure that the tmp directories are created
+    assertTrue("tmp dir is not created in workDir "
+        + workDir.toUri().getPath(),
+        new File(workDir.toUri().getPath(), "tmp").exists());
+
+    // Make sure that the logs are set up properly
+    File logDir =
+        new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
+            + task.getTaskID().toString());
+    assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
+        logDir.exists());
+    UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+    checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
+        .getUserName(), ugi.getGroupNames()[0]);
+
+    File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
+    assertTrue("stdout log file is improper. Expected : "
+        + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
+        expectedStdout.toString().equals(logFiles[0].toString()));
+    File expectedStderr =
+        new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
+    assertTrue("stderr log file is improper. Expected : "
+        + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
+        expectedStderr.toString().equals(logFiles[1].toString()));
+  }
+}


