hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1346206 [2/3] - in /hadoop/common/branches/branch-0.22/mapreduce: ./ src/c++/task-controller/ src/c++/task-controller/tests/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/map...
Date Tue, 05 Jun 2012 01:02:17 GMT
Added: hadoop/common/branches/branch-0.22/mapreduce/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/c++/task-controller/tests/test-task-controller.c (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/c++/task-controller/tests/test-task-controller.c Tue Jun  5 01:02:16 2012
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../task-controller.h"
+
+#define HADOOP_CONF_DIR "/tmp"
+
+/**
+ * Writes the single-line test configuration (four comma-separated
+ * mapreduce.cluster.local.dir entries) to file_name, truncating any
+ * existing file.
+ *
+ * @param file_name path of the file to create
+ * @return 0 on success; EXIT_FAILURE if the file cannot be opened, fully
+ *         written, or closed
+ */
+int write_config_file(char *file_name) {
+  char const *str =
+      "mapreduce.cluster.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+  size_t len = strlen(str);
+
+  FILE *file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+
+  // A short write or a failed flush-on-close would leave a truncated config
+  // file behind, so both results are checked before reporting success.
+  size_t written = fwrite(str, 1, len, file);
+  int close_rc = fclose(file);
+  if (written != len || close_rc != 0) {
+    printf("Failed to write %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  return 0;
+}
+
+/**
+ * Exercises get_value(), get_values() and check_variable_against_config()
+ * against a throw-away taskcontroller.cfg written under a freshly created
+ * temporary directory.
+ *
+ * Failures are reported on stdout. Every path, successful or not, ends in
+ * the cleanup section, which removes only what this function created and
+ * frees only what this function allocated.
+ */
+void test_check_variable_against_config() {
+
+  // A temporary configuration directory
+  char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX";
+
+  // Lengths of the two directory prefixes inside the path buffer. Cleanup
+  // truncates the buffer back to these lengths to name the directories.
+  size_t root_len = strlen(conf_dir_templ);
+  size_t conf_len = root_len + strlen("/conf");
+
+  // To accommodate "/conf/taskcontroller.cfg" plus the terminating NUL
+  char template[root_len + strlen("/conf/taskcontroller.cfg") + 1];
+
+  // Everything used by cleanup is declared and initialised before the first
+  // goto, so no jump can reach cleanup with an indeterminate value.
+  char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
+      "/tmp/testing3", "/tmp/testing4" };
+  int expected_count = (int) (sizeof(config_values) / sizeof(config_values[0]));
+  char *conf_dir_copy = NULL; // the string this test stores in hadoop_conf_dir
+  char *value = NULL;
+  char **values = NULL;
+  char **values_ptr = NULL;
+  int i = 0;
+  // 0: nothing created, 1: root dir, 2: conf dir, 3: config file attempted
+  int created = 0;
+
+  strcpy(template, conf_dir_templ);
+  if (mkdtemp(template) == NULL) {
+    printf("Couldn't create a temporary dir for conf.\n");
+    goto cleanup;
+  }
+  created = 1;
+
+  // Set the configuration directory
+  conf_dir_copy = strdup(template);
+  if (conf_dir_copy == NULL) {
+    printf("Couldn't create a temporary dir for conf.\n");
+    goto cleanup;
+  }
+  hadoop_conf_dir = conf_dir_copy;
+
+  // create the configuration directory
+  strcat(template, "/conf");
+  if (mkdir(template, S_IRWXU) != 0) {
+    printf("Couldn't create the conf directory.\n");
+    goto cleanup;
+  }
+  created = 2;
+
+  // create the configuration file; from here on the buffer holds its path,
+  // and a failed write may still have left a file behind
+  strcat(template, "/taskcontroller.cfg");
+  created = 3;
+  if (write_config_file(template) != 0) {
+    printf("Couldn't write the configuration file.\n");
+    goto cleanup;
+  }
+
+  // Test obtaining a value for a key from the config
+  value = (char *) get_value("mapreduce.cluster.local.dir");
+  if (value == NULL || strcmp(value,
+      "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4") != 0) {
+    printf("Obtaining a value for a key from the config failed.\n");
+    goto cleanup;
+  }
+
+  // Test the parsing of a multiple valued key from the config
+  values = (char **) get_values("mapreduce.cluster.local.dir");
+  if (values == NULL) {
+    printf("Configured values are not read out properly. Test failed!");
+    goto cleanup;
+  }
+  for (values_ptr = values; *values_ptr != NULL; values_ptr++) {
+    printf(" value : %s\n", *values_ptr);
+    // The bound check keeps an over-long result from indexing past the
+    // expectation table.
+    if (i >= expected_count || strcmp(*values_ptr, config_values[i]) != 0) {
+      printf("Configured values are not read out properly. Test failed!");
+      goto cleanup;
+    }
+    i++;
+  }
+  if (i != expected_count) {
+    printf("Configured values are not read out properly. Test failed!");
+    goto cleanup;
+  }
+
+  if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing5") == 0) {
+    printf("Configuration should not contain /tmp/testing5! \n");
+    goto cleanup;
+  }
+
+  if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing4") != 0) {
+    printf("Configuration should contain /tmp/testing4! \n");
+    goto cleanup;
+  }
+
+  cleanup:
+  // NOTE(review): assumes get_value()/get_values() hand ownership of their
+  // results to the caller, as the original test did - confirm against
+  // configuration.c.
+  free(value);
+  free(values);
+
+  // Remove the file system entries innermost first, while the path buffer is
+  // still intact.
+  if (created >= 3) {
+    unlink(template);
+  }
+  if (created >= 2) {
+    template[conf_len] = '\0';
+    rmdir(template);
+  }
+  if (created >= 1) {
+    template[root_len] = '\0';
+    rmdir(template);
+  }
+
+  // Release the configuration directory string last and do not leave the
+  // global pointing at freed memory.
+  if (conf_dir_copy != NULL) {
+    if (hadoop_conf_dir == conf_dir_copy) {
+      hadoop_conf_dir = NULL;
+    }
+    free(conf_dir_copy);
+  }
+}
+
+// Expects get_user_directory("/tmp", "user") to yield
+// "/tmp/taskTracker/user"; the result is freed before asserting.
+void test_get_user_directory() {
+  const char *expected = "/tmp/taskTracker/user";
+  char *actual = (char *) get_user_directory("/tmp", "user");
+  printf("user_dir obtained is %s\n", actual);
+  int ret = (strcmp(actual, expected) == 0) ? 0 : -1;
+  free(actual);
+  assert(ret == 0);
+}
+
+// Expects get_job_directory() to place the job under
+// <root>/taskTracker/<user>/jobcache/<jobid>; the result is freed before
+// asserting.
+void test_get_job_directory() {
+  const char *expected =
+      "/tmp/taskTracker/user/jobcache/job_200906101234_0001";
+  char *actual = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  printf("job_dir obtained is %s\n", actual);
+  int ret = (strcmp(actual, expected) == 0) ? 0 : -1;
+  free(actual);
+  assert(ret == 0);
+}
+
+// Expects get_attempt_directory() to append the attempt id to the job
+// directory obtained from get_job_directory(); both strings are freed before
+// asserting.
+void test_get_attempt_directory() {
+  const char *expected =
+      "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906101234_0001_m_000000_0";
+
+  char *parent = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  printf("job_dir obtained is %s\n", parent);
+
+  char *actual = (char *) get_attempt_directory(parent,
+      "attempt_200906101234_0001_m_000000_0");
+  printf("attempt_dir obtained is %s\n", actual);
+
+  int ret = (strcmp(actual, expected) == 0) ? 0 : -1;
+  free(parent);
+  free(actual);
+  assert(ret == 0);
+}
+
+// Expects get_task_launcher_file() to yield
+// <job dir>/<attempt id>/taskjvm.sh for the job directory obtained from
+// get_job_directory(). Both allocated strings are released before asserting,
+// so a failing assertion does not leak them.
+void test_get_task_launcher_file() {
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  char *task_file = (char *) get_task_launcher_file(job_dir,
+      "attempt_200906112028_0001_m_000000_0");
+  printf("task_file obtained is %s\n", task_file);
+  int ret = 0;
+  if (strcmp(
+      task_file,
+      "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+      != 0) {
+    ret = -1;
+  }
+  // job_dir was previously never released, unlike in the sibling
+  // test_get_attempt_directory().
+  free(job_dir);
+  free(task_file);
+  assert(ret == 0);
+}
+
+// Expects get_job_log_dir() to yield <log root>/userlogs/<jobid>; the result
+// is freed before asserting.
+void test_get_job_log_dir() {
+  const char *expected = "/tmp/testing/userlogs/job_200906101234_0001";
+  char *actual = (char *) get_job_log_dir("/tmp/testing",
+    "job_200906101234_0001");
+  printf("logdir obtained is %s\n", actual);
+  int ret = (strcmp(actual, expected) == 0) ? 0 : -1;
+  free(actual);
+  assert(ret == 0);
+}
+
+// Expects get_job_acls_file() to append "/job-acls.xml" to the given job log
+// directory; the result is freed before asserting.
+void test_get_job_acls_file() {
+  const char *expected =
+    "/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml";
+  char *actual = (char *) get_job_acls_file(
+    "/tmp/testing/userlogs/job_200906101234_0001");
+  printf("job acls file obtained is %s\n", actual);
+  int ret = (strcmp(actual, expected) == 0) ? 0 : -1;
+  free(actual);
+  assert(ret == 0);
+}
+
+// Expects get_task_log_dir() to yield <log root>/userlogs/<jobid>/<attempt
+// id>; the result is freed before asserting.
+void test_get_task_log_dir() {
+  const char *expected =
+      "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0";
+  char *actual = (char *) get_task_log_dir("/tmp/testing",
+    "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
+  printf("logdir obtained is %s\n", actual);
+  int ret = (strcmp(actual, expected) == 0) ? 0 : -1;
+  free(actual);
+  assert(ret == 0);
+}
+
+// Runs every test in a fixed order, printing a banner before each one, and
+// directs the task-controller's LOGFILE to stdout for the duration.
+int main(int argc, char **argv) {
+  // Banner text paired with the test it announces, in execution order.
+  struct test_entry {
+    const char *banner;
+    void (*run)();
+  };
+  const struct test_entry entries[] = {
+    { "\nTesting check_variable_against_config()\n",
+      test_check_variable_against_config },
+    { "\nTesting get_user_directory()\n", test_get_user_directory },
+    { "\nTesting get_job_directory()\n", test_get_job_directory },
+    { "\nTesting get_attempt_directory()\n", test_get_attempt_directory },
+    { "\nTesting get_task_launcher_file()\n", test_get_task_launcher_file },
+    { "\nTesting get_job_log_dir()\n", test_get_job_log_dir },
+    { "\nTesting get_job_acls_file()\n", test_get_job_acls_file },
+    { "\nTesting get_task_log_dir()\n", test_get_task_log_dir },
+  };
+  size_t total = sizeof(entries) / sizeof(entries[0]);
+
+  printf("\nStarting tests\n");
+  LOGFILE = stdout;
+
+  for (size_t idx = 0; idx < total; idx++) {
+    printf("%s", entries[idx].banner);
+    entries[idx].run();
+  }
+
+  printf("\nFinished tests\n");
+  return 0;
+}

Added: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,657 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * A {@link TaskController} that runs the task JVMs as the user 
+ * who submits the job.
+ * 
+ * This class executes a setuid executable to implement methods
+ * of the {@link TaskController}, including launching the task 
+ * JVM and killing it when needed, and also initializing and
+ * finalizing the task environment. 
+ * <p> The setuid executable is launched using the command line:</p>
+ * <p>task-controller mapreduce.job.user.name command command-args, where</p>
+ * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
+ * <p>command is the ordinal value of one of the constants of the 
+ * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
+ * <p>command-args depends on the command being launched.</p>
+ * 
+ * In addition to running and killing tasks, the class also 
+ * sets up appropriate access for the directories and files 
+ * that will be used by the tasks. 
+ */
+class LinuxTaskController extends TaskController {
+
+  private static final Log LOG =
+            LogFactory.getLog(LinuxTaskController.class);
+
+  // Name of the executable script that will contain the child
+  // JVM command line. See writeCommand for details.
+  private static final String COMMAND_FILE = "taskjvm.sh";
+
+  // Path to the setuid executable.
+  private static String taskControllerExe;
+
+  static {
+    // the task-controller is expected to be under the $HADOOP_HOME/bin
+    // directory.
+    File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+    taskControllerExe =
+        new File(hadoopBin, "task-controller").getAbsolutePath();
+  }
+
+  public LinuxTaskController() {
+    super();
+  }
+
+  /**
+   * List of commands that the setuid script will execute.
+   *
+   * The ordinal of each constant is what is passed on the command line (see
+   * buildTaskControllerExecutor), so the declaration order must not change.
+   */
+  enum TaskControllerCommands {
+    INITIALIZE_USER,
+    INITIALIZE_JOB,
+    INITIALIZE_DISTRIBUTEDCACHE_FILE,
+    LAUNCH_TASK_JVM,
+    INITIALIZE_TASK,
+    TERMINATE_TASK_JVM,
+    KILL_TASK_JVM,
+    RUN_DEBUG_SCRIPT,
+    SIGQUIT_TASK_JVM,
+    ENABLE_TASK_FOR_CLEANUP,
+    ENABLE_JOB_FOR_CLEANUP
+  }
+
+  /**
+   * Runs the task-controller binary without arguments to verify its
+   * permissions and ownership.
+   *
+   * @throws IOException if the binary exits with any code other than 1
+   */
+  @Override
+  public void setup() throws IOException {
+    super.setup();
+
+    // Check the permissions of the task-controller binary by running it plainly.
+    // If permissions are correct, it returns an error code 1, else it returns
+    // 24 or something else if some other bugs are also present.
+    String[] taskControllerCmd =
+        new String[] { getTaskControllerExecutablePath() };
+    ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
+    try {
+      shExec.execute();
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      if (exitCode != 1) {
+        LOG.warn("Exit code from checking binary permissions is : " + exitCode);
+        logOutput(shExec.getOutput());
+        // The two literals used to be joined without a separating space.
+        throw new IOException("Task controller setup failed because of invalid "
+          + "permissions/ownership with exit code " + exitCode, e);
+      }
+    }
+  }
+
+  /**
+   * Launch a task JVM that will run as the owner of the job.
+   *
+   * This method launches a task JVM by executing a setuid executable that will
+   * switch to the user and run the task. Also does initialization of the first
+   * task in the same setuid process launch.
+   *
+   * @param context context of the task to launch; its shExec field is set to
+   *                the executor used for the launch
+   * @throws IOException if executing the task-controller fails
+   */
+  @Override
+  void launchTaskJVM(TaskController.TaskControllerContext context)
+                                        throws IOException {
+    JvmEnv env = context.env;
+    // get the JVM command line.
+    String cmdLine =
+      TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
+          env.logSize, true);
+
+    // The buffer never leaves this method, so the unsynchronized builder
+    // is sufficient.
+    StringBuilder sb = new StringBuilder();
+    //export out all the environment variable before child command as
+    //the setuid/setgid binaries would not be getting, any environmental
+    //variables which begin with LD_*.
+    for(Entry<String, String> entry : env.env.entrySet()) {
+      sb.append("export ");
+      sb.append(entry.getKey());
+      sb.append("=");
+      sb.append(entry.getValue());
+      sb.append("\n");
+    }
+    sb.append(cmdLine);
+    // write the command to a file in the
+    // task specific cache directory
+    writeCommand(sb.toString(), getTaskCacheDirectory(context,
+        context.env.workDir));
+
+    // Call the taskcontroller with the right parameters.
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context,
+        context.env.workDir);
+    ShellCommandExecutor shExec =  buildTaskControllerExecutor(
+                                    TaskControllerCommands.LAUNCH_TASK_JVM,
+                                    env.conf.getUser(),
+                                    launchTaskJVMArgs, env.workDir, env.env);
+    context.shExec = shExec;
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from task is : " + exitCode);
+      // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
+      // terminated/killed forcefully. In all other cases, log the
+      // task-controller output
+      if (exitCode != 143 && exitCode != 137) {
+        LOG.warn("Exception thrown while launching task JVM : "
+            + StringUtils.stringifyException(e));
+        LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+        logOutput(shExec.getOutput());
+      }
+      throw new IOException(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+      logOutput(shExec.getOutput());
+    }
+  }
+
+  /**
+   * Launch the debug script process that will run as the owner of the job.
+   *
+   * This method launches the task debug script process by executing a setuid
+   * executable that will switch to the user and run the task.
+   *
+   * @param context context describing the debug script to run
+   * @throws IOException if executing the task-controller fails
+   */
+  @Override
+  void runDebugScript(DebugScriptContext context) throws IOException {
+    String debugOut = FileUtil.makeShellPath(context.stdout);
+    String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
+    writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
+    // Call the taskcontroller with the right parameters.
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
+    runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(),
+        launchTaskJVMArgs, context.workDir, null);
+  }
+
+  /**
+   * Helper method that runs a LinuxTaskController command, logging the exit
+   * code, the exception and the command output when execution fails.
+   *
+   * @param taskControllerCommand command to run
+   * @param user user name passed to the task-controller
+   * @param cmdArgs extra arguments appended after the command
+   * @param workDir working directory for the task-controller; may be null
+   * @param env environment for the task-controller; may be null
+   * @throws IOException wrapping any exception thrown while executing
+   */
+  private void runCommand(TaskControllerCommands taskControllerCommand,
+      String user, List<String> cmdArgs, File workDir, Map<String, String> env)
+      throws IOException {
+
+    ShellCommandExecutor shExec =
+        buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
+                                    workDir, env);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
+          + shExec.getExitCode());
+      LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
+          + StringUtils.stringifyException(e));
+      LOG.info("Output from LinuxTaskController's "
+               + taskControllerCommand.toString() + " follows:");
+      logOutput(shExec.getOutput());
+      throw new IOException(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's "
+               + taskControllerCommand.toString() + " follows:");
+      logOutput(shExec.getOutput());
+    }
+  }
+
+  /**
+   * Returns list of arguments to be passed while initializing a new task:
+   * the job id followed by the task id (with the cleanup suffix appended for
+   * task-cleanup tasks). See {@code buildTaskControllerExecutor(
+   * TaskControllerCommands, String, List<String>, File, Map<String, String>)}
+   * documentation.
+   *
+   * @param context context of the task
+   * @return Argument to be used while launching Task VM
+   */
+  private List<String> buildInitializeTaskArgs(TaskExecContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    String taskId = context.task.getTaskID().toString();
+    String jobId = getJobId(context);
+    commandArgs.add(jobId);
+    if (!context.task.isTaskCleanupTask()) {
+      commandArgs.add(taskId);
+    } else {
+      commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+    }
+    return commandArgs;
+  }
+
+  /**
+   * Runs the INITIALIZE_TASK command for the task described by the context.
+   *
+   * @param context context of the task to initialize
+   * @throws IOException if executing the task-controller fails
+   */
+  @Override
+  void initializeTask(TaskControllerContext context)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do "
+                + TaskControllerCommands.INITIALIZE_TASK.toString()
+                + " for " + context.task.getTaskID().toString());
+    }
+    runCommand(TaskControllerCommands.INITIALIZE_TASK,
+        context.env.conf.getUser(),
+        buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+  }
+
+  /**
+   * Builds the args to be passed to task-controller for enabling of task for
+   * cleanup. Last arg in this List is either $attemptId or $attemptId/work
+   */
+  private List<String> buildTaskCleanupArgs(
+      TaskControllerTaskPathDeletionContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    commandArgs.add(context.mapredLocalDir.toUri().getPath());
+    commandArgs.add(context.task.getJobID().toString());
+
+    String workDir = "";
+    if (context.isWorkDir) {
+      workDir = "/work";
+    }
+    if (context.task.isTaskCleanupTask()) {
+      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
+                      + workDir);
+    } else {
+      commandArgs.add(context.task.getTaskID() + workDir);
+    }
+
+    return commandArgs;
+  }
+
+  /**
+   * Builds the args to be passed to task-controller for enabling of job for
+   * cleanup. Last arg in this List is $jobid.
+   */
+  private List<String> buildJobCleanupArgs(
+      TaskControllerJobPathDeletionContext context) {
+    List<String> commandArgs = new ArrayList<String>(2);
+    commandArgs.add(context.mapredLocalDir.toUri().getPath());
+    commandArgs.add(context.jobId.toString());
+
+    return commandArgs;
+  }
+
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   *
+   * @throws IllegalArgumentException if the context is not a
+   *         TaskControllerTaskPathDeletionContext
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException {
+    if (context instanceof TaskControllerTaskPathDeletionContext) {
+      TaskControllerTaskPathDeletionContext tContext =
+        (TaskControllerTaskPathDeletionContext) context;
+      enablePathForCleanup(tContext,
+                           TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
+                           buildTaskCleanupArgs(tContext));
+    }
+    else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+          + "TaskControllerTaskPathDeletionContext.");
+    }
+  }
+
+  /**
+   * Enables the job for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   *
+   * @throws IllegalArgumentException if the context is not a
+   *         TaskControllerJobPathDeletionContext
+   */
+  @Override
+  void enableJobForCleanup(PathDeletionContext context)
+      throws IOException {
+    if (context instanceof TaskControllerJobPathDeletionContext) {
+      TaskControllerJobPathDeletionContext tContext =
+        (TaskControllerJobPathDeletionContext) context;
+      enablePathForCleanup(tContext,
+                           TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
+                           buildJobCleanupArgs(tContext));
+    } else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+                  + "TaskControllerJobPathDeletionContext.");
+    }
+  }
+
+  /**
+   * Enable a path for cleanup. A failure of the task-controller command is
+   * logged as a warning and not propagated.
+   *
+   * @param c {@link TaskControllerPathDeletionContext} for the path to be
+   *          cleaned up
+   * @param command {@link TaskControllerCommands} for task/job cleanup
+   * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
+   *                    path cleanup
+   * @throws IllegalArgumentException if the context has no user or its file
+   *         system is not a {@link LocalFileSystem}
+   */
+  private void enablePathForCleanup(TaskControllerPathDeletionContext c,
+                                    TaskControllerCommands command,
+                                    List<String> cleanupArgs) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
+    }
+
+    if ( c.user != null && c.fs instanceof LocalFileSystem) {
+      try {
+        runCommand(command, c.user, cleanupArgs, null, null);
+      } catch(IOException e) {
+        LOG.warn("Unable to change permissions for " + c.fullPath);
+      }
+    }
+    else {
+      throw new IllegalArgumentException("Either user is null or the "
+                  + "file system is not local file system.");
+    }
+  }
+
+  // Logs each line of the given output at info level; a null output is
+  // ignored.
+  private void logOutput(String output) {
+    if (output != null) {
+      for (String str : output.split("\n")) {
+        LOG.info(str);
+      }
+    }
+  }
+
+  // Derives the job id string from the context's task attempt id.
+  private String getJobId(TaskExecContext context) {
+    String taskId = context.task.getTaskID().toString();
+    TaskAttemptID tId = TaskAttemptID.forName(taskId);
+    String jobId = tId.getJobID().toString();
+    return jobId;
+  }
+
+  /**
+   * Returns list of arguments to be passed while launching task VM: the
+   * local directory chosen for the task followed by the arguments of
+   * buildInitializeTaskArgs.
+   * See {@code buildTaskControllerExecutor(TaskControllerCommands,
+   * String, List<String>, File, Map<String, String>)} documentation.
+   * @param context context of the task
+   * @param workDir working directory of the task
+   * @return Argument to be used while launching Task VM
+   * @throws IllegalArgumentException if the task cache directory does not
+   *         lie under any configured local directory
+   */
+  private List<String> buildLaunchTaskArgs(TaskExecContext context,
+      File workDir) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    // Resolve each directory once instead of once per use; the first debug
+    // line is still emitted before the lookup that may throw.
+    String taskCacheDir = getTaskCacheDirectory(context, workDir);
+    LOG.debug("getting the task directory as: " + taskCacheDir);
+    String ttRoot = getDirectoryChosenForTask(new File(taskCacheDir), context);
+    LOG.debug("getting the tt_root as " + ttRoot);
+    commandArgs.add(ttRoot);
+    commandArgs.addAll(buildInitializeTaskArgs(context));
+    return commandArgs;
+  }
+
+  // Get the directory from the list of directories configured
+  // in Configs.LOCAL_DIR chosen for storing data pertaining to
+  // this task.
+  private String getDirectoryChosenForTask(File directory,
+      TaskExecContext context) {
+    String jobId = getJobId(context);
+    String taskId = context.task.getTaskID().toString();
+    for (String dir : mapredLocalDirs) {
+      File mapredDir = new File(dir);
+      File taskDir =
+          new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
+              .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
+              .getParentFile();
+      if (directory.equals(taskDir)) {
+        return dir;
+      }
+    }
+
+    LOG.error("Couldn't parse task cache directory correctly");
+    throw new IllegalArgumentException("invalid task cache directory "
+        + directory.getAbsolutePath());
+  }
+
+  /**
+   * Builds the command line for launching/terminating/killing task JVM.
+   * The command line always starts with the executable path, the user name
+   * and the ordinal of the command, followed by the extra arguments.
+   * <br/>
+   * For launching following is command line argument:
+   * <br/>
+   * {@code mapreduce.job.user.name command tt-root job_id task_id}
+   * <br/>
+   * For terminating/killing task jvm.
+   * {@code mapreduce.job.user.name command task-pid}
+   *
+   * @param command command to be executed.
+   * @param userName mapreduce.job.user.name
+   * @param cmdArgs list of extra arguments
+   * @param workDir working directory for the task-controller; when null or
+   *                non-existent, neither workDir nor env is passed on
+   * @param env JVM environment variables.
+   * @return {@link ShellCommandExecutor}
+   * @throws IOException
+   */
+  private ShellCommandExecutor buildTaskControllerExecutor(
+      TaskControllerCommands command, String userName, List<String> cmdArgs,
+      File workDir, Map<String, String> env)
+      throws IOException {
+    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
+    taskControllerCmd[0] = getTaskControllerExecutablePath();
+    taskControllerCmd[1] = userName;
+    taskControllerCmd[2] = String.valueOf(command.ordinal());
+    int i = 3;
+    for (String cmdArg : cmdArgs) {
+      taskControllerCmd[i++] = cmdArg;
+    }
+    if (LOG.isDebugEnabled()) {
+      for (String cmd : taskControllerCmd) {
+        LOG.debug("taskctrl command = " + cmd);
+      }
+    }
+    ShellCommandExecutor shExec = null;
+    if(workDir != null && workDir.exists()) {
+      shExec = new ShellCommandExecutor(taskControllerCmd,
+          workDir, env);
+    } else {
+      shExec = new ShellCommandExecutor(taskControllerCmd);
+    }
+
+    return shExec;
+  }
+
+  // Return the task specific directory under the cache.
+  private String getTaskCacheDirectory(TaskExecContext context,
+      File workDir) {
+    // In the case of JVM reuse, the task specific directory
+    // is different from what is set with respect with
+    // env.workDir. Hence building this from the taskId everytime.
+    String taskId = context.task.getTaskID().toString();
+    File cacheDirForJob = workDir.getParentFile().getParentFile();
+    if(context.task.isTaskCleanupTask()) {
+      taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
+    }
+    return new File(cacheDirForJob, taskId).getAbsolutePath();
+  }
+
+  // Write the JVM command line to a file under the specified directory
+  // Note that the JVM will be launched using a setuid executable, and
+  // could potentially contain strings defined by a user. Hence, to
+  // prevent special character attacks, we write the command line to
+  // a file and execute it.
+  // NOTE(review): an IOException while opening the file is only logged, so
+  // callers proceed even when the command file was not written - confirm
+  // this best-effort behavior is intended.
+  private void writeCommand(String cmdLine,
+                                      String directory) throws IOException {
+
+    PrintWriter pw = null;
+    String commandFile = directory + File.separator + COMMAND_FILE;
+    LOG.info("Writing commands to " + commandFile);
+    LOG.info("--------Commands Begin--------");
+    LOG.info(cmdLine);
+    LOG.info("--------Commands End--------");
+    try {
+      FileWriter fw = new FileWriter(commandFile);
+      BufferedWriter bw = new BufferedWriter(fw);
+      pw = new PrintWriter(bw);
+      pw.write(cmdLine);
+    } catch (IOException ioe) {
+      LOG.error("Caught IOException while writing JVM command line to file. "
+                + ioe.getMessage());
+    } finally {
+      if (pw != null) {
+        pw.close();
+      }
+      // set execute permissions for all on the file.
+      File f = new File(commandFile);
+      if (f.exists()) {
+        f.setReadable(true, false);
+        f.setExecutable(true, false);
+      }
+    }
+  }
+
+  // The only argument of INITIALIZE_JOB is the job id.
+  private List<String> buildInitializeJobCommandArgs(
+      JobInitializationContext context) {
+    List<String> initJobCmdArgs = new ArrayList<String>();
+    initJobCmdArgs.add(context.jobid.toString());
+    return initJobCmdArgs;
+  }
+
+  /**
+   * Runs the INITIALIZE_JOB command for the job described by the context.
+   *
+   * @throws IOException if executing the task-controller fails
+   */
+  @Override
+  void initializeJob(JobInitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize job " + context.jobid.toString()
+        + " on the TT");
+    runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
+        buildInitializeJobCommandArgs(context), context.workDir, null);
+  }
+
+  /**
+   * Runs the INITIALIZE_DISTRIBUTEDCACHE_FILE command with the localized
+   * base directory and the unique string taken from the context.
+   *
+   * @throws IOException if executing the task-controller fails
+   */
+  @Override
+  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to initialize distributed cache for " + context.user
+          + " with localizedBaseDir " + context.localizedBaseDir +
+          " and uniqueString " + context.uniqueString);
+    }
+    List<String> args = new ArrayList<String>();
+    // Here, uniqueString might start with '-'. Adding -- in front of the
+    // arguments indicates that they are non-option parameters.
+    args.add("--");
+    args.add(context.localizedBaseDir.toString());
+    args.add(context.uniqueString);
+    runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
+        context.user, args, context.workDir, null);
+  }
+
+  /**
+   * Runs the INITIALIZE_USER command, without extra arguments, for the user
+   * given in the context.
+   *
+   * @throws IOException if executing the task-controller fails
+   */
+  @Override
+  public void initializeUser(InitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize user directories for " + context.user
+        + " on the TT");
+    runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
+        new ArrayList<String>(), context.workDir, null);
+  }
+
+  /**
+   * API which builds the command line to be pass to LinuxTaskController
+   * binary to terminate/kill the task: a single argument, the pid from the
+   * context. See
+   * {@code buildTaskControllerExecutor(TaskControllerCommands,
+   * String, List<String>, File, Map<String, String>)} documentation.
+   *
+   * @param context context of task which has to be passed kill signal.
+   * @return list holding the pid of the task
+   */
+  private List<String> buildKillTaskCommandArgs(TaskControllerContext
+      context){
+    List<String> killTaskJVMArgs = new ArrayList<String>();
+    killTaskJVMArgs.add(context.pid);
+    return killTaskJVMArgs;
+  }
+
+  /**
+   * Convenience method used to sending appropriate signal to the task
+   * VM. Does nothing when the context has no task.
+   * @param context context of the task to signal
+   * @param command signalling command to run
+   * @throws IOException wrapping any exception thrown while executing
+   */
+  protected void signalTask(TaskControllerContext context,
+      TaskControllerCommands command) throws IOException{
+    if(context.task == null) {
+      LOG.info("Context task is null; not signaling the JVM");
+      return;
+    }
+    ShellCommandExecutor shExec = buildTaskControllerExecutor(
+        command, context.env.conf.getUser(),
+        buildKillTaskCommandArgs(context), context.env.workDir,
+        context.env.env);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      // The executable's name was misspelled in this message.
+      LOG.warn("Output from task-controller is : " + shExec.getOutput());
+      throw new IOException(e);
+    }
+  }
+
+  // Sends TERMINATE_TASK_JVM; failures are logged and not propagated.
+  @Override
+  void terminateTask(TaskControllerContext context) {
+    try {
+      signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending kill to the Task VM " +
+          StringUtils.stringifyException(e));
+    }
+  }
+
+  // Sends KILL_TASK_JVM; failures are logged and not propagated.
+  @Override
+  void killTask(TaskControllerContext context) {
+    try {
+      signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending destroy to the Task VM " +
+          StringUtils.stringifyException(e));
+    }
+  }
+
+  // Sends SIGQUIT_TASK_JVM; failures are logged and not propagated.
+  @Override
+  void dumpTaskStack(TaskControllerContext context) {
+    try {
+      signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
+          StringUtils.stringifyException(e));
+    }
+  }
+
+  // Overridable accessor for the path of the setuid executable.
+  protected String getTaskControllerExecutablePath() {
+    return taskControllerExe;
+  }
+
+  // Tasks run as the user recorded in the job configuration.
+  @Override
+  String getRunAsUser(JobConf conf) {
+    return conf.getUser();
+  }
+}
\ No newline at end of file

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+import junit.framework.TestCase;
+
+/**
+ * The base class which starts up a cluster with LinuxTaskController as the task
+ * controller.
+ * 
+ * In order to run test cases utilizing LinuxTaskController please follow the
+ * following steps:
+ * <ol>
+ * <li>Build LinuxTaskController without passing any
+ * <code>-Dhadoop.conf.dir</code> option.</li>
+ * <li>Change ownership of the built binary to root:group1, where group1 is
+ * a secondary group of the test runner.</li>
+ * <li>Change permissions on the binary so that <em>others</em> have no
+ * permissions on it.</li>
+ * <li>Make the built binary setuid and setgid executable.</li>
+ * <li>Execute following targets:
+ * <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em> 
+ * -Dtaskcontroller-ugi=<em>user,group</em></code>
+ *  <br/>(Note that "path to built binary" means the directory containing task-controller -
+ *  not the actual complete path of the binary itself. This path must end in ".../bin")
+ *  </li>
+ * </ol>
+ * 
+ */
+public class ClusterWithLinuxTaskController extends TestCase {
+  private static final Log LOG =
+      LogFactory.getLog(ClusterWithLinuxTaskController.class);
+
+  /**
+   * Test wrapper around LinuxTaskController. It takes the task-controller
+   * binary from the directory named by the <code>taskcontroller-path</code>
+   * system property, writes a matching taskcontroller.cfg before setup, and
+   * counts attempted and failed SIGQUIT sends.
+   */
+  public static class MyLinuxTaskController extends LinuxTaskController {
+    /** Path of the task-controller binary; replaceable by tests. */
+    String taskControllerExePath = System.getProperty(TASKCONTROLLER_PATH)
+        + "/task-controller";
+
+    /** Number of calls made to {@link #dumpTaskStack}. */
+    volatile static int attemptedSigQuits = 0;
+    /** Number of {@link #dumpTaskStack} calls that ended in an exception. */
+    volatile static int failedSigQuits = 0;
+
+    /**
+     * Stores the special group in the configuration and writes
+     * taskcontroller.cfg before delegating to the superclass setup.
+     *
+     * @throws IOException if the configuration file cannot be written or the
+     *           superclass setup fails
+     */
+    @Override
+    public void setup() throws IOException {
+      getConf().set(TTConfig.TT_GROUP, taskTrackerSpecialGroup);
+
+      // Keep the generated file in the enclosing class's static field so
+      // that tearDown() can delete it.
+      configurationFile = createTaskControllerConf(System
+          .getProperty(TASKCONTROLLER_PATH), getConf());
+      super.setup();
+    }
+
+    /** @return the absolute form of the configured executable path. */
+    @Override
+    protected String getTaskControllerExecutablePath() {
+      return new File(taskControllerExePath).getAbsolutePath();
+    }
+
+    /**
+     * Replaces the executable path used by this controller.
+     *
+     * @param execPath new path of the task-controller binary
+     */
+    void setTaskControllerExe(String execPath) {
+      this.taskControllerExePath = execPath;
+    }
+
+    /**
+     * Works like LinuxTaskController, but also counts the number of
+     * attempted and failed SIGQUIT sends via the task-controller
+     * executable.
+     */
+    @Override
+    void dumpTaskStack(TaskControllerContext context) {
+      // NOTE(review): ++ on a volatile int is not atomic; fine as long as
+      // SIGQUITs are not sent concurrently -- confirm.
+      attemptedSigQuits++;
+      try {
+        signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
+      } catch (Exception e) {
+        // Message previously read "Execution sending SIGQUIT" (typo).
+        LOG.warn("Exception sending SIGQUIT: "
+            + StringUtils.stringifyException(e));
+        failedSigQuits++;
+      }
+    }
+  }
+
+  // cluster instances which sub classes can use
+  protected MiniMRCluster mrCluster = null;
+  protected MiniDFSCluster dfsCluster = null;
+
+  private JobConf clusterConf = null;
+  protected Path homeDirectory;
+
+  /** changing this to a larger number needs more work for creating 
+   *  taskcontroller.cfg.
+   *  see {@link #startCluster()} and
+   *  {@link #createTaskControllerConf(String, Configuration)}
+   */ 
+  private static final int NUMBER_OF_NODES = 1;
+
+  static final String TASKCONTROLLER_PATH = "taskcontroller-path";
+  static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
+
+  private static File configurationFile = null;
+
+  protected UserGroupInformation jobOwner;
+  
+  protected static String taskTrackerSpecialGroup = null;
+  /**
+   * Primary group of the tasktracker - i.e. the user running the
+   * test.
+   */
+  protected static String taskTrackerPrimaryGroup = null;
+  // Resolve both groups once, and only when a task-controller path was
+  // supplied; otherwise both fields stay null.
+  static {
+    if (isTaskExecPathPassed()) {
+      try {
+        FileSystem localFs = FileSystem.getLocal(new Configuration());
+        Path binary = new Path(System.getProperty(TASKCONTROLLER_PATH),
+            "task-controller");
+        FileStatus binaryStatus = localFs.getFileStatus(binary);
+        // The special group is whatever group owns the binary.
+        taskTrackerSpecialGroup = binaryStatus.getGroup();
+      } catch (IOException e) {
+        LOG.warn("Could not get group of the binary", e);
+        fail("Could not get group of the binary");
+      }
+      try {
+        String[] currentGroups =
+            UserGroupInformation.getCurrentUser().getGroupNames();
+        taskTrackerPrimaryGroup = currentGroups[0];
+      } catch (IOException ioe) {
+        LOG.warn("Could not get primary group of the current user", ioe);
+        fail("Could not get primary group of the current user");
+      }
+    }
+  }
+
+  /**
+   * Utility method which subclasses use to start and configure the DFS and
+   * MR clusters so they can directly submit a job. Also creates the job
+   * owner from the <code>taskcontroller-ugi</code> system property and
+   * prepares the owner's home and staging directories.
+   *
+   * @throws IOException propagated from cluster start-up or directory setup
+   * @throws InterruptedException propagated from cluster start-up
+   */
+  protected void startCluster()
+      throws IOException, InterruptedException {
+    JobConf baseConf = new JobConf();
+    dfsCluster = new MiniDFSCluster(baseConf, NUMBER_OF_NODES, true, null);
+    baseConf.set(TTConfig.TT_TASK_CONTROLLER,
+        MyLinuxTaskController.class.getName());
+    baseConf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
+
+    String dfsUri = dfsCluster.getFileSystem().getUri().toString();
+    mrCluster =
+        new MiniMRCluster(NUMBER_OF_NODES, dfsUri, 4, null, null, baseConf);
+    clusterConf = mrCluster.createJobConf();
+
+    // The property is expected in "user,group" form.
+    String[] userAndGroup = System.getProperty(TASKCONTROLLER_UGI).split(",");
+    jobOwner = UserGroupInformation.createUserForTesting(userAndGroup[0],
+        new String[] {userAndGroup[1]});
+    createHomeAndStagingDirectory(clusterConf);
+  }
+
+  /**
+   * Creates <code>/user/&lt;job owner&gt;</code> on the DFS cluster, hands it
+   * to the job owner, and creates the staging root with mode 0777.
+   *
+   * @param conf configuration supplying the staging-area root
+   * @throws IOException propagated from the file system calls
+   */
+  private void createHomeAndStagingDirectory(JobConf conf)
+      throws IOException {
+    FileSystem dfs = dfsCluster.getFileSystem();
+
+    homeDirectory = new Path("/user/" + jobOwner.getUserName());
+    LOG.info("Creating Home directory : " + homeDirectory);
+    dfs.mkdirs(homeDirectory);
+    changePermission(dfs);
+
+    Path stagingRoot = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT));
+    LOG.info("Creating Staging root directory : " + stagingRoot);
+    dfs.mkdirs(stagingRoot);
+    dfs.setPermission(stagingRoot, new FsPermission((short) 0777));
+  }
+
+  private void changePermission(FileSystem fs)
+      throws IOException {
+    fs.setOwner(homeDirectory, jobOwner.getUserName(),
+        jobOwner.getGroupNames()[0]);
+  }
+
+  /**
+   * Locates taskcontroller.cfg relative to the given directory.
+   *
+   * @param path directory the <code>../conf</code> lookup starts from
+   * @return <code>path/../conf/taskcontroller.cfg</code>; nothing is created
+   */
+  static File getTaskControllerConfFile(String path) {
+    return new File(new File(path, "../conf"), "taskcontroller.cfg");
+  }
+  
+  /**
+   * Create taskcontroller.cfg in <code>path/../conf</code>, holding the
+   * local-dir, hadoop.log.dir and tasktracker-group settings.
+   * 
+   * @param path Directory containing the taskcontroller binary.
+   * @param conf TaskTracker's configuration
+   * @return the created conf file
+   * @throws IOException if the conf directory cannot be created or the file
+   *           cannot be opened or fully written
+   */
+  static File createTaskControllerConf(String path,
+      Configuration conf) throws IOException {
+    File confDirectory = new File(path, "../conf");
+    // mkdirs() also returns false when someone else created the directory
+    // in the meantime, so only fail if it is still missing afterwards.
+    if (!confDirectory.exists() && !confDirectory.mkdirs()
+        && !confDirectory.isDirectory()) {
+      throw new IOException("Could not create directory " + confDirectory);
+    }
+    File configurationFile = new File(confDirectory, "taskcontroller.cfg");
+    PrintWriter writer =
+        new PrintWriter(new FileOutputStream(configurationFile));
+    try {
+      writer.println(String.format(MRConfig.LOCAL_DIR + "=%s", conf
+          .get(MRConfig.LOCAL_DIR)));
+
+      writer.println(
+          String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
+      writer.println(String.format(TTConfig.TT_GROUP + "=%s",
+          conf.get(TTConfig.TT_GROUP)));
+
+      // PrintWriter never throws on write errors; checkError() flushes the
+      // stream and reports whether any write failed.
+      if (writer.checkError()) {
+        throw new IOException("Error writing " + configurationFile);
+      }
+    } finally {
+      // Release the file handle even when a write above failed.
+      writer.close();
+    }
+    return configurationFile;
+  }
+
+  /**
+   * Can we run the tests with LinuxTaskController?
+   * 
+   * @return boolean
+   */
+  protected static boolean shouldRun() {
+    if (!isTaskExecPathPassed() || !isUserPassed()) {
+      LOG.info("Not running test.");
+      return false;
+    }
+    return true;
+  }
+
+  static boolean isTaskExecPathPassed() {
+    String path = System.getProperty(TASKCONTROLLER_PATH);
+    if (path == null || path.isEmpty()
+        || path.equals("${" + TASKCONTROLLER_PATH + "}")) {
+      LOG.info("Invalid taskcontroller-path : " + path); 
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Checks the <code>taskcontroller-ugi</code> system property, which must
+   * have the form <code>user,group</code> with both parts non-empty. This
+   * mirrors how {@link #startCluster()} splits the value, so a value accepted
+   * here can always be consumed there.
+   *
+   * @return false (after logging) when the property is unset, empty, still
+   *         the literal <code>${taskcontroller-ugi}</code>, or lacks a
+   *         non-empty user or group
+   */
+  private static boolean isUserPassed() {
+    String ugi = System.getProperty(TASKCONTROLLER_UGI);
+    boolean valid = false;
+    if (ugi != null && !ugi.isEmpty()
+        && !ugi.equals("${" + TASKCONTROLLER_UGI + "}")) {
+      // split() drops trailing empty strings, so "user," yields one part.
+      String[] parts = ugi.split(",");
+      valid = parts.length >= 2 && !parts[0].isEmpty()
+          && !parts[1].isEmpty();
+    }
+    if (!valid) {
+      LOG.info("Invalid taskcontroller-ugi : " + ugi);
+    }
+    return valid;
+  }
+
+  protected JobConf getClusterConf() {
+    return new JobConf(clusterConf);
+  }
+
+  /**
+   * Shuts down the MR and DFS clusters and deletes the generated
+   * taskcontroller.cfg. Each step runs even if an earlier one throws, so a
+   * failed MR shutdown cannot leave the DFS cluster or the file behind.
+   */
+  @Override
+  protected void tearDown()
+      throws Exception {
+    try {
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+      }
+    } finally {
+      try {
+        if (dfsCluster != null) {
+          dfsCluster.shutdown();
+        }
+      } finally {
+        if (configurationFile != null) {
+          configurationFile.delete();
+        }
+        super.tearDown();
+      }
+    }
+  }
+
+  /**
+   * Resolves the file system of <code>outDir</code> from the cluster
+   * configuration and delegates to
+   * {@link #assertOwnerShip(Path, FileSystem)}.
+   * 
+   * @param outDir job output directory to inspect
+   * @throws IOException propagated from the file system calls
+   */
+  protected void assertOwnerShip(Path outDir)
+      throws IOException {
+    assertOwnerShip(outDir, outDir.getFileSystem(clusterConf));
+  }
+
+  /**
+   * Assert that the job is actually run by the specified user by verifying the
+   * permissions of the output part-files.
+   * 
+   * @param outDir
+   * @param fs
+   * @throws IOException
+   */
+  protected void assertOwnerShip(Path outDir, FileSystem fs)
+      throws IOException {
+    for (FileStatus status : fs.listStatus(outDir, 
+                                           new Utils.OutputFileUtils
+                                                    .OutputFilesFilter())) {
+      String owner = status.getOwner();
+      String group = status.getGroup();
+      LOG.info("Ownership of the file is " + status.getPath() + " is " + owner
+          + "," + group);
+      assertTrue("Output part-file's owner is not correct. Expected : "
+          + jobOwner.getUserName() + " Found : " + owner, owner
+          .equals(jobOwner.getUserName()));
+      assertTrue("Output part-file's group is not correct. Expected : "
+          + jobOwner.getGroupNames()[0] + " Found : " + group, group
+          .equals(jobOwner.getGroupNames()[0]));
+    }
+  }
+  
+  /**
+   * Validates permissions of the private distcache dir and all of its
+   * contents, in every local dir where that directory exists.
+   *
+   * @param localDirs mapred local directories
+   * @param user user whose private distributed cache is checked
+   * @param taskTrackerUser user running the tasktracker
+   * @param groupOwner group expected to own the files
+   * @throws IOException propagated from the permission checks
+   */
+  public static void checkPermissionsOnPrivateDistCache(String[] localDirs,
+      String user, String taskTrackerUser, String groupOwner)
+      throws IOException {
+    // user-dir, jobcache and distcache will have
+    //     2770 permissions if jobOwner is same as tt_user
+    //     2570 permissions for any other user
+    final boolean ownerIsTracker = taskTrackerUser.equals(user);
+    final String expectedDirPerms =
+        ownerIsTracker ? "drwxrws---" : "dr-xrws---";
+    final String expectedFilePerms =
+        ownerIsTracker ? "-rwxrwx---" : "-r-xrwx---";
+
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPrivateDistributedCacheDir(user));
+      if (!distCacheDir.exists()) {
+        continue;
+      }
+      checkPermissionsOnDir(distCacheDir, user, groupOwner, expectedDirPerms,
+          expectedFilePerms);
+    }
+  }
+ 
+  /**
+   * Check that files expected to be localized in the private distributed
+   * cache of a user are present. Matches are counted across all local dirs
+   * and the total must equal the number of expected names.
+   * @param localDirs List of mapred local directories.
+   * @param user User against which localization is happening
+   * @param expectedFileNames List of files expected to be localized
+   * @throws IOException
+   */
+  public static void checkPresenceOfPrivateDistCacheFiles(String[] localDirs,
+      String user, String[] expectedFileNames) throws IOException {
+    final FileGatherer found = new FileGatherer();
+    for (String localDir : localDirs) {
+      String privateCacheDir = TaskTracker.getPrivateDistributedCacheDir(user);
+      findExpectedFiles(expectedFileNames, new File(localDir, privateCacheDir),
+          found);
+    }
+    assertEquals("Files expected in private distributed cache were not found",
+        expectedFileNames.length, found.getCount());
+  }
+
+  /**
+   * Validates permissions and ownership of the public distcache dir and all
+   * of its contents, in every local dir where that directory exists.
+   *
+   * @param localFS file system used for the checks
+   * @param localDirs mapred local directories
+   * @param owner expected owner
+   * @param group expected group
+   * @throws IOException propagated from the checks
+   */
+  public static void checkPermissionsOnPublicDistCache(FileSystem localFS,
+      String[] localDirs, String owner, String group) throws IOException {
+    for (String localDir : localDirs) {
+      File publicCacheDir = new File(localDir,
+          TaskTracker.getPublicDistributedCacheDir());
+      if (!publicCacheDir.exists()) {
+        continue;
+      }
+      checkPublicFilePermissions(localFS, publicCacheDir, owner, group);
+    }
+  }
+
+  /**
+   * Checks that files expected to be localized in the public distributed
+   * cache are present. Matches are counted across all local dirs and the
+   * total must equal the number of expected names.
+   * @param localDirs List of mapred local directories
+   * @param expectedFileNames List of expected file names.
+   * @throws IOException
+   */
+  public static void checkPresenceOfPublicDistCacheFiles(String[] localDirs,
+      String[] expectedFileNames) throws IOException {
+    final FileGatherer found = new FileGatherer();
+    for (String localDir : localDirs) {
+      String publicCacheDir = TaskTracker.getPublicDistributedCacheDir();
+      findExpectedFiles(expectedFileNames, new File(localDir, publicCacheDir),
+          found);
+    }
+    assertEquals("Files expected in public distributed cache were not found",
+        expectedFileNames.length, found.getCount());
+  }
+  
+  /**
+   * Validates permissions and ownership on the public distributed cache files
+   */
+  private static void checkPublicFilePermissions(FileSystem localFS, File dir,
+      String owner, String group)
+      throws IOException {
+    Path dirPath = new Path(dir.getAbsolutePath());
+    TestTrackerDistributedCacheManager.checkPublicFilePermissions(localFS, 
+        new Path[] {dirPath});
+    TestTrackerDistributedCacheManager.checkPublicFileOwnership(localFS,
+        new Path[] {dirPath}, owner, group);
+    if (dir.isDirectory()) {
+      File[] files = dir.listFiles();
+      for (File file : files) {
+        checkPublicFilePermissions(localFS, file, owner, group);
+      }
+    }
+  }
+
+  /**
+   * Validates permissions of the given dir and, recursively, of all its
+   * contents.
+   *
+   * @param dir directory to check
+   * @param user expected owner
+   * @param groupOwner expected group
+   * @param expectedDirPermissions permission string expected on directories
+   * @param expectedFilePermissions permission string expected on files
+   * @throws IOException if a check fails with an I/O error or
+   *           <code>dir</code> cannot be listed
+   */
+  private static void checkPermissionsOnDir(File dir, String user,
+      String groupOwner, String expectedDirPermissions,
+      String expectedFilePermissions) throws IOException {
+    TestTaskTrackerLocalization.checkFilePermissions(dir.toString(),
+        expectedDirPermissions, user, groupOwner);
+    File[] files = dir.listFiles();
+    // listFiles() returns null when dir is not a directory or cannot be
+    // read; report that explicitly instead of a NullPointerException.
+    if (files == null) {
+      throw new IOException("Could not list contents of " + dir);
+    }
+    for (File file : files) {
+      if (file.isDirectory()) {
+        checkPermissionsOnDir(file, user, groupOwner, expectedDirPermissions,
+            expectedFilePermissions);
+      } else {
+        TestTaskTrackerLocalization.checkFilePermissions(file.toString(),
+            expectedFilePermissions, user, groupOwner);
+      }
+    }
+  }
+
+  // Walk rootDir recursively and add to the FileGatherer the name of every
+  // regular file whose name is among the expected ones. A rootDir that
+  // cannot be listed contributes nothing.
+  private static void findExpectedFiles(String[] expectedFileNames,
+      File rootDir, FileGatherer gatherer) {
+    File[] entries = rootDir.listFiles();
+    if (entries != null) {
+      for (File entry : entries) {
+        if (entry.isDirectory()) {
+          findExpectedFiles(expectedFileNames, entry, gatherer);
+          continue;
+        }
+        if (isFilePresent(expectedFileNames, entry)) {
+          gatherer.addFileName(entry.getName());
+        }
+      }
+    }
+  }
+  
+  // Returns true when the simple name of the passed file equals one of the
+  // expected file names.
+  private static boolean isFilePresent(String[] expectedFileNames, File file) {
+    for (String name : expectedFileNames) {
+      if (name.equals(file.getName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  // Accumulates file names across several method calls. A thin wrapper
+  // around a list, kept for clarity; duplicates are counted separately.
+  private static class FileGatherer {
+    List<String> foundFileNames = new ArrayList<String>();
+
+    // Records one more found file name.
+    void addFileName(String fileName) {
+      this.foundFileNames.add(fileName);
+    }
+
+    // Number of names recorded so far.
+    int getCount() {
+      final int recorded = this.foundFileNames.size();
+      return recorded;
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+/**
+ * Runs the failing-map debug-script scenario of TestDebugScript on a cluster
+ * that uses LinuxTaskController, submitting the job as the user given by the
+ * <code>taskcontroller-ugi</code> system property. See
+ * {@link ClusterWithLinuxTaskController}
+ */
+public class TestDebugScriptWithLinuxTaskController extends
+    ClusterWithLinuxTaskController {
+
+  @Test
+  public void testDebugScriptExecutionAsDifferentUser() throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    TestDebugScript.setupDebugScriptDirs();
+    try {
+      final Path inDir = new Path("input");
+      final Path outDir = new Path("output");
+      JobConf conf = getClusterConf();
+      FileSystem fs = inDir.getFileSystem(conf);
+      fs.mkdirs(inDir);
+      Path p = new Path(inDir, "1.txt");
+      fs.createNewFile(p);
+      // The property is expected in "user,group" form.
+      String splits[] = System
+          .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI).
+          split(",");
+      JobID jobId = UserGroupInformation.createUserForTesting(splits[0], 
+          new String[]{splits[1]}).doAs(
+              new PrivilegedExceptionAction<JobID>() {
+            public JobID run() throws IOException{
+              return TestDebugScript.runFailingMapJob(
+                  TestDebugScriptWithLinuxTaskController.this
+                      .getClusterConf(), 
+                  inDir, outDir);
+            }
+          });
+      // construct the task id of first map task of failmap
+      TaskAttemptID taskId = new TaskAttemptID(
+          new TaskID(jobId,TaskType.MAP, 0), 0);
+      TestDebugScript.verifyDebugScriptOutput(taskId, splits[0],
+          taskTrackerSpecialGroup, "-rw-rw----");
+    } finally {
+      // Remove the script directories even when the job or the
+      // verification above fails.
+      TestDebugScript.cleanupDebugScriptDirs();
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Test a java-based mapred job with LinuxTaskController running the jobs as a
+ * user different from the user running the cluster. See
+ * {@link ClusterWithLinuxTaskController}
+ */
+public class TestJobExecutionAsDifferentUser extends
+    ClusterWithLinuxTaskController {
+
+  /**
+   * Runs, as the job owner, jobs with 0/0, 1/0 and 1/1 maps/reduces and
+   * checks success and output ownership after each one, then runs a
+   * SleepJob with jvm reuse.
+   */
+  public void testJobExecution()
+      throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+
+    jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        final Path inDir = new Path("input");
+        final Path outDir = new Path("output");
+
+        // {maps, reduces}: zero maps/reduces, one map and zero reduces,
+        // then a normal job with maps/reduces.
+        final int[][] jobShapes = { {0, 0}, {1, 0}, {1, 1} };
+        for (int[] shape : jobShapes) {
+          RunningJob job = UtilsForTests.runJob(getClusterConf(), inDir,
+              outDir, shape[0], shape[1]);
+          job.waitForCompletion();
+          assertTrue("Job failed", job.isSuccessful());
+          assertOwnerShip(outDir);
+        }
+
+        // Run a job with jvm reuse
+        JobConf reuseConf = getClusterConf();
+        reuseConf.set(JobContext.JVM_NUMTASKS_TORUN, "-1");
+        String[] sleepArgs =
+            { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+        int exitCode = ToolRunner.run(reuseConf, new SleepJob(), sleepArgs);
+        assertEquals(0, exitCode);
+        return null;
+      }
+    });
+  }
+  
+  /**
+   * Runs the task-environment test of TestMiniMRChildTask as the job owner
+   * and checks ownership of its output directory, which is deleted
+   * afterwards.
+   */
+  public void testEnvironment() throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+
+        TestMiniMRChildTask childTask = new TestMiniMRChildTask();
+        Path inDir = new Path("input1");
+        Path outDir = new Path("output1");
+        try {
+          childTask.runTestTaskEnv(getClusterConf(), inDir, outDir, false);
+        } catch (IOException e) {
+          fail("IOException thrown while running enviroment test."
+              + e.getMessage());
+        } finally {
+          FileSystem outFs = outDir.getFileSystem(getClusterConf());
+          if (outFs.exists(outDir)) {
+            assertOwnerShip(outDir);
+            outFs.delete(outDir, true);
+          } else {
+            fail("Output directory does not exist" + outDir.toString());
+          }
+        }
+        // Deliberately outside the finally block: a return inside finally
+        // would discard the pending failure raised by fail() above and any
+        // other exception thrown by the try block.
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Ensure that SIGQUIT can be properly sent by the LinuxTaskController
+   * if a task times out: the attempted-SIGQUIT counter must grow while the
+   * job runs and no attempt may have failed.
+   */
+  public void testTimeoutStackTrace() throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+
+    // Run a job that should timeout and trigger a SIGQUIT.
+    startCluster();
+    jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        JobConf timeoutConf = getClusterConf();
+        // NOTE(review): presumably a task timeout shorter than the map
+        // sleep time passed to createJob below -- confirm the units.
+        timeoutConf.setInt(JobContext.TASK_TIMEOUT, 10000);
+        timeoutConf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+
+        SleepJob sleeper = new SleepJob();
+        sleeper.setConf(timeoutConf);
+        Job timingOutJob = sleeper.createJob(1, 0, 30000, 1, 0, 0);
+        timingOutJob.setMaxMapAttempts(1);
+
+        final int sigQuitsBefore = MyLinuxTaskController.attemptedSigQuits;
+        timingOutJob.waitForCompletion(true);
+        final int sigQuitsAfter = MyLinuxTaskController.attemptedSigQuits;
+
+        assertTrue("Did not detect a new SIGQUIT!",
+            sigQuitsBefore < sigQuitsAfter);
+        assertEquals("A SIGQUIT attempt failed!", 0,
+            MyLinuxTaskController.failedSigQuits);
+        return null;
+      }
+    });
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Test killing of child processes spawned by the jobs with
+ * LinuxTaskController running the jobs as a user different from the user
+ * running the cluster. The checks themselves are those of
+ * TestKillSubProcesses, run as the job owner.
+ * See {@link ClusterWithLinuxTaskController}
+ */
+public class TestKillSubProcessesWithLinuxTaskController extends 
+  ClusterWithLinuxTaskController {
+
+  public void testKillSubProcess() throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        final JobConf ownerConf = getClusterConf();
+        final JobTracker tracker =
+            mrCluster.getJobTrackerRunner().getJobTracker();
+
+        // TestKillSubProcesses takes its cluster from this static field.
+        TestKillSubProcesses.mr = mrCluster;
+        new TestKillSubProcesses().runTests(ownerConf, tracker);
+        return null;
+      }
+    });
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercises {@link TaskController#setup()} on a {@link LinuxTaskController}
+ * with no group, an invalid group, and finally the TaskTracker's special
+ * group configured.
+ */
+public class TestLinuxTaskController extends TestCase {
+  /**
+   * Exit code that must appear in the setup failure message. Going by the
+   * name it denotes invalid task-controller permissions; the value itself is
+   * defined outside this file.
+   */
+  private static final int INVALID_TASKCONTROLLER_PERMISSIONS = 24;
+  /** Scratch directory, created in setUp() and deleted in tearDown(). */
+  private static File testDir = new File(System.getProperty("test.build.data",
+      "/tmp"), TestLinuxTaskController.class.getName());
+  /** Directory that holds the task-controller executable. */
+  private static String taskControllerPath = System
+      .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+
+  @Before
+  protected void setUp() throws Exception {
+    testDir.mkdirs();
+  }
+
+  @After
+  protected void tearDown() throws Exception {
+    FileUtil.fullyDelete(testDir);
+  }
+
+  /**
+   * LinuxTaskController whose executable path is
+   * {@code taskControllerPath + "/task-controller"}.
+   */
+  public static class MyLinuxTaskController extends LinuxTaskController {
+    String taskControllerExePath = taskControllerPath + "/task-controller";
+
+    @Override
+    protected String getTaskControllerExecutablePath() {
+      return taskControllerExePath;
+    }
+  }
+
+  /**
+   * Runs {@code controller.setup()} and checks the outcome.
+   *
+   * @param controller the controller to set up
+   * @param shouldFail when true, setup must throw an IOException whose
+   *          message contains "with exit code 24"; when false, setup is
+   *          simply expected to complete
+   * @throws IOException if setup fails although shouldFail is false
+   */
+  private void validateTaskControllerSetup(TaskController controller,
+      boolean shouldFail) throws IOException {
+    if (!shouldFail) {
+      controller.setup();
+      return;
+    }
+
+    // task controller setup should fail validating permissions.
+    IOException failure = null;
+    try {
+      controller.setup();
+    } catch (IOException ie) {
+      failure = ie;
+    }
+    assertNotNull("No exception during setup", failure);
+    // A null message must show up as an assertion failure, not as an NPE.
+    String message = failure.getMessage();
+    assertTrue("Exception message does not contain exit code "
+        + INVALID_TASKCONTROLLER_PERMISSIONS, message != null
+        && message.contains("with exit code "
+            + INVALID_TASKCONTROLLER_PERMISSIONS));
+  }
+
+  /**
+   * Sets the controller up three times: without any group configured, with
+   * the group "invalid", and with the TaskTracker's special group. The first
+   * two attempts must fail with the permissions exit code, the last one must
+   * succeed. Skipped when no task-controller path was passed to the test run.
+   */
+  @Test
+  public void testTaskControllerGroup() throws Exception {
+    if (!ClusterWithLinuxTaskController.isTaskExecPathPassed()) {
+      return;
+    }
+    // cleanup configuration file.
+    ClusterWithLinuxTaskController
+        .getTaskControllerConfFile(taskControllerPath).delete();
+    Configuration conf = new Configuration();
+    // create local dirs and set in the conf.
+    File mapredLocal = new File(testDir, "mapred/local");
+    mapredLocal.mkdirs();
+    conf.set(MRConfig.LOCAL_DIR, mapredLocal.toString());
+
+    // setup task-controller without setting any group name
+    TaskController controller = new MyLinuxTaskController();
+    controller.setConf(conf);
+    validateTaskControllerSetup(controller, true);
+
+    // set an invalid group name for the task controller group
+    conf.set(TTConfig.TT_GROUP, "invalid");
+    // write the task-controller's conf file
+    ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath,
+        conf);
+    validateTaskControllerSetup(controller, true);
+
+    conf.set(TTConfig.TT_GROUP,
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+    // write the task-controller's conf file
+    ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath,
+        conf);
+    validateTaskControllerSetup(controller, false);
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker when {@link LinuxTaskController} is used.
+ * 
+ */
+public class TestLocalizationWithLinuxTaskController extends
+    TestTaskTrackerLocalization {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
+
+  // Never assigned inside this class, so the delete in tearDown() is
+  // currently a no-op; kept as in the original.
+  private File configFile;
+  /** Short name of the login user, captured in setUp(). */
+  private static String taskTrackerUserName;
+
+  @Override
+  protected boolean canRun() {
+    return ClusterWithLinuxTaskController.shouldRun();
+  }
+
+  /**
+   * Runs the parent setup and records the login user's short name. Does
+   * nothing when {@link #canRun()} is false.
+   */
+  @Override
+  protected void setUp()
+      throws Exception {
+
+    if (!canRun()) {
+      return;
+    }
+
+    super.setUp();
+
+    taskTrackerUserName = UserGroupInformation.getLoginUser()
+                          .getShortUserName();
+  }
+
+  /**
+   * Runs the parent teardown and deletes the config file if one was recorded.
+   * Does nothing when {@link #canRun()} is false.
+   */
+  @Override
+  protected void tearDown()
+      throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    super.tearDown();
+    if (configFile != null) {
+      configFile.delete();
+    }
+  }
+
+  protected TaskController createTaskController() {
+    return new MyLinuxTaskController();
+  }
+
+  /**
+   * Builds the job owner from the TASKCONTROLLER_UGI system property, which
+   * is read as "user,group": the first element becomes the user name and the
+   * second its only group.
+   */
+  protected UserGroupInformation getJobOwner() {
+    String ugi = System
+        .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    String[] splits = ugi.split(",");
+    return UserGroupInformation.createUserForTesting(splits[0],
+        new String[] { splits[1] });
+  }
+
+  /** Intentionally empty: the inherited test is overridden to do nothing. */
+  @Override
+  public void testTaskControllerSetup() {
+    // Do nothing.
+  }
+
+  /** True when the task's user equals the login user recorded in setUp(). */
+  private boolean taskRunsAsTaskTrackerUser() {
+    return taskTrackerUserName.equals(task.getUser());
+  }
+
+  /**
+   * Expected permission string for private directories:
+   *     2770 permissions if jobOwner is same as tt_user
+   *     2570 permissions for any other user
+   */
+  private String expectedDirPerms() {
+    return taskRunsAsTaskTrackerUser() ? "drwxrws---" : "dr-xrws---";
+  }
+
+  /**
+   * Expected permission string for private files:
+   *      770 permissions if jobOwner is same as tt_user
+   *      570 permissions for any other user
+   */
+  private String expectedFilePerms() {
+    return taskRunsAsTaskTrackerUser() ? "-rwxrwx---" : "-r-xrwx---";
+  }
+
+  /**
+   * Delegates to checkFilePermissions, passing the task's user and
+   * ClusterWithLinuxTaskController.taskTrackerSpecialGroup as the user and
+   * group arguments.
+   *
+   * @param path path of the file or directory to check
+   * @param expectedPerms permission string handed on unchanged
+   */
+  private void checkOwnedByTaskUser(String path, String expectedPerms)
+      throws IOException {
+    checkFilePermissions(path, expectedPerms, task.getUser(),
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+  }
+
+  /**
+   * For every local dir, checks that the taskTracker sub-dir, the user dir,
+   * the jobcache dir and the private distributed cache dir exist, and that
+   * the last three carry the expected directory permissions.
+   */
+  @Override
+  protected void checkUserLocalization()
+      throws IOException {
+    // Check the directory structure and permissions
+    for (String dir : localDirs) {
+
+      File localDir = new File(dir);
+      assertTrue(MRConfig.LOCAL_DIR + " " + localDir + " isn't created!",
+          localDir.exists());
+
+      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      assertTrue("taskTracker sub-dir in the local-dir " + localDir
+          + " is not created!", taskTrackerSubDir.exists());
+
+      // user-dir, jobcache and distcache share the same expected permissions
+      String expectedDirPerms = expectedDirPerms();
+
+      File userDir = new File(taskTrackerSubDir, task.getUser());
+      assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+          + " is not created!", userDir.exists());
+
+      checkOwnedByTaskUser(userDir.getAbsolutePath(), expectedDirPerms);
+
+      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+      assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+          jobCache.exists());
+
+      checkOwnedByTaskUser(jobCache.getAbsolutePath(), expectedDirPerms);
+
+      // Verify the distributed cache dir.
+      File distributedCacheDir =
+          new File(localDir, TaskTracker
+              .getPrivateDistributedCacheDir(task.getUser()));
+      assertTrue("distributed cache dir " + distributedCacheDir
+          + " doesn't exist!", distributedCacheDir.exists());
+      checkOwnedByTaskUser(distributedCacheDir.getAbsolutePath(),
+          expectedDirPerms);
+    }
+  }
+
+  /**
+   * Checks permissions of the job dir in every local dir, the jars dir and
+   * its lib sub-dir, the job work dir, the job conf/jar/lib files, the job
+   * log dir and the job ACLs file, then validates the ACLs file content.
+   */
+  @Override
+  protected void checkJobLocalization()
+      throws IOException {
+    // job-dir, jars-dir and subdirectories in them use the directory
+    // permissions; files under these dirs use the file permissions.
+    String expectedDirPerms = expectedDirPerms();
+    String expectedFilePerms = expectedFilePerms();
+
+    for (String localDir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
+      File jobDir =
+          new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId
+              .toString()));
+      // check the private permissions on the job directory
+      checkOwnedByTaskUser(jobDir.getAbsolutePath(), expectedDirPerms);
+    }
+
+    // check the private permissions of various directories
+    List<Path> dirs = new ArrayList<Path>();
+    Path jarsDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(),
+            jobId.toString()), trackerFConf);
+    dirs.add(jarsDir);
+    dirs.add(new Path(jarsDir, "lib"));
+    for (Path dir : dirs) {
+      checkOwnedByTaskUser(dir.toUri().getPath(), expectedDirPerms);
+    }
+
+    // job-work dir needs user writable permissions i.e. 2770 for any user
+    Path jobWorkDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(),
+            jobId.toString()), trackerFConf);
+    checkOwnedByTaskUser(jobWorkDir.toUri().getPath(), "drwxrws---");
+
+    // check the private permissions of various files
+    List<Path> files = new ArrayList<Path>();
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile(
+        task.getUser(), jobId.toString()), trackerFConf));
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task
+        .getUser(), jobId.toString()), trackerFConf));
+    files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
+    files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
+    for (Path file : files) {
+      checkOwnedByTaskUser(file.toUri().getPath(), expectedFilePerms);
+    }
+
+    // check job user-log directory permissions
+    File jobLogDir = TaskLog.getJobDir(jobId);
+    checkOwnedByTaskUser(jobLogDir.toString(), expectedDirPerms);
+    // check job-acls.xml file permissions
+    checkOwnedByTaskUser(jobLogDir.toString() + Path.SEPARATOR
+        + TaskTracker.jobACLsFile, expectedFilePerms);
+
+    // validate the content of job ACLs file
+    validateJobACLsFileContent();
+  }
+
+  /**
+   * Checks that the task dir, the attempt work dir, its tmp sub-dir and the
+   * attempt log dir are "drwxrws---", and that the task conf file is
+   * "-rwxrwx---".
+   */
+  @Override
+  protected void checkTaskLocalization()
+      throws IOException {
+    // check the private permissions of various directories
+    List<Path> dirs = new ArrayList<Path>();
+    dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
+        .getUser(), jobId.toString(), taskId.toString(),
+        task.isTaskCleanupTask()), trackerFConf));
+    dirs.add(attemptWorkDir);
+    dirs.add(new Path(attemptWorkDir, "tmp"));
+    dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));
+    for (Path dir : dirs) {
+      checkOwnedByTaskUser(dir.toUri().getPath(), "drwxrws---");
+    }
+
+    // check the private permissions of various files
+    List<Path> files = new ArrayList<Path>();
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+        .getUser(), task.getJobID().toString(), task.getTaskID().toString(),
+        task.isTaskCleanupTask()), trackerFConf));
+    for (Path file : files) {
+      checkOwnedByTaskUser(file.toUri().getPath(), "-rwxrwx---");
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1346206&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Jun  5 01:02:16 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Test the DistributedCacheManager when LinuxTaskController is used.
+ * 
+ */
+public class TestTrackerDistributedCacheManagerWithLinuxTaskController extends
+    TestTrackerDistributedCacheManager {
+
+  /** Task-controller conf file written by refreshConf(); removed in tearDown(). */
+  private File configFile;
+
+  private static final Log LOG =
+      LogFactory
+          .getLog(TestTrackerDistributedCacheManagerWithLinuxTaskController.class);
+
+  /**
+   * Points TEST_ROOT_DIR at a directory named after this class, runs the
+   * parent setup, then installs and sets up a MyLinuxTaskController that uses
+   * the task-controller executable under the TASKCONTROLLER_PATH property.
+   * Does nothing when ClusterWithLinuxTaskController.shouldRun() is false.
+   */
+  @Override
+  protected void setUp()
+      throws IOException, InterruptedException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    TEST_ROOT_DIR =
+        new File(System.getProperty("test.build.data", "/tmp"),
+            TestTrackerDistributedCacheManagerWithLinuxTaskController.class
+                .getSimpleName()).getAbsolutePath();
+
+    super.setUp();
+    
+    taskController = new MyLinuxTaskController();
+    String path =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    String execPath = path + "/task-controller";
+    ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
+    taskController.setConf(conf);
+    taskController.setup();
+  }
+
+  /**
+   * Runs the parent refresh and then rewrites the task-controller conf file
+   * from {@code conf}, remembering the file for deletion in tearDown().
+   */
+  @Override
+  protected void refreshConf(Configuration conf) throws IOException {
+    super.refreshConf(conf);
+    String path =
+      System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+      ClusterWithLinuxTaskController.createTaskControllerConf(path, conf);
+   
+  }
+
+  /**
+   * Deletes the recorded conf file, if any, and runs the parent teardown.
+   * Does nothing when ClusterWithLinuxTaskController.shouldRun() is false.
+   */
+  @Override
+  protected void tearDown()
+      throws IOException {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    if (configFile != null) {
+      configFile.delete();
+    }
+    super.tearDown();
+  }
+
+  @Override
+  protected boolean canRun() {
+    return ClusterWithLinuxTaskController.shouldRun();
+  }
+
+  /**
+   * @return the part of the TASKCONTROLLER_UGI system property that precedes
+   *         the first comma
+   */
+  @Override
+  protected String getJobOwnerName() {
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    String userName = ugi.split(",")[0];
+    return userName;
+  }
+
+  /**
+   * Checks each localized cache file, and every directory between it and the
+   * user's private distributed cache dir, against the expected permissions.
+   * Files are expected to be "-rwxrwx---" when the job owner is the login
+   * user and "-r-xrwx---" otherwise.
+   *
+   * @param localCacheFiles localized cache files to check
+   */
+  @Override
+  protected void checkFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    String userName = getJobOwnerName();
+    String filePermissions = UserGroupInformation.getLoginUser()
+        .getShortUserName().equals(userName) ? "-rwxrwx---" : "-r-xrwx---";
+
+    for (Path p : localCacheFiles) {
+      // First make sure that the cache file has proper permissions.
+      TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(),
+          filePermissions, userName,
+          ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+      // Now. make sure that all the path components also have proper
+      // permissions.
+      checkPermissionOnPathComponents(p.toUri().getPath(), userName);
+    }
+
+  }
+
+  /**
+   * Walks from the cache file's parent directory upwards and checks the
+   * permissions of every directory until the user's private distributed
+   * cache dir is reached (that dir itself is not checked here).
+   *
+   * @param cachedFilePath absolute path of the localized cache file
+   * @param userName job owner, passed on as the user argument of each check
+   * @throws IOException if a permission check throws it
+   */
+  private void checkPermissionOnPathComponents(String cachedFilePath,
+      String userName)
+      throws IOException {
+    // Only the "0_[0-n]" part is meant as a regex. The surrounding path
+    // pieces are quoted so that regex metacharacters in a directory or user
+    // name are matched literally.
+    String privateCacheDirRegex =
+        java.util.regex.Pattern.quote(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
+            + Path.SEPARATOR)
+            + "0_[0-" + (numLocalDirs - 1) + "]"
+            + java.util.regex.Pattern.quote(Path.SEPARATOR
+                + TaskTracker.getPrivateDistributedCacheDir(userName));
+    // The trailing distcache/file/... string
+    String trailingStringForFirstFile =
+        cachedFilePath.replaceFirst(privateCacheDirRegex, "");
+    LOG.info("Trailing path for cacheFirstFile is : "
+        + trailingStringForFirstFile);
+    // The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string.
+    String leadingStringForFirstFile =
+        cachedFilePath.substring(0, cachedFilePath
+            .lastIndexOf(trailingStringForFirstFile));
+    LOG.info("Leading path for cacheFirstFile is : "
+        + leadingStringForFirstFile);
+
+    String dirPermissions = UserGroupInformation.getLoginUser()
+        .getShortUserName().equals(userName) ? "drwxrws---" : "dr-xrws---";
+
+    // Now check path permissions, starting with cache file's parent dir.
+    File path = new File(cachedFilePath).getParentFile();
+    while (!path.getAbsolutePath().equals(leadingStringForFirstFile)) {
+      TestTaskTrackerLocalization.checkFilePermissions(path.getAbsolutePath(),
+          dirPermissions, userName, 
+          ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+      path = path.getParentFile();
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message