hadoop-mapreduce-commits mailing list archives

From: yhema...@apache.org
Subject: svn commit: r803583 [1/3] - in /hadoop/mapreduce/trunk: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apa...
Date: Wed, 12 Aug 2009 16:17:49 GMT
Author: yhemanth
Date: Wed Aug 12 16:17:47 2009
New Revision: 803583

URL: http://svn.apache.org/viewvc?rev=803583&view=rev
Log:
MAPREDUCE-842. Setup secure permissions for localized job files, intermediate outputs and log files on tasktrackers. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/mapreduce/trunk/src/c++/task-controller/tests/
    hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/build.xml
    hadoop/mapreduce/trunk/conf/taskcontroller.cfg
    hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in
    hadoop/mapreduce/trunk/src/c++/task-controller/configuration.c
    hadoop/mapreduce/trunk/src/c++/task-controller/configuration.h.in
    hadoop/mapreduce/trunk/src/c++/task-controller/configure.ac
    hadoop/mapreduce/trunk/src/c++/task-controller/main.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/contrib/build-contrib.xml
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Aug 12 16:17:47 2009
@@ -189,6 +189,10 @@
 
     MAPREDUCE-789. Oracle support for Sqoop. (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-842. Setup secure permissions for localized job files,
+    intermediate outputs and log files on tasktrackers.
+    (Vinod Kumar Vavilapalli via yhemanth)
+    
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/build.xml?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/build.xml (original)
+++ hadoop/mapreduce/trunk/build.xml Wed Aug 12 16:17:47 2009
@@ -572,8 +572,8 @@
         <sysproperty key="test.debug.data" value="${test.debug.data}"/>
         <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
         <sysproperty key="test.src.dir" value="${test.src.dir}"/>
-      	 <sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
-      	 <sysproperty key="taskcontroller-user" value="${taskcontroller-user}"/>
+      	<sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
+        <sysproperty key="taskcontroller-ugi" value="${taskcontroller-ugi}"/>
         <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
         <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/>
         <sysproperty key="java.library.path"
@@ -1639,10 +1639,8 @@
   <target name="init-task-controller-build">
     <mkdir dir="${build.c++.task-controller}" />
     <copy todir="${build.c++.task-controller}">
-      <fileset dir="${c++.task-controller.src}" includes="*.c">
-      </fileset>
-      <fileset dir="${c++.task-controller.src}" includes="*.h">
-      </fileset>
+      <fileset dir="${c++.task-controller.src}" includes="*.c"/>
+      <fileset dir="${c++.task-controller.src}" includes="*.h"/>
     </copy>
     <chmod file="${c++.task-controller.src}/configure" perm="ugo+x"/> 
     <condition property="task-controller.conf.dir.passed">
@@ -1682,5 +1680,20 @@
       <arg value="install" />
     </exec>
   </target>
-  <!-- end of task-controller target -->
+  <target name="test-task-controller" depends="task-controller">
+    <copy todir="${build.c++.task-controller}" verbose="true">
+      <fileset dir="${c++.task-controller.src}" includes="tests/"/>
+    </copy>
+    <exec executable="${make.cmd}" dir="${build.c++.task-controller}" 
+        searchpath="yes" failonerror="yes">
+      <arg value="clean" />
+      <arg value="test" />
+    </exec>
+    <exec executable="${build.c++.task-controller}/tests/test-task-controller"
+        dir="${build.c++.task-controller}/tests/"
+        failonerror="yes">
+    </exec>
+  </target>
+
+  <!-- end of task-controller targets -->
 </project>

Modified: hadoop/mapreduce/trunk/conf/taskcontroller.cfg
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/taskcontroller.cfg?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/taskcontroller.cfg (original)
+++ hadoop/mapreduce/trunk/conf/taskcontroller.cfg Wed Aug 12 16:17:47 2009
@@ -1,3 +1,2 @@
-mapred.local.dir=#configured value of hadoop.tmp.dir it can be a list of paths comma seperated
-hadoop.pid.dir=#configured HADOOP_PID_DIR
-hadoop.indent.str=#configured HADOOP_IDENT_STR
+mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths.
+hadoop.log.dir=#configured value of hadoop.log.dir.
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/Makefile.in?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in Wed Aug 12 16:17:47 2009
@@ -21,6 +21,9 @@
 BINARY=task-controller
 installdir = @prefix@
 
+testdir = tests
+TESTOBJS=${testdir}/test-task-controller.o task-controller.o configuration.o
+TESTBINARY=${testdir}/test-task-controller
 
 all: $(OBJS)
 	$(CC) $(CFLAG) -o $(BINARY) $(OBJS)
@@ -34,9 +37,14 @@
 configuration.o: configuration.h configuration.c
 	$(CC) $(CFLAG) -o configuration.o -c configuration.c
 
+${testdir}/test-task-controller.o: ${testdir}/test-task-controller.c task-controller.h
+	$(CC) $(CFLAG) -o ${testdir}/test-task-controller.o -c ${testdir}/test-task-controller.c
+
+test: $(TESTOBJS)
+	$(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS)
 
 clean:
-	rm -rf $(BINARY) $(OBJS)
+	rm -rf $(BINARY) $(OBJS) $(TESTOBJS)
 
 install: all
 	cp $(BINARY) $(installdir)

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/configuration.c?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/configuration.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/configuration.c Wed Aug 12 16:17:47 2009
@@ -72,8 +72,9 @@
 #endif
 
 #ifdef DEBUG
-  fprintf(LOGFILE,"get_configs :Conf file name is : %s \n", file_name);
+  fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name);
 #endif
+
   //allocate space for ten configuration items.
   config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *)
       * MAX_SIZE);
@@ -87,7 +88,7 @@
   while(!feof(conf_file)) {
     line = (char *) malloc(linesize);
     if(line == NULL) {
-      fprintf(LOGFILE,"malloc failed while reading configuration file.\n");
+      fprintf(LOGFILE, "malloc failed while reading configuration file.\n");
       goto cleanup;
     }
     size_read = getline(&line,&linesize,conf_file);
@@ -123,9 +124,11 @@
           "Failed allocating memory for single configuration item\n");
       goto cleanup;
     }
+
 #ifdef DEBUG
-    fprintf(LOGFILE,"get_configs : Adding conf key : %s \n", equaltok);
+    fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok);
 #endif
+
     memset(config.confdetails[config.size], 0, sizeof(struct confentry));
     config.confdetails[config.size]->key = (char *) malloc(
             sizeof(char) * (strlen(equaltok)+1));
@@ -142,9 +145,11 @@
       free(config.confdetails[config.size]);
       continue;
     }
+
 #ifdef DEBUG
-    fprintf(LOGFILE,"get_configs : Adding conf value : %s \n", equaltok);
+    fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok);
 #endif
+
     config.confdetails[config.size]->value = (char *) malloc(
             sizeof(char) * (strlen(equaltok)+1));
     strcpy((char *)config.confdetails[config.size]->value, equaltok);
@@ -184,8 +189,7 @@
  * array, next time onwards used the populated array.
  *
  */
-
-const char * get_value(char* key) {
+const char * get_value(const char* key) {
   int count;
   if (config.size == 0) {
     get_configs();
@@ -196,15 +200,19 @@
   }
   for (count = 0; count < config.size; count++) {
     if (strcmp(config.confdetails[count]->key, key) == 0) {
-      return config.confdetails[count]->value;
+      return strdup(config.confdetails[count]->value);
     }
   }
   return NULL;
 }
 
-const char ** get_values(char * key) {
+/**
+ * Function to return an array of values for a key.
+ * Value delimiter is assumed to be a comma.
+ */
+const char ** get_values(const char * key) {
   const char ** toPass = NULL;
-  const char * value = get_value(key);
+  const char *value = get_value(key);
   char *tempTok = NULL;
   char *tempstr = NULL;
   int size = 0;
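
Since get_value() now returns a strdup'd copy instead of a pointer into the internal table, callers own the returned strings. A minimal usage sketch under that assumption (illustrative only, not part of this patch), using just the declarations from configuration.h:

    #include <stdio.h>
    #include <stdlib.h>
    #include "configuration.h"

    int main(void) {
      /* get_value() returns a heap copy after this patch; free it. */
      char *dir = (char *) get_value("mapred.local.dir");
      if (dir != NULL) {
        printf("mapred.local.dir = %s\n", dir);
        free(dir);
      }
      /* get_values() splits the comma-separated value into a
       * NULL-terminated array; the array itself is heap-allocated. */
      char **dirs = (char **) get_values("mapred.local.dir");
      if (dirs != NULL) {
        char **p;
        for (p = dirs; *p != NULL; p++) {
          printf("local dir: %s\n", *p);
        }
        free(dirs);
      }
      free_configurations();
      return 0;
    }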

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/configuration.h.in
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/configuration.h.in?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/configuration.h.in (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/configuration.h.in Wed Aug 12 16:17:47 2009
@@ -53,10 +53,10 @@
   extern char *hadoop_conf_dir;
 #endif
 //method exposed to get the configurations
-const char * get_value(char* key);
+const char * get_value(const char* key);
 //method to free allocated configuration
 void free_configurations();
 
 //function to return array of values pointing to the key. Values are
 //comma seperated strings.
-const char ** get_values(char* key);
+const char ** get_values(const char* key);

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/configure.ac?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/configure.ac (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/configure.ac Wed Aug 12 16:17:47 2009
@@ -38,7 +38,7 @@
 
 # Checks for header files.
 AC_HEADER_STDC
-AC_CHECK_HEADERS([stdlib.h string.h unistd.h])
+AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h])
 
 #check for HADOOP_CONF_DIR
 
@@ -50,12 +50,17 @@
 # Checks for typedefs, structures, and compiler characteristics.
 AC_C_CONST
 AC_TYPE_PID_T
+AC_TYPE_MODE_T
+AC_TYPE_SIZE_T
 
 # Checks for library functions.
 AC_FUNC_MALLOC
 AC_FUNC_REALLOC
-AC_CHECK_FUNCS([strerror])
+AC_FUNC_CHOWN
+AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup])
 
 AC_CONFIG_FILES([Makefile])
 AC_OUTPUT
 
+AC_HEADER_STDBOOL
+AC_PROG_MAKE_SET

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/main.c?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/main.c Wed Aug 12 16:17:47 2009
@@ -17,6 +17,32 @@
  */
 #include "task-controller.h"
 
+void open_log_file(const char *log_file) {
+  if (log_file == NULL) {
+    LOGFILE = stdout;
+  } else {
+    LOGFILE = fopen(log_file, "a");
+    if (LOGFILE == NULL) {
+      fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file);
+      LOGFILE = stdout;
+    }
+    if (LOGFILE != stdout) {
+      if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
+          | S_IRGRP | S_IWGRP) < 0) {
+        fprintf(stdout, "Unable to change permission of the log file %s \n",
+            log_file);
+        fclose(LOGFILE);
+        fprintf(stdout, "changing log file to stdout");
+        LOGFILE = stdout;
+      }
+    }
+  }
+}
+
+void display_usage(FILE *stream) {
+  fprintf(stream,
+      "Usage: task-controller [-l logfile] user command command-args\n");
+}
 
 int main(int argc, char **argv) {
   int command;
@@ -24,6 +50,7 @@
   const char * job_id = NULL;
   const char * task_id = NULL;
   const char * tt_root = NULL;
+  const char *log_dir = NULL;
   int exit_code = 0;
   const char * task_pid = NULL;
   const char* const short_options = "l:";
@@ -35,7 +62,7 @@
   //Minimum number of arguments required to run the task-controller
   //command-name user command tt-root
   if (argc < 3) {
-    display_usage(stderr);
+    display_usage(stdout);
     return INVALID_ARGUMENT_NUMBER;
   }
 
@@ -54,24 +81,9 @@
       break;
     }
   } while (next_option != -1);
-  if (log_file == NULL) {
-    LOGFILE = stderr;
-  } else {
-    LOGFILE = fopen(log_file, "a");
-    if (LOGFILE == NULL) {
-      fprintf(stderr, "Unable to open LOGFILE : %s \n", log_file);
-      LOGFILE = stderr;
-    }
-    if (LOGFILE != stderr) {
-      if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
-          | S_IRGRP | S_IWGRP) < 0) {
-        fprintf(stderr, "Unable to change permission of the log file %s \n",
-            log_file);
-        fprintf(stderr, "changing log file to stderr");
-        LOGFILE = stderr;
-      }
-    }
-  }
+
+  open_log_file(log_file);
+
   //checks done for user name
   //checks done if the user is root or not.
   if (argv[optind] == NULL) {
@@ -88,11 +100,15 @@
   }
   optind = optind + 1;
   command = atoi(argv[optind++]);
-#ifdef DEBUG
+
   fprintf(LOGFILE, "main : command provided %d\n",command);
   fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
-#endif
+
   switch (command) {
+  case INITIALIZE_JOB:
+    job_id = argv[optind++];
+    exit_code = initialize_job(job_id, user_detail->pw_name);
+    break;
   case LAUNCH_TASK_JVM:
     tt_root = argv[optind++];
     job_id = argv[optind++];
@@ -100,6 +116,11 @@
     exit_code
         = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root);
     break;
+  case INITIALIZE_TASK:
+    job_id = argv[optind++];
+    task_id = argv[optind++];
+    exit_code = initialize_task(job_id, task_id, user_detail->pw_name);
+    break;
   case TERMINATE_TASK_JVM:
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM);
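
For reference, the argument layout the dispatcher above parses is: task-controller [-l logfile] user command command-args, where command is the integer value of the enum in task-controller.h (INITIALIZE_JOB = 0, LAUNCH_TASK_JVM = 1, INITIALIZE_TASK = 2, TERMINATE_TASK_JVM = 3, KILL_TASK_JVM = 4). A hedged sketch of invoking the new INITIALIZE_TASK command; the binary path, user, and ids below are made up:

    #include <stdio.h>
    #include <unistd.h>

    int main(void) {
      /* INITIALIZE_TASK (2) takes a job-id followed by a task-attempt-id. */
      execl("/usr/local/bin/task-controller", "task-controller",
            "-l", "/tmp/task-controller.log",       /* optional log file */
            "someuser",                             /* job-owning user   */
            "2",                                    /* INITIALIZE_TASK   */
            "job_200908121617_0001",                /* job-id            */
            "attempt_200908121617_0001_m_000000_0", /* task-attempt-id   */
            (char *) NULL);
      perror("execl");  /* only reached if the exec failed */
      return 1;
    }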

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Wed Aug 12 16:17:47 2009
@@ -71,104 +71,431 @@
   return 0;
 }
 
-// function to check if the passed tt_root is present in hadoop.tmp.dir
-int check_tt_root(const char *tt_root) {
-  char ** mapred_local_dir;
-  int found = -1;
+/**
+ * Checks the passed value for the variable config_key against the values in
+ * the configuration.
+ * Returns 0 if the passed value is found in the configuration,
+ *        -1 otherwise
+ */
+int check_variable_against_config(const char *config_key,
+    const char *passed_value) {
 
-  if (tt_root == NULL) {
+  if (config_key == NULL || passed_value == NULL) {
     return -1;
   }
 
-  mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY);
+  int found = -1;
+
+  const char **config_value = get_values(config_key);
 
-  if (mapred_local_dir == NULL) {
+  if (config_value == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", config_key);
     return -1;
   }
 
-  while(*mapred_local_dir != NULL) {
-    if(strcmp(*mapred_local_dir,tt_root) == 0) {
+  char *full_config_value = (char *)get_value(config_key);
+
+  char **config_val_ptr = (char **) config_value;
+  while (*config_val_ptr != NULL) {
+    if (strcmp(*config_val_ptr, passed_value) == 0) {
       found = 0;
       break;
     }
+    config_val_ptr++;
   }
-  free(mapred_local_dir);
+
+  if (found != 0) {
+    fprintf(LOGFILE,
+        "Invalid value passed: Configured value of %s is %s. "
+        "Passed value is %s.\n",
+        config_key, full_config_value, passed_value);
+  }
+  free(full_config_value);
+  free(config_value);
   return found;
 }
 
 /**
- * Function to check if the constructed path and absolute
- * path resolve to one and same.
+ * Utility function to concatenate argB to argA using the concat_pattern
+ */
+char *concatenate(const char *argA, const char *argB, char *concat_pattern,
+    char *return_path_name) {
+  if (argA == NULL || argB == NULL) {
+    fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+        return_path_name);
+    return NULL;
+  }
+
+  char *return_path = NULL;
+  int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB);
+
+  return_path = (char *) malloc(sizeof(char) * (str_len + 1));
+  if (return_path == NULL) {
+    fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name);
+    return NULL;
+  }
+  memset(return_path, '\0', str_len + 1);
+  snprintf(return_path, str_len, concat_pattern, argA, argB);
+  return return_path;
+}
+
+/**
+ * Get the job-directory path from tt_root and job-id
  */
+char *get_job_directory(const char * tt_root, const char *jobid) {
+  return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path");
+}
 
-int check_path(char *path) {
+char *get_job_work_directory(const char *job_dir) {
+  return concatenate(job_dir, "", JOB_DIR_TO_JOB_WORK_PATTERN,
+      "job_work_dir_path");
+}
+/**
+ * Get the attempt directory for the given attempt_id
+ */
+char *get_attempt_directory(const char *job_dir, const char *attempt_id) {
+  return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN,
+      "attempt_dir_path");
+}
+
+/*
+ * Get the path to the task launcher file which is created by the TT
+ */
+char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) {
+  return concatenate(job_dir, attempt_dir, TASK_SCRIPT_PATTERN,
+      "task_script_path");
+}
+
+/**
+ * Get the log directory for the given attempt.
+ */
+char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
+  return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN,
+      "task_log_dir");
+}
+
+/**
+ * Function to check if the passed tt_root is present in mapred.local.dir
+ * the task-controller is configured with.
+ */
+int check_tt_root(const char *tt_root) {
+  return check_variable_against_config(TT_SYS_DIR_KEY, tt_root);
+}
+
+/**
+ * Function to check if the constructed path and absolute path of the task
+ * launcher file resolve to one and same. This is done so as to avoid
+ * security pitfalls because of relative path components in the file name.
+ */
+int check_task_launcher_path(char *path) {
   char * resolved_path = (char *) canonicalize_file_name(path);
-  if(resolved_path == NULL) {
+  if (resolved_path == NULL) {
+    fprintf(LOGFILE,
+        "Error resolving the task launcher file path: %s. Passed path: %s\n",
+        strerror(errno), path);
     return ERROR_RESOLVING_FILE_PATH;
   }
-  if(strcmp(resolved_path, path) !=0) {
+  if (strcmp(resolved_path, path) != 0) {
+    fprintf(LOGFILE,
+        "Relative path components in the file path: %s. Resolved path: %s\n",
+        path, resolved_path);
     free(resolved_path);
     return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
   }
   free(resolved_path);
   return 0;
 }
+
 /**
- * Function to check if a user actually owns the file.
+ * Function to change the owner/group of a given path.
  */
-int check_owner(uid_t uid, char *path) {
-  struct stat filestat;
-  if(stat(path, &filestat)!=0) {
-    return UNABLE_TO_STAT_FILE;
+static int change_owner(const char *path, uid_t uid, gid_t gid) {
+  int exit_code = chown(path, uid, gid);
+  if (exit_code != 0) {
+    fprintf(LOGFILE, "chown %d:%d for path %s failed: %s.\n", uid, gid, path,
+        strerror(errno));
   }
-  //check owner.
-  if(uid != filestat.st_uid){
-    return FILE_NOT_OWNED_BY_TASKTRACKER;
+  return exit_code;
+}
+
+/**
+ * Function to change the mode of a given path.
+ */
+static int change_mode(const char *path, mode_t mode) {
+  int exit_code = chmod(path, mode);
+  if (exit_code != 0) {
+    fprintf(LOGFILE, "chown %d of path %s failed: %s.\n", mode, path,
+        strerror(errno));
   }
-  return 0;
+  return exit_code;
 }
 
-/*
- * function to provide path to the task file which is created by the tt
- *
- *Check TT_LOCAL_TASK_SCRIPT_PATTERN for pattern
+/**
+ * Function to secure the given path. It does the following recursively:
+ *    1) changes the owner/group of the paths to the passed owner/group
+ *    2) changes the file permission to the passed file_mode and directory
+ *       permission to the passed dir_mode
+ */
+static int secure_path(const char *path, uid_t uid, gid_t gid,
+    mode_t file_mode, mode_t dir_mode) {
+  FTS *tree = NULL; // the file hierarchy
+  FTSENT *entry = NULL; // a file in the hierarchy
+  char *paths[] = { (char *) path };
+  int process_path = 0;
+  int dir = 0;
+  int error_code = 0;
+  int done = 0;
+
+  // Get physical locations and don't resolve the symlinks.
+  // Don't change directory while walking the directory.
+  int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR;
+
+  tree = fts_open(paths, ftsoptions, NULL);
+  if (tree == NULL) {
+    fprintf(LOGFILE,
+        "Cannot open file traversal structure for the path %s:%s.\n", path,
+        strerror(errno));
+    return -1;
+  }
+
+  while (((entry = fts_read(tree)) != NULL) && !done) {
+    dir = 0;
+    switch (entry->fts_info) {
+    case FTS_D:
+      // A directory being visited in pre-order.
+      // We change ownership of directories in post-order.
+      // so ignore the pre-order visit.
+      process_path = 0;
+      break;
+    case FTS_DC:
+      // A directory that causes a cycle in the tree
+      // We don't expect cycles, ignore.
+      process_path = 0;
+      break;
+    case FTS_DNR:
+      // A directory which cannot be read
+      // Ignore and set error code.
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_DOT:
+      // "."  or ".."
+      process_path = 0;
+      break;
+    case FTS_F:
+      // A regular file
+      process_path = 1;
+      break;
+    case FTS_DP:
+      // A directory being visited in post-order
+      if (entry->fts_level == 0) {
+        // root directory. Done with traversing.
+        done = 1;
+      }
+      process_path = 1;
+      dir = 1;
+      break;
+    case FTS_SL:
+      // A symbolic link
+      process_path = 1;
+      break;
+    case FTS_SLNONE:
+      // A symbolic link with a nonexistent target
+      process_path = 1;
+      break;
+    case FTS_NS:
+      // A  file for which no stat(2) information was available
+      // Ignore and set error code
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_ERR:
+      // An error return. Ignore and set error code.
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_DEFAULT:
+      // File that doesn't belong to any of the above type. Ignore.
+      process_path = 0;
+      break;
+    default:
+      // None of the above. Ignore and set error code
+      process_path = 0;
+      error_code = -1;
+    }
+
+    if (error_code != 0) {
+      break;
+    }
+    if (!process_path) {
+      continue;
+    }
+
+    if (check_ownership(getuid(), getgid(), entry->fts_path) != 0) {
+      fprintf(LOGFILE,
+          "Invalid file path. %s not user/group owned by the tasktracker.\n",
+          entry->fts_path);
+      error_code = -1;
+    } else if (change_owner(entry->fts_path, uid, gid) != 0) {
+      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
+      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    }
+  }
+  if (fts_close(tree) != 0) {
+    fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
+        strerror(errno));
+  }
+  return error_code;
+}
+
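
A hedged sketch of how the callers in this patch use secure_path(); since the function is static, the sketch is written as if it lived inside task-controller.c, and the helper name is made up:

    /* Files become 770; directories become 2770 so the setgid bit keeps
     * new entries group-owned by the TaskTracker's group. */
    static int secure_attempt_dir_sketch(const char *attempt_dir,
                                         uid_t job_owner_uid) {
      gid_t tasktracker_gid = getgid(); /* group the TaskTracker runs as */
      return secure_path(attempt_dir, job_owner_uid, tasktracker_gid,
                         S_IRWXU | S_IRWXG,             /* files: 770 */
                         S_ISGID | S_IRWXU | S_IRWXG);  /* dirs: 2770 */
    }
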
+/**
+ * Function to prepare the attempt directories for the task JVM.
+ * This is done by changing the ownership of the attempt directory recursively
+ * to the job owner. We do the following:
+ *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid/$attemptid/
+ *     *  sudo chmod 2770 -R taskTracker/jobcache/$jobid/$attemptid/
  */
-void get_task_file_path(const char * jobid, const char * taskid,
-    const char * tt_root, char **task_script_path) {
-  const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY);
-  *task_script_path = NULL;
-  int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen(
-      taskid)) + strlen(tt_root);
+int prepare_attempt_directories(const char *job_id, const char *attempt_id,
+    const char *user) {
+  if (job_id == NULL || attempt_id == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either attempt_id is null or the user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
 
-  if (mapred_local_dir == NULL) {
-    return;
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    return INVALID_USER_NAME;
   }
 
-  *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1));
-  if (*task_script_path == NULL) {
-    fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n");
-    free(mapred_local_dir);
-    return;
+  int tasktracker_gid = getgid();
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
   }
 
-  memset(*task_script_path,'\0',str_len+1);
-  snprintf(*task_script_path, str_len, TT_LOCAL_TASK_SCRIPT_PATTERN, tt_root,
-      jobid, taskid);
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
 #ifdef DEBUG
-  fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path);
-  fflush(LOGFILE);
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
 #endif
-  free(mapred_local_dir);
+
+  char *job_dir;
+  char *attempt_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) {
+    job_dir = get_job_directory(*local_dir_ptr, job_id);
+    if (job_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get job directory for %s.\n", job_id);
+      failed = 1;
+      break;
+    }
+
+    // prepare attempt-dir in each of the mapred_local_dir
+    attempt_dir = get_attempt_directory(job_dir, attempt_id);
+    if (attempt_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get attempt directory for %s.\n", attempt_id);
+      failed = 1;
+      free(job_dir);
+      break;
+    }
+
+    struct stat filestat;
+    if (stat(attempt_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE,
+            "attempt_dir %s doesn't exist. Not doing anything.\n", attempt_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the attempt_dir %s\n", attempt_dir);
+        failed = 1;
+        free(attempt_dir);
+        free(job_dir);
+        break;
+      }
+    } else if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid,
+        S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+      // No setgid on files and setgid on dirs, 770
+      fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir);
+      failed = 1;
+      free(attempt_dir);
+      free(job_dir);
+      break;
+    }
+
+    local_dir_ptr++;
+    free(attempt_dir);
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+
+  cleanup();
+  if (failed) {
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+  }
+  return 0;
 }
 
-//end of private functions
-void display_usage(FILE *stream) {
-  fprintf(stream,
-      "Usage: task-controller [-l logfile] user command command-args\n");
+/**
+ * Function to prepare the task logs for the child. It gives the user
+ * ownership of the attempt's log-dir to the user and group ownership to the
+ * user running tasktracker.
+ *     *  sudo chown user:mapred log-dir/userlogs/$attemptid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$attemptid
+ */
+int prepare_task_logs(const char *log_dir, const char *task_id) {
+
+  char *task_log_dir = get_task_log_dir(log_dir, task_id);
+  if (task_log_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir);
+    return -1;
+  }
+
+  struct stat filestat;
+  if (stat(task_log_dir, &filestat) != 0) {
+    if (errno == ENOENT) {
+      // See TaskRunner.java to see that an absent log-dir doesn't fail the task.
+      // Task log dir for cleanup tasks will not have the name
+      // task-attempt-id.cleanup. Instead a log.index.cleanup is created in
+      // task-attempt log dir. We check if the directory exists and return if
+      // it doesn't. So the following will work for cleanup attempts too.
+#ifdef DEBUG
+      fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
+          task_log_dir);
+#endif
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir);
+      return -1;
+    }
+  }
+
+  int tasktracker_gid = getgid();
+  if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU
+      | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+    // setgid on dirs but not files, 770. As of now, there are no files though
+    fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+    return -1;
+  }
+  return 0;
 }
 
 //function used to populate and user_details structure.
-
 int get_user_details(const char *user) {
   if (user_detail == NULL) {
     user_detail = getpwnam(user);
@@ -181,30 +508,197 @@
 }
 
 /*
- *Function used to launch a task as the provided user.
- * First the function checks if the tt_root passed is found in
- * hadoop.temp.dir
- * Uses get_task_file_path to fetch the task script file path.
- * Does an execlp on the same in order to replace the current image with
- * task image.
+ * Function to check if a user/group actually owns the file.
+  */
+int check_ownership(uid_t uid, gid_t gid, char *path) {
+  struct stat filestat;
+  if (stat(path, &filestat) != 0) {
+    return UNABLE_TO_STAT_FILE;
+  }
+  // check user/group.
+  if (uid != filestat.st_uid || gid != filestat.st_gid) {
+    return FILE_NOT_OWNED_BY_TASKTRACKER;
+  }
+  return 0;
+}
+
+/**
+ * Function to prepare the job directories for the task JVM.
+ * We do the following:
+ *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid
+ *     *  sudo chmod 2570 -R taskTracker/jobcache/$jobid
+ *     *  sudo chmod 2770 taskTracker/jobcache/$jobid/work
+ */
+int initialize_job(const char *jobid, const char *user) {
+  if (jobid == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getgid(); // TaskTracker's group-id
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+#endif
+
+  char *job_dir, *job_work_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) {
+    job_dir = get_job_directory(*local_dir_ptr, jobid);
+    if (job_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get job directory for %s.\n", jobid);
+      failed = 1;
+      break;
+    }
+
+    struct stat filestat;
+    if (stat(job_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE, "job_dir %s doesn't exist. Not doing anything.\n",
+            job_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the job_dir %s\n", job_dir);
+        failed = 1;
+        free(job_dir);
+        break;
+      }
+    } else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid,
+        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+        != 0) {
+      // No setgid on files and setgid on dirs, 570
+      fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir);
+      failed = 1;
+      free(job_dir);
+      break;
+    } else {
+      job_work_dir = get_job_work_directory(job_dir);
+      if (job_work_dir == NULL) {
+        fprintf(LOGFILE, "Couldn't get job-work directory for %s.\n", jobid);
+        failed = 1;
+        break;
+      }
+
+      // Set 2770 on the job-work directory
+      if (stat(job_work_dir, &filestat) != 0) {
+        if (errno == ENOENT) {
+#ifdef DEBUG
+          fprintf(LOGFILE,
+              "job_work_dir %s doesn't exist. Not doing anything.\n",
+              job_work_dir);
+#endif
+        } else {
+          // stat failed because of something else!
+          fprintf(LOGFILE, "Failed to stat the job_work_dir %s\n",
+              job_work_dir);
+          failed = 1;
+          free(job_work_dir);
+          free(job_dir);
+          break;
+        }
+      } else if (change_mode(job_work_dir, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+        fprintf(LOGFILE,
+            "couldn't change the permissions of job_work_dir %s\n",
+            job_work_dir);
+        failed = 1;
+        free(job_work_dir);
+        free(job_dir);
+        break;
+      }
+    }
+
+    local_dir_ptr++;
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failed) {
+    return INITIALIZE_JOB_FAILED;
+  }
+  return 0;
+}
+
+/**
+ * Function used to initialize task. Prepares attempt_dir, jars_dir and
+ * log_dir to be accessible by the child
  */
+int initialize_task(const char *jobid, const char *taskid, const char *user) {
+  int exit_code = 0;
+  char *log_dir = NULL;
+#ifdef DEBUG
+  fprintf(LOGFILE, "job-id passed to initialize_task : %s.\n", jobid);
+  fprintf(LOGFILE, "task-d passed to initialize_task : %s.\n", taskid);
+#endif
+
+  if (prepare_attempt_directories(jobid, taskid, user) != 0) {
+    fprintf(LOGFILE,
+        "Couldn't prepare the attempt directories for %s of user %s.\n",
+        taskid, user);
+    exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+    goto cleanup;
+  }
+
+  log_dir = (char *) get_value(TT_LOG_DIR_KEY);
+  if (log_dir == NULL) {
+    fprintf(LOGFILE, "Log directory is not configured.\n");
+    exit_code = INVALID_TT_LOG_DIR;
+    goto cleanup;
+  }
+
+  if (prepare_task_logs(log_dir, taskid) != 0) {
+    fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
+        log_dir, taskid);
+    exit_code = PREPARE_TASK_LOGS_FAILED;
+  }
+
+  cleanup:
+  // free configurations
+  cleanup();
+  if (log_dir != NULL) {
+    free(log_dir);
+  }
+  return exit_code;
+}
 
+/*
+ * Function used to launch a task as the provided user. It does the following :
+ * 1) Checks if the tt_root passed is found in mapred.local.dir
+ * 2) Prepares attempt_dir and log_dir to be accessible by the child
+ * 3) Uses get_task_launcher_file to fetch the task script file path
+ * 4) Does an execlp on the same in order to replace the current image with
+ * task image.
+ */
 int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     const char *tt_root) {
-  char *task_script_path = NULL;
   int exit_code = 0;
-  uid_t uid = getuid();
 
-  if(jobid == NULL || taskid == NULL) {
+  if (jobid == NULL || taskid == NULL || tt_root == NULL) {
     return INVALID_ARGUMENT_NUMBER;
   }
 
 #ifdef DEBUG
-  fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid);
-  fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid);
-  fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root);
-  fflush(LOGFILE);
+  fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
+  fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
+  fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
 #endif
+
   //Check tt_root before switching the user, as reading configuration
   //file requires privileged access.
   if (check_tt_root(tt_root) < 0) {
@@ -213,34 +707,48 @@
     return INVALID_TT_ROOT;
   }
 
-  //change the user
-  fclose(LOGFILE);
-  fcloseall();
-  umask(0);
-  if (change_user(user) != 0) {
-    cleanup();
-    return SETUID_OPER_FAILED;
+  char *job_dir = NULL, *task_script_path = NULL;
+
+  if ((exit_code = initialize_task(jobid, taskid, user)) != 0) {
+    fprintf(LOGFILE, "Couldn't initialise the task %s of user %s.\n", taskid,
+        user);
+    goto cleanup;
   }
 
-  get_task_file_path(jobid, taskid, tt_root, &task_script_path);
+  job_dir = get_job_directory(tt_root, jobid);
+  if (job_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root);
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
+
+  task_script_path = get_task_launcher_file(job_dir, taskid);
   if (task_script_path == NULL) {
-    cleanup();
-    return INVALID_TASK_SCRIPT_PATH;
+    fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir);
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
   }
+
   errno = 0;
-  exit_code = check_path(task_script_path);
+  exit_code = check_task_launcher_path(task_script_path);
   if(exit_code != 0) {
     goto cleanup;
   }
-  errno = 0;
-  exit_code = check_owner(uid, task_script_path);
-  if(exit_code != 0) {
+
+  //change the user
+  fcloseall();
+  free(job_dir);
+  umask(0007);
+  if (change_user(user) != 0) {
+    exit_code = SETUID_OPER_FAILED;
     goto cleanup;
   }
+
   errno = 0;
   cleanup();
   execlp(task_script_path, task_script_path, NULL);
   if (errno != 0) {
+    fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
     free(task_script_path);
     exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
   }
@@ -248,6 +756,9 @@
   return exit_code;
 
 cleanup:
+  if (job_dir != NULL) {
+    free(job_dir);
+  }
   if (task_script_path != NULL) {
     free(task_script_path);
   }
@@ -261,19 +772,25 @@
  * The function sends appropriate signal to the process group
  * specified by the task_pid.
  */
-
 int kill_user_task(const char *user, const char *task_pid, int sig) {
   int pid = 0;
 
   if(task_pid == NULL) {
     return INVALID_ARGUMENT_NUMBER;
   }
+
+#ifdef DEBUG
+  fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user);
+  fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid);
+  fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig);
+#endif
+
   pid = atoi(task_pid);
 
   if(pid <= 0) {
     return INVALID_TASK_PID;
   }
-  fclose(LOGFILE);
+
   fcloseall();
   if (change_user(user) != 0) {
     cleanup();
@@ -283,6 +800,7 @@
   //Don't continue if the process-group is not alive anymore.
   if(kill(-pid,0) < 0) {
     errno = 0;
+    cleanup();
     return 0;
   }
 
@@ -297,4 +815,3 @@
   cleanup();
   return 0;
 }
-
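
kill_user_task() above relies on kill(2) semantics: a negative pid targets the whole process group, and a signal of 0 merely probes whether the group still exists. A standalone sketch of that pattern (the function name is made up):

    #include <errno.h>
    #include <signal.h>
    #include <stdlib.h>

    /* Mirror kill_user_task(): probe the group with signal 0, then
     * deliver the real signal (e.g. SIGTERM or SIGKILL) to -pid. */
    int signal_task_group_sketch(const char *task_pid_str, int sig) {
      int pid = atoi(task_pid_str);
      if (pid <= 0) {
        return -1;                /* mirrors the INVALID_TASK_PID check */
      }
      if (kill(-pid, 0) < 0) {
        errno = 0;
        return 0;                 /* group already gone; nothing to do */
      }
      return kill(-pid, sig);
    }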

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Wed Aug 12 16:17:47 2009
@@ -28,14 +28,20 @@
 #include <sys/stat.h>
 #include <sys/signal.h>
 #include <getopt.h>
-#include<grp.h>
+#include <grp.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <fts.h>
+
 #include "configuration.h"
 
 //command definitions
 enum command {
+  INITIALIZE_JOB,
   LAUNCH_TASK_JVM,
+  INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
-  KILL_TASK_JVM
+  KILL_TASK_JVM,
 };
 
 enum errorcodes {
@@ -45,23 +51,33 @@
   SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4
   INVALID_TT_ROOT, //5
   SETUID_OPER_FAILED, //6
-  INVALID_TASK_SCRIPT_PATH, //7
-  UNABLE_TO_EXECUTE_TASK_SCRIPT, //8
-  UNABLE_TO_KILL_TASK, //9
-  INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10
-  INVALID_TASK_PID, //11
-  ERROR_RESOLVING_FILE_PATH, //12
-  RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
-  UNABLE_TO_STAT_FILE, //14
-  FILE_NOT_OWNED_BY_TASKTRACKER //15
+  UNABLE_TO_EXECUTE_TASK_SCRIPT, //7
+  UNABLE_TO_KILL_TASK, //8
+  INVALID_TASK_PID, //9
+  ERROR_RESOLVING_FILE_PATH, //10
+  RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //11
+  UNABLE_TO_STAT_FILE, //12
+  FILE_NOT_OWNED_BY_TASKTRACKER, //13
+  PREPARE_ATTEMPT_DIRECTORIES_FAILED, //14
+  INITIALIZE_JOB_FAILED, //15
+  PREPARE_TASK_LOGS_FAILED, //16
+  INVALID_TT_LOG_DIR, //17
+  OUT_OF_MEMORY, //18
 };
 
+#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s"
+
+#define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
+
+#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
 
-#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
+#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 
 #define TT_SYS_DIR_KEY "mapred.local.dir"
 
-#define MAX_ITEMS 10
+#define TT_LOG_DIR_KEY "hadoop.log.dir"
 
 #ifndef HADOOP_CONF_DIR
   #define EXEC_PATTERN "/bin/task-controller"
@@ -72,10 +88,22 @@
 
 extern FILE *LOGFILE;
 
-void display_usage(FILE *stream);
+int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+    const char *tt_root);
+
+int initialize_task(const char *jobid, const char *taskid, const char *user);
 
-int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root);
+int initialize_job(const char *jobid, const char *user);
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
+int prepare_attempt_directories(const char *job_id, const char *attempt_id,
+    const char *user);
+
+// The following functions are exposed for testing
+
+int check_variable_against_config(const char *config_key,
+    const char *passed_value);
+
 int get_user_details(const char *user);
+
+char *get_task_launcher_file(const char *job_dir, const char *attempt_dir);
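
Since this patch renumbers the error codes (the old values 7-15 now carry different meanings), a small hypothetical helper such as the following, not part of the patch, can make controller exit codes readable in test logs; it assumes task-controller.h is included:

    /* Hypothetical debugging aid: name the renumbered error codes. */
    static const char *error_code_name(int code) {
      switch (code) {
      case INVALID_TT_ROOT:                    return "INVALID_TT_ROOT";
      case UNABLE_TO_EXECUTE_TASK_SCRIPT:      return "UNABLE_TO_EXECUTE_TASK_SCRIPT";
      case PREPARE_ATTEMPT_DIRECTORIES_FAILED: return "PREPARE_ATTEMPT_DIRECTORIES_FAILED";
      case INITIALIZE_JOB_FAILED:              return "INITIALIZE_JOB_FAILED";
      case PREPARE_TASK_LOGS_FAILED:           return "PREPARE_TASK_LOGS_FAILED";
      case INVALID_TT_LOG_DIR:                 return "INVALID_TT_LOG_DIR";
      case OUT_OF_MEMORY:                      return "OUT_OF_MEMORY";
      default:                                 return "UNKNOWN";
      }
    }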

Added: hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=803583&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c (added)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c Wed Aug 12 16:17:47 2009
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../task-controller.h"
+
+#define HADOOP_CONF_DIR "/tmp"
+
+int write_config_file(char *file_name) {
+  FILE *file;
+  char const *str =
+      "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+
+  file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  fwrite(str, 1, strlen(str), file);
+  fclose(file);
+  return 0;
+}
+
+void test_check_variable_against_config() {
+
+  char *value = NULL;
+  char **values = NULL;
+  char *conf_dir = NULL;
+
+  // A temporary configuration directory
+  char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX";
+
+  // To accommodate "/conf/taskcontroller.cfg"
+  char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg") + 1];
+
+  strcpy(template, conf_dir_templ);
+  char *temp_dir = mkdtemp(template);
+  if (temp_dir == NULL) {
+    printf("Couldn't create a temporary dir for conf.\n");
+    goto cleanup;
+  }
+
+  // Set the configuration directory
+  hadoop_conf_dir = strdup(temp_dir);
+
+  // create the configuration directory
+  strcat(template, "/conf");
+  conf_dir = strdup(template);
+  mkdir(conf_dir, S_IRWXU);
+
+  // create the configuration file
+  strcat(template, "/taskcontroller.cfg");
+  if (write_config_file(template) != 0) {
+    printf("Couldn't write the configuration file.\n");
+    goto cleanup;
+  }
+
+  // Test obtaining a value for a key from the config
+  char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
+      "/tmp/testing3", "/tmp/testing4" };
+  value = (char *) get_value("mapred.local.dir");
+  if (strcmp(value, "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4")
+      != 0) {
+    printf("Obtaining a value for a key from the config failed.\n");
+    goto cleanup;
+  }
+
+  // Test the parsing of a multiple valued key from the config
+  values = (char **) get_values("mapred.local.dir");
+  char **values_ptr = values;
+  int i = 0;
+  while (*values_ptr != NULL) {
+    printf(" value : %s\n", *values_ptr);
+    if (strcmp(*values_ptr, config_values[i++]) != 0) {
+      printf("Configured values are not read out properly. Test failed!");
+      goto cleanup;
+    }
+    values_ptr++;
+  }
+
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) {
+    printf("Configuration should not contain /tmp/testing5! \n");
+    goto cleanup;
+  }
+
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) {
+    printf("Configuration should contain /tmp/testing4! \n");
+    goto cleanup;
+  }
+
+  cleanup: if (value != NULL) {
+    free(value);
+  }
+  if (values != NULL) {
+    free(values);
+  }
+  unlink(template);
+  if (conf_dir != NULL) {
+    rmdir(conf_dir);
+    free(conf_dir);
+  }
+  if (hadoop_conf_dir != NULL) {
+    rmdir(hadoop_conf_dir);
+    free(hadoop_conf_dir);
+  }
+}
+
+void test_get_job_directory() {
+  char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001");
+  printf("job_dir obtained is %s\n", job_dir);
+  int ret = 0;
+  if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) {
+    ret = -1;
+  }
+  free(job_dir);
+  assert(ret == 0);
+}
+
+void test_get_attempt_directory() {
+  char *attempt_dir = (char *) get_attempt_directory(
+      "/tmp/taskTracker/jobcache/job_200906101234_0001",
+      "attempt_200906112028_0001_m_000000_0");
+  printf("attempt_dir obtained is %s\n", attempt_dir);
+  int ret = 0;
+  if (strcmp(
+      attempt_dir,
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+      != 0) {
+    ret = -1;
+  }
+  free(attempt_dir);
+  assert(ret == 0);
+}
+
+void test_get_task_launcher_file() {
+  char *task_file = (char *) get_task_launcher_file(
+      "/tmp/taskTracker/jobcache/job_200906101234_0001",
+      "attempt_200906112028_0001_m_000000_0");
+  printf("task_file obtained is %s\n", task_file);
+  int ret = 0;
+  if (strcmp(
+      task_file,
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+      != 0) {
+    ret = -1;
+  }
+  free(task_file);
+  assert(ret == 0);
+}
+
+void test_get_task_log_dir() {
+  char *logdir = (char *) get_task_log_dir("/tmp/testing",
+      "attempt_200906112028_0001_m_000000_0");
+  printf("logdir obtained is %s\n", logdir);
+  int ret = 0;
+  if (strcmp(logdir,
+      "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+    ret = -1;
+  }
+  free(logdir);
+  assert(ret == 0);
+}
+
+int main(int argc, char **argv) {
+  printf("Starting tests\n");
+  LOGFILE = stdout;
+  test_check_variable_against_config();
+  test_get_job_directory();
+  test_get_attempt_directory();
+  test_get_task_launcher_file();
+  test_get_task_log_dir();
+  printf("Finished tests\n");
+  return 0;
+}

Modified: hadoop/mapreduce/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/build-contrib.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/build-contrib.xml Wed Aug 12 16:17:47 2009
@@ -238,7 +238,7 @@
       <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
       <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/> 
       <sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
-      <sysproperty key="taskcontroller-user" value="${taskcontroller-user}"/>
+      <sysproperty key="taskcontroller-ugi" value="${taskcontroller-ugi}"/>
       <classpath refid="test.classpath"/>
       <formatter type="${test.junit.output.format}" />
       <batchtest todir="${build.test}" unless="testcase">

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java Wed Aug 12 16:17:47 2009
@@ -65,7 +65,7 @@
             "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") };
     StreamJob streamJob = new StreamJob(args, true);
     streamJob.setConf(myConf);
-    streamJob.go();
+    assertTrue("Job has not succeeded", streamJob.go() == 0);
     assertOwnerShip(outputPath);
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Wed Aug 12 16:17:47 2009
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.util.StringUtils;
 
 import junit.framework.TestCase;
 
@@ -89,10 +90,14 @@
       assertFalse("output not cleaned up", fs.exists(outputPath));
       mr.waitUntilIdle();
     } catch(IOException e) {
-      fail(e.toString());
+      fail(StringUtils.stringifyException(e));
     } finally {
-      mr.shutdown();
-      dfs.shutdown();
+      if (mr != null) {
+        mr.shutdown();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml Wed Aug 12 16:17:47 2009
@@ -649,11 +649,13 @@
             distribution. The task tracker uses this executable to 
             launch and kill tasks. The setuid executable switches to
             the user who has submitted the job and launches or kills
-            the tasks. Currently, this task controller 
-            opens up permissions to local files and directories used 
-            by the tasks such as the job jar files, distributed archive 
-            files, intermediate files and task log files. In future,
-            it is expected that stricter file permissions are used.
+            the tasks. For maximum security, this task controller 
+            sets up restricted permissions and user/group ownership of
+            the local files and directories used by the tasks, such as
+            the job jar files, intermediate files and task log files.
+            Currently, permissions on distributed cache files are opened
+            up so that they are accessible to all users. In the future,
+            stricter file permissions are expected to be set for these
+            files too.
             </td>
             </tr>
             </table>
@@ -695,15 +697,32 @@
             </p>
             
             <p>
-            The executable must be deployed as a setuid executable, by changing
-            the ownership to <em>root</em> and giving it permissions <em>4755</em>. 
+            The executable must have specific permissions, as follows: it
+            should have <em>6050 or --Sr-s---</em> permissions, be
+            user-owned by <em>root</em> (the super-user) and be group-owned
+            by a group of which the TaskTracker's user is the sole member.
+            For example, suppose the TaskTracker is run as the user
+            <em>mapred</em>, who is a member of the groups <em>users</em>
+            and <em>mapredGroup</em>, either of which may be the primary
+            group. Suppose also that <em>users</em> has both <em>mapred</em>
+            and another user <em>X</em> as its members, while
+            <em>mapredGroup</em> has only <em>mapred</em> as its member.
+            Going by the above description, the setuid/setgid executable
+            should be set <em>6050 or --Sr-s---</em> with user-owner
+            <em>root</em> and group-owner <em>mapredGroup</em>, which has
+            only <em>mapred</em> as its member (and not <em>users</em>,
+            which also has <em>X</em> as a member besides <em>mapred</em>).
             </p>
             
             <p>The executable requires a configuration file called 
             <em>taskcontroller.cfg</em> to be
             present in the configuration directory passed to the ant target 
             mentioned above. If the binary was not built with a specific 
-            conf directory, the path defaults to <em>/path-to-binary/../conf</em>.
+            conf directory, the path defaults to
+            <em>/path-to-binary/../conf</em>. The configuration file must be
+            owned by the user running the TaskTracker (user <em>mapred</em>
+            in the above example), may be group-owned by any group, and
+            should have the permissions <em>0400 or r--------</em>.
             </p>
             
             <p>The executable requires the following configuration items to be 
@@ -718,13 +737,20 @@
             validate paths passed to the setuid executable in order to prevent
             arbitrary paths being passed to it.</td>
             </tr>
+            <tr>
+            <td>hadoop.log.dir</td>
+            <td>Path to the hadoop log directory. This should be the same
+            as the value with which the TaskTracker is started. It is
+            required in order to set proper permissions on the log files,
+            so that they can be written to by the user's tasks and read by
+            the TaskTracker for serving on the web UI.</td>
+            </tr>
             </table>
 
             <p>
-            The LinuxTaskController requires that paths leading up to
+            The LinuxTaskController requires that the paths to and including
             the directories specified in
-            <em>mapred.local.dir</em> and <em>hadoop.log.dir</em> to be 755
-            and directories themselves having 777 permissions.
+            <em>mapred.local.dir</em> and <em>hadoop.log.dir</em> have 755
+            permissions.
             </p>
             </section>
             

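To make the permission scheme described in the cluster_setup.xml changes
above concrete, here is a sketch of the corresponding setup on a
tasktracker node. The paths, the mapred user and the mapredGroup group
follow the example in the documentation and are illustrative only:

    # Binary: user-owned by root, group-owned by mapredGroup, mode 6050
    chown root:mapredGroup /path-to-binary/task-controller
    chmod 6050 /path-to-binary/task-controller              # --Sr-s---
    # Config file: owned by the TaskTracker user, mode 0400
    chown mapred /path-to-binary/../conf/taskcontroller.cfg
    chmod 0400 /path-to-binary/../conf/taskcontroller.cfg   # r--------

Similarly, a minimal taskcontroller.cfg covering the two required
configuration items from the table above might look like the following,
assuming the file uses simple key=value lines, with illustrative
directory values:

    mapred.local.dir=/grid/0/mapred/local,/grid/1/mapred/local
    hadoop.log.dir=/var/log/hadoop
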
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java Wed Aug 12 16:17:47 2009
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
@@ -548,10 +547,9 @@
     boolean isActive() { return isActive; }
 
     private Writer<K,V> createSpillFile() throws IOException {
-      Path tmp = new Path(
-          TaskTracker.getIntermediateOutputDir(
-              tid.getJobID().toString(), tid.toString()) + 
-              "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out");
+      Path tmp =
+          new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_"
+              + (spillNumber++) + ".out");
 
       LOG.info("Created file: " + tmp);
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Wed Aug 12 16:17:47 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.log4j.LogManager;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 
 /** 
  * The main() for child processes. 
@@ -138,6 +139,10 @@
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
 
+        // Set up the child's mapred-local-dir. The child is now sandboxed
+        // and can only see files under the attempt directory.
+        TaskRunner.setupChildMapredLocalDirs(task, job);
+
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
@@ -149,8 +154,6 @@
 
         task.setConf(job);
 
-        defaultConf.addResource(new Path(task.getJobFile()));
-
         // Initiate Java VM metrics
         JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
         // use job-specified working directory
@@ -168,14 +171,15 @@
       LOG.fatal("FSError from child", e);
       umbilical.fsError(taskid, e.getMessage());
     } catch (Throwable throwable) {
-      LOG.warn("Error running child", throwable);
+      LOG.warn("Error running child : "
+          + StringUtils.stringifyException(throwable));
       try {
         if (task != null) {
           // do cleanup for the task
           task.taskCleanup(umbilical);
         }
       } catch (Throwable th) {
-        LOG.info("Error cleaning up" + th);
+        LOG.info("Error cleaning up : " + StringUtils.stringifyException(th));
       }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Wed Aug 12 16:17:47 2009
@@ -48,6 +48,8 @@
    */
   void launchTaskJVM(TaskController.TaskControllerContext context) 
                                       throws IOException {
+    initializeTask(context);
+
     JvmEnv env = context.env;
     List<String> wrappedCommand = 
       TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
@@ -72,20 +74,13 @@
     // So this is a dummy method.
     return;
   }
-  
-
-  @Override
-  void setup() {
-    // nothing to setup
-    return;
-  }
 
   /*
   * No need to do anything, as we don't need anything extra beyond what
   * the TaskTracker has already done.
    */
   @Override
-  void initializeJob(JobID jobId) {
+  void initializeJob(JobInitializationContext context) {
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Aug 12 16:17:47 2009
@@ -31,13 +31,8 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
  * IsolationRunner is intended to facilitate debugging by re-running a specific
@@ -169,17 +164,24 @@
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
     File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
     local.setWorkingDirectory(new Path(workDirName.toString()));
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
     
     // set up a classloader with the right classpath
-    ClassLoader classLoader = makeClassLoader(conf, workDirName);
+    ClassLoader classLoader =
+        makeClassLoader(conf, new File(workDirName.toString()));
     Thread.currentThread().setContextClassLoader(classLoader);
     conf.setClassLoader(classLoader);
-    
-    Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), 
-                               "split.dta");
+
+    // The split.dta file is used only by IsolationRunner. The file can now be
+    // any of the configured local disks, so use LocalDirAllocator to find out
+    // where it is.
+    Path localSplit =
+        new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+            TaskTracker.getLocalSplitFile(taskId.getJobID().toString(), taskId
+                .toString()), conf);
     DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
     String splitClass = Text.readString(splitFile);
     BytesWritable split = new BytesWritable();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Wed Aug 12 16:17:47 2009
@@ -1418,7 +1418,7 @@
    * @return The localized job specific shared directory
    */
   public String getJobLocalDir() {
-    return get("job.local.dir");
+    return get(TaskTracker.JOB_LOCAL_DIR);
   }
 
   public long getMemoryForMapTask() {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java Wed Aug 12 16:17:47 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.StringUtils;
 
 class JvmManager {
 
@@ -111,7 +112,8 @@
     }
   }
 
-  public TaskInProgress getTaskForJvm(JVMId jvmId) {
+  public TaskInProgress getTaskForJvm(JVMId jvmId)
+      throws IOException {
     if (jvmId.isMapJVM()) {
       return mapJvmManager.getTaskForJvm(jvmId);
     } else {
@@ -177,7 +179,8 @@
       jvmIdToRunner.get(jvmId).setBusy(true);
     }
     
-    synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
+    synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
+        throws IOException {
       if (jvmToRunningTask.containsKey(jvmId)) {
        //In case of JVM reuse, tasks are returned to a previously launched
        //JVM via this method. However, when a new task is launched
@@ -185,16 +188,26 @@
         TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
         JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
         Task task = taskRunner.getTaskInProgress().getTask();
-        TaskControllerContext context = 
-          new TaskController.TaskControllerContext();
+
+        // Initialize task dirs
+        TaskControllerContext context =
+            new TaskController.TaskControllerContext();
         context.env = jvmRunner.env;
         context.task = task;
-        //If we are returning the same task as which the JVM was launched
-        //we don't initialize task once again.
-        if(!jvmRunner.env.conf.get("mapred.task.id").
-            equals(task.getTaskID().toString())) {
-          tracker.getTaskController().initializeTask(context);
+        // If we are returning the same task as which the JVM was launched
+        // we don't initialize task once again.
+        if (!jvmRunner.env.conf.get("mapred.task.id").equals(
+            task.getTaskID().toString())) {
+          try {
+            tracker.getTaskController().initializeTask(context);
+          } catch (IOException e) {
+            LOG.warn("Failed to initialize the new task "
+                + task.getTaskID().toString() + " to be given to JVM with id "
+                + jvmId);
+            throw e;
+          }
         }
+
         return taskRunner.getTaskInProgress();
       }
       return null;
@@ -393,7 +406,6 @@
           //Launch the task controller to run task JVM
           initalContext.task = jvmToRunningTask.get(jvmId).getTask();
           initalContext.env = env;
-          tracker.getTaskController().initializeTask(initalContext);
           tracker.getTaskController().launchTaskJVM(initalContext);
         } catch (IOException ioe) {
           // do nothing
@@ -403,13 +415,13 @@
           if (shexec == null) {
             return;
           }
-          
+
           kill();
-          
+
           int exitCode = shexec.getExitCode();
           updateOnJvmExit(jvmId, exitCode);
-          LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
-              numTasksRan);
+          LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
+              + ". Number of tasks it ran: " + numTasksRan);
           try {
             // In case of jvm-reuse,
             //the task jvm cleans up the common workdir for every 
@@ -438,6 +450,7 @@
                 .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
                     ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
 
+            // Destroy the task jvm
             controller.destroyTaskJVM(initalContext);
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Wed Aug 12 16:17:47 2009
@@ -24,12 +24,11 @@
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -73,52 +72,27 @@
         new File(hadoopBin, "task-controller").getAbsolutePath();
   }
   
-  // The list of directory paths specified in the
-  // variable mapred.local.dir. This is used to determine
-  // which among the list of directories is picked up
-  // for storing data for a particular task.
-  private String[] mapredLocalDirs;
-  
-  // permissions to set on files and directories created.
-  // When localized files are handled securely, this string
-  // will change to something more restrictive. Until then,
-  // it opens up the permissions for all, so that the tasktracker
-  // and job owners can access files together.
-  private static final String FILE_PERMISSIONS = "ugo+rwx";
-  
-  // permissions to set on components of the path leading to
-  // localized files and directories. Read and execute permissions
-  // are required for different users to be able to access the
-  // files.
-  private static final String PATH_PERMISSIONS = "go+rx";
-  
   public LinuxTaskController() {
     super();
   }
   
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    mapredLocalDirs = conf.getStrings("mapred.local.dir");
-    //Setting of the permissions of the local directory is done in 
-    //setup()
-  }
-  
   /**
    * List of commands that the setuid script will execute.
    */
   enum TaskCommands {
+    INITIALIZE_JOB,
     LAUNCH_TASK_JVM,
+    INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
-    KILL_TASK_JVM
+    KILL_TASK_JVM,
   }
-  
+
   /**
    * Launch a task JVM that will run as the owner of the job.
    * 
-   * This method launches a task JVM by executing a setuid
-   * executable that will switch to the user and run the
-   * task.
+   * This method launches a task JVM by executing a setuid executable that will
+   * switch to the user and run the task. It also initializes the first
+   * task as part of the same setuid process launch.
    */
   @Override
   void launchTaskJVM(TaskController.TaskControllerContext context) 
@@ -150,48 +124,103 @@
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
-                                    launchTaskJVMArgs, env);
+                                    launchTaskJVMArgs, env.workDir, env.env);
     context.shExec = shExec;
     try {
       shExec.execute();
     } catch (Exception e) {
-      LOG.warn("Exception thrown while launching task JVM : " + 
-          StringUtils.stringifyException(e));
-      LOG.warn("Exit code from task is : " + shExec.getExitCode());
-      LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from task is : " + exitCode);
+      // 143 (SIGTERM) and 137 (SIGKILL) exit codes mean the task was
+      // terminated/killed forcefully. In all other cases, log the
+      // task-controller output
+      if (exitCode != 143 && exitCode != 137) {
+        LOG.warn("Exception thrown while launching task JVM : "
+            + StringUtils.stringifyException(e));
+        LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+        logOutput(shExec.getOutput());
+      }
       throw new IOException(e);
     }
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("output after executing task jvm = " + shExec.getOutput()); 
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+      logOutput(shExec.getOutput());
     }
   }
 
   /**
-   * Returns list of arguments to be passed while launching task VM.
-   * See {@code buildTaskControllerExecutor(TaskCommands, 
-   * String, List<String>, JvmEnv)} documentation.
+   * Helper method that runs a LinuxTaskController command.
+   * 
+   * @param taskCommand the task-controller command to execute
+   * @param user the user on whose behalf the command is run
+   * @param cmdArgs list of extra arguments to the command
+   * @param workDir working directory for the command
+   * @param env environment variables to set for the command
+   * @throws IOException if the command fails
+   */
+  private void runCommand(TaskCommands taskCommand, String user,
+      List<String> cmdArgs, File workDir, Map<String, String> env)
+      throws IOException {
+
+    ShellCommandExecutor shExec =
+        buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Exit code from " + taskCommand.toString() + " is : "
+          + shExec.getExitCode());
+      LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
+          + StringUtils.stringifyException(e));
+      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+          + " follows:");
+      logOutput(shExec.getOutput());
+      throw new IOException(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+          + " follows:");
+      logOutput(shExec.getOutput());
+    }
+  }
+
+  /**
+   * Returns list of arguments to be passed while initializing a new task. See
+   * {@code buildTaskControllerExecutor(TaskCommands, String, List<String>,
+   * JvmEnv)} documentation.
+   * 
    * @param context
    * @return Arguments to be used while initializing the task
    */
-  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+  private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
     List<String> commandArgs = new ArrayList<String>(3);
     String taskId = context.task.getTaskID().toString();
     String jobId = getJobId(context);
-    LOG.debug("getting the task directory as: " 
-        + getTaskCacheDirectory(context));
-    commandArgs.add(getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
-        context));
     commandArgs.add(jobId);
-    if(!context.task.isTaskCleanupTask()) {
+    if (!context.task.isTaskCleanupTask()) {
       commandArgs.add(taskId);
-    }else {
+    } else {
       commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
     }
     return commandArgs;
   }
-  
-  // get the Job ID from the information in the TaskControllerContext
+
+  @Override
+  void initializeTask(TaskControllerContext context)
+      throws IOException {
+    LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+        + " for " + context.task.getTaskID().toString());
+    runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
+        buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+  }
+
+  private void logOutput(String output) {
+    if (output != null) {
+      for (String str : output.split("\n")) {
+        LOG.info(str);
+      }
+    }
+  }
+
   private String getJobId(TaskControllerContext context) {
     String taskId = context.task.getTaskID().toString();
     TaskAttemptID tId = TaskAttemptID.forName(taskId);
@@ -199,6 +228,27 @@
     return jobId;
   }
 
+  /**
+   * Returns list of arguments to be passed while launching task VM.
+   * See {@code buildTaskControllerExecutor(TaskCommands, 
+   * String, List<String>, JvmEnv)} documentation.
+   * @param context
+   * @return Arguments to be used while launching the task VM
+   */
+  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    LOG.debug("getting the task directory as: " 
+        + getTaskCacheDirectory(context));
+    LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
+        new File(getTaskCacheDirectory(context)), 
+        context) );
+    commandArgs.add(getDirectoryChosenForTask(
+        new File(getTaskCacheDirectory(context)), 
+        context));
+    commandArgs.addAll(buildInitializeTaskArgs(context));
+    return commandArgs;
+  }
+
   // Get the directory from the list of directories configured
   // in mapred.local.dir chosen for storing data pertaining to
   // this task.
@@ -208,8 +258,8 @@
     String taskId = context.task.getTaskID().toString();
     for (String dir : mapredLocalDirs) {
       File mapredDir = new File(dir);
-      File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
-          jobId, taskId, context.task.isTaskCleanupTask()));
+      File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
+          jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
       if (directory.equals(taskDir)) {
         return dir;
       }
@@ -219,68 +269,7 @@
     throw new IllegalArgumentException("invalid task cache directory "
         + directory.getAbsolutePath());
   }
-  
-  /**
-   * Setup appropriate permissions for directories and files that
-   * are used by the task.
-   * 
-   * As the LinuxTaskController launches tasks as a user, different
-   * from the daemon, all directories and files that are potentially 
-   * used by the tasks are setup with appropriate permissions that
-   * will allow access.
-   * 
-   * Until secure data handling is implemented (see HADOOP-4491 and
-   * HADOOP-4493, for e.g.), the permissions are set up to allow
-   * read, write and execute access for everyone. This will be 
-   * changed to restricted access as data is handled securely.
-   */
-  void initializeTask(TaskControllerContext context) {
-    // Setup permissions for the job and task cache directories.
-    setupTaskCacheFileAccess(context);
-    // setup permissions for task log directory
-    setupTaskLogFileAccess(context);    
-  }
-  
-  // Allows access for the task to create log files under 
-  // the task log directory
-  private void setupTaskLogFileAccess(TaskControllerContext context) {
-    TaskAttemptID taskId = context.task.getTaskID();
-    File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
-    String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
-    changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
-  }
 
-  // Allows access for the task to read, write and execute 
-  // the files under the job and task cache directories
-  private void setupTaskCacheFileAccess(TaskControllerContext context) {
-    String taskId = context.task.getTaskID().toString();
-    JobID jobId = JobID.forName(getJobId(context));
-    //Change permission for the task across all the disks
-    for(String localDir : mapredLocalDirs) {
-      File f = new File(localDir);
-      File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
-          jobId.toString(), taskId, context.task.isTaskCleanupTask()));
-      if(taskCacheDir.exists()) {
-        changeDirectoryPermissions(taskCacheDir.getPath(), 
-            FILE_PERMISSIONS, true);
-      }          
-    }//end of local directory Iteration 
-  }
-
-  // convenience method to execute chmod.
-  private void changeDirectoryPermissions(String dir, String mode, 
-                                              boolean isRecursive) {
-    int ret = 0;
-    try {
-      ret = FileUtil.chmod(dir, mode, isRecursive);
-    } catch (Exception e) {
-      LOG.warn("Exception in changing permissions for directory " + dir + 
-                  ". Exception: " + e.getMessage());
-    }
-    if (ret != 0) {
-      LOG.warn("Could not change permissions for directory " + dir);
-    }
-  }
   /**
    * Builds the command line for launching/terminating/killing task JVM.
    * Following is the format for launching/terminating/killing task JVM
@@ -295,14 +284,15 @@
    * @param command command to be executed.
    * @param userName user name
    * @param cmdArgs list of extra arguments
+   * @param workDir working directory for the task-controller
    * @param env JVM environment variables.
    * @return {@link ShellCommandExecutor}
    * @throws IOException
    */
-  private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
-                                          String userName, 
-                                          List<String> cmdArgs, JvmEnv env) 
-                                    throws IOException {
+  private ShellCommandExecutor buildTaskControllerExecutor(
+      TaskCommands command, String userName, List<String> cmdArgs,
+      File workDir, Map<String, String> env)
+      throws IOException {
     String[] taskControllerCmd = new String[3 + cmdArgs.size()];
     taskControllerCmd[0] = getTaskControllerExecutablePath();
     taskControllerCmd[1] = userName;
@@ -317,9 +307,9 @@
       }
     }
     ShellCommandExecutor shExec = null;
-    if(env.workDir != null && env.workDir.exists()) {
+    if(workDir != null && workDir.exists()) {
       shExec = new ShellCommandExecutor(taskControllerCmd,
-          env.workDir, env.env);
+          workDir, env);
     } else {
       shExec = new ShellCommandExecutor(taskControllerCmd);
     }
@@ -371,68 +361,21 @@
       }
     }
   }
-  
 
-  /**
-   * Sets up the permissions of the following directories:
-   * 
-   * Job cache directory
-   * Archive directory
-   * Hadoop log directories
-   * 
-   */
-  @Override
-  void setup() {
-    //set up job cache directory and associated permissions
-    String localDirs[] = this.mapredLocalDirs;
-    for(String localDir : localDirs) {
-      //Cache root
-      File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
-      File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
-      if(!cacheDirectory.exists()) {
-        if(!cacheDirectory.mkdirs()) {
-          LOG.warn("Unable to create cache directory : " + 
-              cacheDirectory.getPath());
-        }
-      }
-      if(!jobCacheDirectory.exists()) {
-        if(!jobCacheDirectory.mkdirs()) {
-          LOG.warn("Unable to create job cache directory : " + 
-              jobCacheDirectory.getPath());
-        }
-      }
-      //Give world writable permission for every directory under
-      //mapred-local-dir.
-      //Child tries to write files under it when executing.
-      changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
-    }//end of local directory manipulations
-    //setting up perms for user logs
-    File taskLog = TaskLog.getUserLogDir();
-    changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+  private List<String> buildInitializeJobCommandArgs(
+      JobInitializationContext context) {
+    List<String> initJobCmdArgs = new ArrayList<String>();
+    initJobCmdArgs.add(context.jobid.toString());
+    return initJobCmdArgs;
   }
 
-  /*
-   * Create Job directories across disks and set their permissions to 777
-   * This way when tasks are run we just need to setup permissions for
-   * task folder.
-   */
   @Override
-  void initializeJob(JobID jobid) {
-    for(String localDir : this.mapredLocalDirs) {
-      File jobDirectory = new File(localDir, 
-          TaskTracker.getLocalJobDir(jobid.toString()));
-      if(!jobDirectory.exists()) {
-        if(!jobDirectory.mkdir()) {
-          LOG.warn("Unable to create job cache directory : " 
-              + jobDirectory.getPath());
-          continue;
-        }
-      }
-      //Should be recursive because the jar and work folders might be 
-      //present under the job cache directory
-      changeDirectoryPermissions(
-          jobDirectory.getPath(), FILE_PERMISSIONS, true);
-    }
+  void initializeJob(JobInitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize job " + context.jobid.toString()
+        + " on the TT");
+    runCommand(TaskCommands.INITIALIZE_JOB, context.user,
+        buildInitializeJobCommandArgs(context), context.workDir, null);
   }
 
   /**
@@ -467,7 +410,7 @@
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
         command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env);
+        buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
     try {
       shExec.execute();
     } catch (Exception e) {
@@ -498,6 +441,5 @@
 
   protected String getTaskControllerExecutablePath() {
     return taskControllerExe;
-  }  
+  }
 }
-

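For reference, the command line assembled by buildTaskControllerExecutor
above always begins with three fixed arguments (the task-controller
executable path, the user name, and the command), followed by the
per-command arguments. Assuming the command is passed as the ordinal of
the TaskCommands enum (that assignment is outside this hunk) and using
made-up job and task attempt IDs, a LAUNCH_TASK_JVM invocation would look
roughly like:

    /path-to-binary/task-controller mapred 1 /grid/0/mapred/local \
        job_200908121617_0001 attempt_200908121617_0001_m_000000_0

where the trailing arguments come from buildLaunchTaskArgs: the
mapred.local.dir entry chosen for the task, then the job ID and task
attempt ID from buildInitializeTaskArgs.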

