hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077111 [1/3] - in /hadoop/common/branches/branch-0.20-security-patches: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/c...
Date: Fri, 04 Mar 2011 03:42:02 GMT
Author: omalley
Date: Fri Mar  4 03:42:01 2011
New Revision: 1077111

URL: http://svn.apache.org/viewvc?rev=1077111&view=rev
Log:
commit 53521eb587b76d342a75f81f5b1095826e01c946
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date:   Tue Jan 19 12:11:07 2010 +0530

    MAPREDUCE:842 from https://issues.apache.org/jira/secure/attachment/12430697/842.20S-4.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-842. Setup secure permissions for localized job files,
    +    intermediate outputs and log files on tasktrackers.
    +    (Vinod Kumar Vavilapalli via yhemanth)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/build.xml
    hadoop/common/branches/branch-0.20-security-patches/conf/taskcontroller.cfg
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/Makefile.in
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configure.ac
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: hadoop/common/branches/branch-0.20-security-patches/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/build.xml?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/build.xml Fri Mar  4 03:42:01 2011
@@ -740,8 +740,8 @@
       <sysproperty key="test.debug.data" value="${test.debug.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.src.dir" value="${test.src.dir}"/>
-    	<sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
-    	<sysproperty key="taskcontroller-user" value="${taskcontroller-user}"/>
+      <sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
+      <sysproperty key="taskcontroller-ugi" value="${taskcontroller-ugi}"/>
       <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
       <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/>
       <sysproperty key="java.library.path"
@@ -1820,10 +1820,8 @@
   <target name="init-task-controller-build">
     <mkdir dir="${build.c++.task-controller}" />
     <copy todir="${build.c++.task-controller}">
-      <fileset dir="${c++.task-controller.src}" includes="*.c">
-      </fileset>
-      <fileset dir="${c++.task-controller.src}" includes="*.h">
-      </fileset>
+      <fileset dir="${c++.task-controller.src}" includes="*.c"/>
+      <fileset dir="${c++.task-controller.src}" includes="*.h"/>
     </copy>
     <chmod file="${c++.task-controller.src}/configure" perm="ugo+x"/> 
     <condition property="task-controller.conf.dir.passed">
@@ -1863,5 +1861,20 @@
       <arg value="install" />
     </exec>
   </target>
-  <!-- end of task-controller target -->
+  <target name="test-task-controller" depends="task-controller">
+    <copy todir="${build.c++.task-controller}" verbose="true">
+      <fileset dir="${c++.task-controller.src}" includes="tests/"/>
+    </copy>
+    <exec executable="${make.cmd}" dir="${build.c++.task-controller}" 
+        searchpath="yes" failonerror="yes">
+      <arg value="clean" />
+      <arg value="test" />
+    </exec>
+    <exec executable="${build.c++.task-controller}/tests/test-task-controller"
+        dir="${build.c++.task-controller}/tests/"
+        failonerror="yes">
+    </exec>
+  </target>
+
+  <!-- end of task-controller targets -->
 </project>
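
For reference, the new target chains off the existing native build, so the
native tests could presumably be driven the usual way from the top level
(assuming the C++ compilation prerequisites for the task-controller are
already set up on the build machine):

    ant test-task-controller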

Modified: hadoop/common/branches/branch-0.20-security-patches/conf/taskcontroller.cfg
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/taskcontroller.cfg?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/taskcontroller.cfg (original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/taskcontroller.cfg Fri Mar  4 03:42:01 2011
@@ -1,4 +1,3 @@
-mapred.local.dir=#configured value of hadoop.tmp.dir it can be a list of paths comma seperated
-hadoop.pid.dir=#configured HADOOP_PID_DIR
-hadoop.indent.str=#configured HADOOP_IDENT_STR
+mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths.
+hadoop.log.dir=#configured value of hadoop.log.dir.
 mapred.tasktracker.tasks.sleeptime-before-sigkill=#sleep time before sig kill is to be sent to process group after sigterm is sent. Should be in seconds
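
A populated taskcontroller.cfg would then look something like the following;
the paths and the 5-second sleep are illustrative values only, not defaults
shipped with this patch:

    mapred.local.dir=/disk1/mapred/local,/disk2/mapred/local
    hadoop.log.dir=/var/log/hadoop
    mapred.tasktracker.tasks.sleeptime-before-sigkill=5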

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/Makefile.in
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/Makefile.in?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/Makefile.in (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/Makefile.in Fri Mar  4 03:42:01 2011
@@ -21,6 +21,9 @@ CFLAGS = @CFLAGS@
 BINARY=task-controller
 installdir = @prefix@
 
+testdir = tests
+TESTOBJS=${testdir}/test-task-controller.o task-controller.o configuration.o
+TESTBINARY=${testdir}/test-task-controller
 
 all: $(OBJS)
 	$(CC) $(CFLAG) -o $(BINARY) $(OBJS)
@@ -34,9 +37,14 @@ task-controller.o: task-controller.c tas
 configuration.o: configuration.h configuration.c
 	$(CC) $(CFLAG) -o configuration.o -c configuration.c
 
+${testdir}/test-task-controller.o: task-controller.c task-controller.h
+	$(CC) $(CFLAG) -o ${testdir}/test-task-controller.o -c ${testdir}/test-task-controller.c
+
+test: $(TESTOBJS)
+	$(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS)
 
 clean:
-	rm -rf $(BINARY) $(OBJS)
+	rm -rf $(BINARY) $(OBJS) $(TESTOBJS)
 
 install: all
 	cp $(BINARY) $(installdir)

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/configuration.c?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c Fri Mar  4 03:42:01 2011
@@ -72,8 +72,9 @@ void get_configs() {
 #endif
 
 #ifdef DEBUG
-  fprintf(LOGFILE,"get_configs :Conf file name is : %s \n", file_name);
+  fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name);
 #endif
+
   //allocate space for ten configuration items.
   config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *)
       * MAX_SIZE);
@@ -87,7 +88,7 @@ void get_configs() {
   while(!feof(conf_file)) {
     line = (char *) malloc(linesize);
     if(line == NULL) {
-      fprintf(LOGFILE,"malloc failed while reading configuration file.\n");
+      fprintf(LOGFILE, "malloc failed while reading configuration file.\n");
       goto cleanup;
     }
     size_read = getline(&line,&linesize,conf_file);
@@ -123,9 +124,11 @@ void get_configs() {
           "Failed allocating memory for single configuration item\n");
       goto cleanup;
     }
+
 #ifdef DEBUG
-    fprintf(LOGFILE,"get_configs : Adding conf key : %s \n", equaltok);
+    fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok);
 #endif
+
     memset(config.confdetails[config.size], 0, sizeof(struct confentry));
     config.confdetails[config.size]->key = (char *) malloc(
             sizeof(char) * (strlen(equaltok)+1));
@@ -142,9 +145,11 @@ void get_configs() {
       free(config.confdetails[config.size]);
       continue;
     }
+
 #ifdef DEBUG
-    fprintf(LOGFILE,"get_configs : Adding conf value : %s \n", equaltok);
+    fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok);
 #endif
+
     config.confdetails[config.size]->value = (char *) malloc(
             sizeof(char) * (strlen(equaltok)+1));
     strcpy((char *)config.confdetails[config.size]->value, equaltok);
@@ -184,8 +189,7 @@ void get_configs() {
  * array, next time onwards used the populated array.
  *
  */
-
-const char * get_value(char* key) {
+const char * get_value(const char* key) {
   int count;
   if (config.size == 0) {
     get_configs();
@@ -196,22 +200,27 @@ const char * get_value(char* key) {
   }
   for (count = 0; count < config.size; count++) {
     if (strcmp(config.confdetails[count]->key, key) == 0) {
-      return config.confdetails[count]->value;
+      return strdup(config.confdetails[count]->value);
     }
   }
   return NULL;
 }
 
-const char ** get_values(char * key) {
+/**
+ * Function to return an array of values for a key.
+ * Value delimiter is assumed to be a comma.
+ */
+const char ** get_values(const char * key) {
+  const char ** toPass = NULL;
+  const char *value = get_value(key);
   char *tempTok = NULL;
   char *tempstr = NULL;
   int size = 0;
   int toPassSize = MAX_SIZE;
-  const char** toPass = (const char **) malloc(sizeof(char *) * toPassSize);
 
   //first allocate any array of 10
-  const char * value = get_value(key);
   if(value != NULL) {
+    toPass = (const char **) malloc(sizeof(char *) * toPassSize);
     tempTok = strtok_r((char *)value, ",", &tempstr);
     while (tempTok != NULL) {
       toPass[size++] = tempTok;
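
Note that get_value() now returns a strdup()'ed copy and get_values() hands
out tokens cut from such a copy, so callers own the returned memory. A
minimal caller sketch (hypothetical code, not part of the patch) showing the
resulting contract:

    #include <stdio.h>
    #include <stdlib.h>
    #include "configuration.h"

    /* Illustrative only: print the configured local dirs, honoring the
     * caller-frees contract introduced by this change. */
    void dump_local_dirs() {
      const char *all = get_value("mapred.local.dir");
      if (all == NULL) {
        fprintf(stderr, "mapred.local.dir is not configured\n");
        return;
      }
      printf("raw value: %s\n", all);
      free((void *) all);

      const char **dirs = get_values("mapred.local.dir");
      if (dirs != NULL) {
        const char **p;
        for (p = dirs; *p != NULL; p++) {
          /* entries are strtok_r() tokens inside one strdup()'ed buffer */
          printf("dir: %s\n", *p);
        }
        free((void *) dirs);
      }
    }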

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/configuration.h.in?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in Fri Mar  4 03:42:01 2011
@@ -53,10 +53,10 @@ extern struct configuration config;
   extern char *hadoop_conf_dir;
 #endif
 //method exposed to get the configurations
-const char * get_value(char* key);
+const char * get_value(const char* key);
 //method to free allocated configuration
 void free_configurations();
 
 //function to return array of values pointing to the key. Values are
 //comma seperated strings.
-const char ** get_values(char* key);
+const char ** get_values(const char* key);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/configure.ac?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configure.ac (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configure.ac Fri Mar  4 03:42:01 2011
@@ -38,7 +38,7 @@ AC_PROG_CC
 
 # Checks for header files.
 AC_HEADER_STDC
-AC_CHECK_HEADERS([stdlib.h string.h unistd.h])
+AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h])
 
 #check for HADOOP_CONF_DIR
 
@@ -50,12 +50,17 @@ fi
 # Checks for typedefs, structures, and compiler characteristics.
 AC_C_CONST
 AC_TYPE_PID_T
+AC_TYPE_MODE_T
+AC_TYPE_SIZE_T
 
 # Checks for library functions.
 AC_FUNC_MALLOC
 AC_FUNC_REALLOC
-AC_CHECK_FUNCS([strerror])
+AC_FUNC_CHOWN
+AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup])
 
 AC_CONFIG_FILES([Makefile])
 AC_OUTPUT
 
+AC_HEADER_STDBOOL
+AC_PROG_MAKE_SET

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar  4 03:42:01 2011
@@ -17,6 +17,32 @@
  */
 #include "task-controller.h"
 
+void open_log_file(const char *log_file) {
+  if (log_file == NULL) {
+    LOGFILE = stdout;
+  } else {
+    LOGFILE = fopen(log_file, "a");
+    if (LOGFILE == NULL) {
+      fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file);
+      LOGFILE = stdout;
+    }
+    if (LOGFILE != stdout) {
+      if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
+          | S_IRGRP | S_IWGRP) < 0) {
+        fprintf(stdout, "Unable to change permission of the log file %s \n",
+            log_file);
+        fclose(LOGFILE);
+        fprintf(stdout, "changing log file to stdout");
+        LOGFILE = stdout;
+      }
+    }
+  }
+}
+
+void display_usage(FILE *stream) {
+  fprintf(stream,
+      "Usage: task-controller [-l logfile] user command command-args\n");
+}
 
 int main(int argc, char **argv) {
   int command;
@@ -24,6 +50,7 @@ int main(int argc, char **argv) {
   const char * job_id = NULL;
   const char * task_id = NULL;
   const char * tt_root = NULL;
+  const char *log_dir = NULL;
   int exit_code = 0;
   const char * task_pid = NULL;
   const char* const short_options = "l:";
@@ -35,7 +62,7 @@ int main(int argc, char **argv) {
   //Minimum number of arguments required to run the task-controller
   //command-name user command tt-root
   if (argc < 3) {
-    display_usage(stderr);
+    display_usage(stdout);
     return INVALID_ARGUMENT_NUMBER;
   }
 
@@ -54,24 +81,9 @@ int main(int argc, char **argv) {
       break;
     }
   } while (next_option != -1);
-  if (log_file == NULL) {
-    LOGFILE = stderr;
-  } else {
-    LOGFILE = fopen(log_file, "a");
-    if (LOGFILE == NULL) {
-      fprintf(stderr, "Unable to open LOGFILE : %s \n", log_file);
-      LOGFILE = stderr;
-    }
-    if (LOGFILE != stderr) {
-      if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
-          | S_IRGRP | S_IWGRP) < 0) {
-        fprintf(stderr, "Unable to change permission of the log file %s \n",
-            log_file);
-        fprintf(stderr, "changing log file to stderr");
-        LOGFILE = stderr;
-      }
-    }
-  }
+
+  open_log_file(log_file);
+
   //checks done for user name
   //checks done if the user is root or not.
   if (argv[optind] == NULL) {
@@ -88,11 +100,15 @@ int main(int argc, char **argv) {
   }
   optind = optind + 1;
   command = atoi(argv[optind++]);
-#ifdef DEBUG
+
   fprintf(LOGFILE, "main : command provided %d\n",command);
   fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
-#endif
+
   switch (command) {
+  case INITIALIZE_JOB:
+    job_id = argv[optind++];
+    exit_code = initialize_job(job_id, user_detail->pw_name);
+    break;
   case LAUNCH_TASK_JVM:
     tt_root = argv[optind++];
     job_id = argv[optind++];
@@ -100,6 +116,11 @@ int main(int argc, char **argv) {
     exit_code
         = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root);
     break;
+  case INITIALIZE_TASK:
+    job_id = argv[optind++];
+    task_id = argv[optind++];
+    exit_code = initialize_task(job_id, task_id, user_detail->pw_name);
+    break;
   case TERMINATE_TASK_JVM:
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM);
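
Reading this switch together with the command enum in task-controller.h
(INITIALIZE_JOB = 0, LAUNCH_TASK_JVM = 1, INITIALIZE_TASK = 2,
TERMINATE_TASK_JVM = 3, KILL_TASK_JVM = 4), the expected invocations look
roughly as follows; the angle-bracketed arguments are placeholders, not
literal values:

    task-controller [-l <logfile>] <user> 0 <job-id>                      # INITIALIZE_JOB
    task-controller [-l <logfile>] <user> 1 <tt-root> <job-id> <task-id>  # LAUNCH_TASK_JVM
    task-controller [-l <logfile>] <user> 2 <job-id> <task-id>            # INITIALIZE_TASK
    task-controller [-l <logfile>] <user> 3 <task-pid>                    # TERMINATE_TASK_JVM
    task-controller [-l <logfile>] <user> 4 <task-pid>                    # KILL_TASK_JVM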

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar  4 03:42:01 2011
@@ -71,103 +71,428 @@ int change_user(const char * user) {
   return 0;
 }
 
-// function to check if the passed tt_root is present in hadoop.tmp.dir
-int check_tt_root(const char *tt_root) {
-  char ** mapred_local_dir;
-  char ** iter;
-  int found = -1;
+/**
+ * Checks the passed value for the variable config_key against the values in
+ * the configuration.
+ * Returns 0 if the passed value is found in the configuration,
+ *        -1 otherwise
+ */
+int check_variable_against_config(const char *config_key,
+    const char *passed_value) {
 
-  if (tt_root == NULL) {
+  if (config_key == NULL || passed_value == NULL) {
     return -1;
   }
 
-  mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY);
-  iter = mapred_local_dir;
+  int found = -1;
+
+  const char **config_value = get_values(config_key);
 
-  if (mapred_local_dir == NULL) {
+  if (config_value == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", config_key);
     return -1;
   }
 
-  while(*iter != NULL) {
-    if(strcmp(*iter, tt_root) == 0) {
+  char *full_config_value = (char *)get_value(config_key);
+
+  char **config_val_ptr = (char **) config_value;
+  while (*config_val_ptr != NULL) {
+    if (strcmp(*config_val_ptr, passed_value) == 0) {
       found = 0;
       break;
     }
-    ++iter; 
+    config_val_ptr++;
   }
-  free(mapred_local_dir);
+
+  if (found != 0) {
+    fprintf(
+        LOGFILE,
+        "Invalid value passed: Configured value of %s is %s. "
+        "Passed value is %s.\n",
+        config_key, full_config_value, passed_value);
+  }
+  free(full_config_value);
+  free(config_value);
   return found;
 }
 
 /**
- * Function to check if the constructed path and absolute
- * path resolve to one and same.
+ * Utility function to concatenate argB to argA using the concat_pattern
+ */
+char *concatenate(const char *argA, const char *argB, char *concat_pattern,
+    char *return_path_name) {
+  if (argA == NULL || argB == NULL) {
+    fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+        return_path_name);
+    return NULL;
+  }
+
+  char *return_path = NULL;
+  int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB);
+
+  return_path = (char *) malloc(sizeof(char) * (str_len + 1));
+  if (return_path == NULL) {
+    fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name);
+    return NULL;
+  }
+  memset(return_path, '\0', str_len + 1);
+  snprintf(return_path, str_len, concat_pattern, argA, argB);
+  return return_path;
+}
+
+/**
+ * Get the job-directory path from tt_root and job-id
  */
+char *get_job_directory(const char * tt_root, const char *jobid) {
+  return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path");
+}
 
-int check_path(char *path) {
+char *get_job_work_directory(const char *job_dir) {
+  return concatenate(job_dir, "", JOB_DIR_TO_JOB_WORK_PATTERN,
+      "job_work_dir_path");
+}
+/**
+ * Get the attempt directory for the given attempt_id
+ */
+char *get_attempt_directory(const char *job_dir, const char *attempt_id) {
+  return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN,
+      "attempt_dir_path");
+}
+
+/*
+ * Get the path to the task launcher file which is created by the TT
+ */
+char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) {
+  return concatenate(job_dir, attempt_dir, TASK_SCRIPT_PATTERN,
+      "task_script_path");
+}
+
+/**
+ * Get the log directory for the given attempt.
+ */
+char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
+  return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN,
+      "task_log_dir");
+}
+
+/**
+ * Function to check if the passed tt_root is present in mapred.local.dir
+ * the task-controller is configured with.
+ */
+int check_tt_root(const char *tt_root) {
+  return check_variable_against_config(TT_SYS_DIR_KEY, tt_root);
+}
+
+/**
+ * Function to check if the constructed path and absolute path of the task
+ * launcher file resolve to one and same. This is done so as to avoid
+ * security pitfalls because of relative path components in the file name.
+ */
+int check_task_launcher_path(char *path) {
   char * resolved_path = (char *) canonicalize_file_name(path);
-  if(resolved_path == NULL) {
+  if (resolved_path == NULL) {
+    fprintf(LOGFILE,
+        "Error resolving the task launcher file path: %s. Passed path: %s\n",
+        strerror(errno), path);
     return ERROR_RESOLVING_FILE_PATH;
   }
-  if(strcmp(resolved_path, path) !=0) {
+  if (strcmp(resolved_path, path) != 0) {
+    fprintf(LOGFILE,
+        "Relative path components in the file path: %s. Resolved path: %s\n",
+        path, resolved_path);
     free(resolved_path);
     return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
   }
   free(resolved_path);
   return 0;
 }
+
 /**
- * Function to check if a user actually owns the file.
+ * Function to change the owner/group of a given path.
  */
-int check_owner(uid_t uid, char *path) {
-  struct stat filestat;
-  if(stat(path, &filestat)!=0) {
-    return UNABLE_TO_STAT_FILE;
+static int change_owner(const char *path, uid_t uid, gid_t gid) {
+  int exit_code = chown(path, uid, gid);
+  if (exit_code != 0) {
+    fprintf(LOGFILE, "chown %d:%d for path %s failed: %s.\n", uid, gid, path,
+        strerror(errno));
   }
-  //check owner.
-  if(uid != filestat.st_uid){
-    return FILE_NOT_OWNED_BY_TASKTRACKER;
+  return exit_code;
+}
+
+/**
+ * Function to change the mode of a given path.
+ */
+static int change_mode(const char *path, mode_t mode) {
+  int exit_code = chmod(path, mode);
+  if (exit_code != 0) {
+    fprintf(LOGFILE, "chown %d of path %s failed: %s.\n", mode, path,
+        strerror(errno));
   }
-  return 0;
+  return exit_code;
 }
 
-/*
- * function to provide path to the task file which is created by the tt
- *
- *Check TT_LOCAL_TASK_SCRIPT_PATTERN for pattern
+/**
+ * Function to secure the given path. It does the following recursively:
+ *    1) changes the owner/group of the paths to the passed owner/group
+ *    2) changes the file permission to the passed file_mode and directory
+ *       permission to the passed dir_mode
+ */
+static int secure_path(const char *path, uid_t uid, gid_t gid,
+    mode_t file_mode, mode_t dir_mode) {
+  FTS *tree = NULL; // the file hierarchy
+  FTSENT *entry = NULL; // a file in the hierarchy
+  char *paths[] = { (char *) path };
+  int process_path = 0;
+  int dir = 0;
+  int error_code = 0;
+  int done = 0;
+
+  // Get physical locations and don't resolve the symlinks.
+  // Don't change directory while walking the directory.
+  int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR;
+
+  tree = fts_open(paths, ftsoptions, NULL);
+  if (tree == NULL) {
+    fprintf(LOGFILE,
+        "Cannot open file traversal structure for the path %s:%s.\n", path,
+        strerror(errno));
+    return -1;
+  }
+
+  while (((entry = fts_read(tree)) != NULL) && !done) {
+    dir = 0;
+    switch (entry->fts_info) {
+    case FTS_D:
+      // A directory being visited in pre-order.
+      // We change ownership of directories in post-order.
+      // so ignore the pre-order visit.
+      process_path = 0;
+      break;
+    case FTS_DC:
+      // A directory that causes a cycle in the tree
+      // We don't expect cycles, ignore.
+      process_path = 0;
+      break;
+    case FTS_DNR:
+      // A directory which cannot be read
+      // Ignore and set error code.
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_DOT:
+      // "."  or ".."
+      process_path = 0;
+      break;
+    case FTS_F:
+      // A regular file
+      process_path = 1;
+      break;
+    case FTS_DP:
+      // A directory being visited in post-order
+      if (entry->fts_level == 0) {
+        // root directory. Done with traversing.
+        done = 1;
+      }
+      process_path = 1;
+      dir = 1;
+      break;
+    case FTS_SL:
+      // A symbolic link
+      process_path = 1;
+      break;
+    case FTS_SLNONE:
+      // A symbolic link with a nonexistent target
+      process_path = 1;
+      break;
+    case FTS_NS:
+      // A  file for which no stat(2) information was available
+      // Ignore and set error code
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_ERR:
+      // An error return. Ignore and set error code.
+      process_path = 0;
+      error_code = -1;
+      break;
+    case FTS_DEFAULT:
+      // File that doesn't belong to any of the above type. Ignore.
+      process_path = 0;
+      break;
+    default:
+      // None of the above. Ignore and set error code
+      process_path = 0;
+      error_code = -1;
+    }
+
+    if (error_code != 0) {
+      break;
+    }
+    if (!process_path) {
+      continue;
+    }
+
+    if (check_ownership(getuid(), getgid(), entry->fts_path) != 0) {
+      fprintf(LOGFILE,
+          "Invalid file path. %s not user/group owned by the tasktracker.\n",
+          entry->fts_path);
+      error_code = -1;
+    } else if (change_owner(entry->fts_path, uid, gid) != 0) {
+      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
+      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    }
+  }
+  if (fts_close(tree) != 0) {
+    fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
+        strerror(errno));
+  }
+  return error_code;
+}
+
+/**
+ * Function to prepare the attempt directories for the task JVM.
+ * This is done by changing the ownership of the attempt directory recursively
+ * to the job owner. We do the following:
+ *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid/$attemptid/
+ *     *  sudo chmod 2770 -R taskTracker/jobcache/$jobid/$attemptid/
  */
-void get_task_file_path(const char * jobid, const char * taskid,
-    const char * tt_root, char **task_script_path) {
-  const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY);
-  *task_script_path = NULL;
-  int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen(
-      taskid)) + strlen(tt_root);
+int prepare_attempt_directories(const char *job_id, const char *attempt_id,
+    const char *user) {
+  if (job_id == NULL || attempt_id == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either attempt_id is null or the user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
 
-  if (mapred_local_dir == NULL) {
-    return;
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    return INVALID_USER_NAME;
   }
 
-  *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1));
-  if (*task_script_path == NULL) {
-    fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n");
-    free(mapred_local_dir);
-    return;
+  int tasktracker_gid = getgid();
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
   }
 
-  memset(*task_script_path,'\0',str_len+1);
-  snprintf(*task_script_path, str_len, TT_LOCAL_TASK_SCRIPT_PATTERN, tt_root,
-      jobid, taskid);
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
 #ifdef DEBUG
-  fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path);
-  fflush(LOGFILE);
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
 #endif
-  free(mapred_local_dir);
+
+  char *job_dir;
+  char *attempt_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) {
+    job_dir = get_job_directory(*local_dir_ptr, job_id);
+    if (job_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get job directory for %s.\n", job_id);
+      failed = 1;
+      break;
+    }
+
+    // prepare attempt-dir in each of the mapred_local_dir
+    attempt_dir = get_attempt_directory(job_dir, attempt_id);
+    if (attempt_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get attempt directory for %s.\n", attempt_id);
+      failed = 1;
+      free(job_dir);
+      break;
+    }
+
+    struct stat filestat;
+    if (stat(attempt_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE,
+            "attempt_dir %s doesn't exist. Not doing anything.\n", attempt_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the attempt_dir %s\n", attempt_dir);
+        failed = 1;
+        free(attempt_dir);
+        free(job_dir);
+        break;
+      }
+    } else if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid,
+        S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+      // No setgid on files and setgid on dirs, 770
+      fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir);
+      failed = 1;
+      free(attempt_dir);
+      free(job_dir);
+      break;
+    }
+
+    local_dir_ptr++;
+    free(attempt_dir);
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+
+  cleanup();
+  if (failed) {
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+  }
+  return 0;
 }
 
-//end of private functions
-void display_usage(FILE *stream) {
-  fprintf(stream,
-      "Usage: task-controller [-l logfile] user command command-args\n");
+/**
+ * Function to prepare the task logs for the child. It gives the user
+ * ownership of the attempt's log-dir to the user and group ownership to the
+ * user running tasktracker.
+ *     *  sudo chown user:mapred log-dir/userlogs/$attemptid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$attemptid
+ */
+int prepare_task_logs(const char *log_dir, const char *task_id) {
+
+  char *task_log_dir = get_task_log_dir(log_dir, task_id);
+  if (task_log_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir);
+    return -1;
+  }
+
+  struct stat filestat;
+  if (stat(task_log_dir, &filestat) != 0) {
+    if (errno == ENOENT) {
+      // See TaskRunner.java to see that an absent log-dir doesn't fail the task.
+      // Task log dir for cleanup tasks will not have the name
+      // task-attempt-id.cleanup. Instead a log.index.cleanup is created in
+      // task-attempt log dir. We check if the directory exists and return if
+      // it doesn't. So the following will work for cleanup attempts too.
+#ifdef DEBUG
+      fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
+          task_log_dir);
+#endif
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir);
+      return -1;
+    }
+  }
+
+  int tasktracker_gid = getgid();
+  if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU
+      | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+    // setgid on dirs but not files, 770. As of now, there are no files though
+    fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+    return -1;
+  }
+  return 0;
 }
 
 //function used to populate and user_details structure.
@@ -184,30 +509,197 @@ int get_user_details(const char *user) {
 }
 
 /*
- *Function used to launch a task as the provided user.
- * First the function checks if the tt_root passed is found in
- * hadoop.temp.dir
- * Uses get_task_file_path to fetch the task script file path.
- * Does an execlp on the same in order to replace the current image with
- * task image.
+ * Function to check if a user/group actually owns the file.
+  */
+int check_ownership(uid_t uid, gid_t gid, char *path) {
+  struct stat filestat;
+  if (stat(path, &filestat) != 0) {
+    return UNABLE_TO_STAT_FILE;
+  }
+  // check user/group.
+  if (uid != filestat.st_uid || gid != filestat.st_gid) {
+    return FILE_NOT_OWNED_BY_TASKTRACKER;
+  }
+  return 0;
+}
+
+/**
+ * Function to prepare the job directories for the task JVM.
+ * We do the following:
+ *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid
+ *     *  sudo chmod 2570 -R taskTracker/jobcache/$jobid
+ *     *  sudo chmod 2770 taskTracker/jobcache/$jobid/work
  */
+int initialize_job(const char *jobid, const char *user) {
+  if (jobid == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getgid(); // TaskTracker's group-id
 
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+#endif
+
+  char *job_dir, *job_work_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) {
+    job_dir = get_job_directory(*local_dir_ptr, jobid);
+    if (job_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get job directory for %s.\n", jobid);
+      failed = 1;
+      break;
+    }
+
+    struct stat filestat;
+    if (stat(job_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE, "job_dir %s doesn't exist. Not doing anything.\n",
+            job_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the job_dir %s\n", job_dir);
+        failed = 1;
+        free(job_dir);
+        break;
+      }
+    } else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid,
+        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+        != 0) {
+      // No setgid on files and setgid on dirs, 570
+      fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir);
+      failed = 1;
+      free(job_dir);
+      break;
+    } else {
+      job_work_dir = get_job_work_directory(job_dir);
+      if (job_work_dir == NULL) {
+        fprintf(LOGFILE, "Couldn't get job-work directory for %s.\n", jobid);
+        failed = 1;
+        break;
+      }
+
+      // Set 2770 on the job-work directory
+      if (stat(job_work_dir, &filestat) != 0) {
+        if (errno == ENOENT) {
+#ifdef DEBUG
+          fprintf(LOGFILE,
+              "job_work_dir %s doesn't exist. Not doing anything.\n",
+              job_work_dir);
+#endif
+        } else {
+          // stat failed because of something else!
+          fprintf(LOGFILE, "Failed to stat the job_work_dir %s\n",
+              job_work_dir);
+          failed = 1;
+          free(job_work_dir);
+          free(job_dir);
+          break;
+        }
+      } else if (change_mode(job_work_dir, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+        fprintf(LOGFILE,
+            "couldn't change the permissions of job_work_dir %s\n",
+            job_work_dir);
+        failed = 1;
+        free(job_work_dir);
+        free(job_dir);
+        break;
+      }
+    }
+
+    local_dir_ptr++;
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failed) {
+    return INITIALIZE_JOB_FAILED;
+  }
+  return 0;
+}
+
+/**
+ * Function used to initialize task. Prepares attempt_dir, jars_dir and
+ * log_dir to be accessible by the child
+ */
+int initialize_task(const char *jobid, const char *taskid, const char *user) {
+  int exit_code = 0;
+  // initialized here so the cleanup block never frees an uninitialized pointer
+  char *log_dir = NULL;
+#ifdef DEBUG
+  fprintf(LOGFILE, "job-id passed to initialize_task : %s.\n", jobid);
+  fprintf(LOGFILE, "task-d passed to initialize_task : %s.\n", taskid);
+#endif
+
+  if (prepare_attempt_directories(jobid, taskid, user) != 0) {
+    fprintf(LOGFILE,
+        "Couldn't prepare the attempt directories for %s of user %s.\n",
+        taskid, user);
+    exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+    goto cleanup;
+  }
+
+  log_dir = (char *) get_value(TT_LOG_DIR_KEY);
+  if (log_dir == NULL) {
+    fprintf(LOGFILE, "Log directory is not configured.\n");
+    exit_code = INVALID_TT_LOG_DIR;
+    goto cleanup;
+  }
+
+  if (prepare_task_logs(log_dir, taskid) != 0) {
+    fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
+        log_dir, taskid);
+    exit_code = PREPARE_TASK_LOGS_FAILED;
+  }
+
+  cleanup:
+  // free configurations
+  cleanup();
+  if (log_dir != NULL) {
+    free(log_dir);
+  }
+  return exit_code;
+}
+
+/*
+ * Function used to launch a task as the provided user. It does the following :
+ * 1) Checks if the tt_root passed is found in mapred.local.dir
+ * 2) Prepares attempt_dir and log_dir to be accessible by the child
+ * 3) Uses get_task_launcher_file to fetch the task script file path
+ * 4) Does an execlp on the same in order to replace the current image with
+ * task image.
+ */
 int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     const char *tt_root) {
-  char *task_script_path = NULL;
   int exit_code = 0;
-  uid_t uid = getuid();
 
-  if(jobid == NULL || taskid == NULL) {
+  if (jobid == NULL || taskid == NULL || tt_root == NULL) {
     return INVALID_ARGUMENT_NUMBER;
   }
 
 #ifdef DEBUG
-  fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid);
-  fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid);
-  fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root);
-  fflush(LOGFILE);
+  fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
+  fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
+  fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
 #endif
+
   //Check tt_root before switching the user, as reading configuration
   //file requires privileged access.
   if (check_tt_root(tt_root) < 0) {
@@ -216,34 +708,48 @@ int run_task_as_user(const char * user, 
     return INVALID_TT_ROOT;
   }
 
-  //change the user
-  fclose(LOGFILE);
-  fcloseall();
-  umask(0);
-  if (change_user(user) != 0) {
-    cleanup();
-    return SETUID_OPER_FAILED;
+  char *job_dir = NULL, *task_script_path = NULL;
+
+  if ((exit_code = initialize_task(jobid, taskid, user)) != 0) {
+    fprintf(LOGFILE, "Couldn't initialise the task %s of user %s.\n", taskid,
+        user);
+    goto cleanup;
   }
 
-  get_task_file_path(jobid, taskid, tt_root, &task_script_path);
+  job_dir = get_job_directory(tt_root, jobid);
+  if (job_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root);
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
+
+  task_script_path = get_task_launcher_file(job_dir, taskid);
   if (task_script_path == NULL) {
-    cleanup();
-    return INVALID_TASK_SCRIPT_PATH;
+    fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir);
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
   }
+
   errno = 0;
-  exit_code = check_path(task_script_path);
+  exit_code = check_task_launcher_path(task_script_path);
   if(exit_code != 0) {
     goto cleanup;
   }
-  errno = 0;
-  exit_code = check_owner(uid, task_script_path);
-  if(exit_code != 0) {
+
+  //change the user
+  fcloseall();
+  free(job_dir);
+  umask(0007);
+  if (change_user(user) != 0) {
+    exit_code = SETUID_OPER_FAILED;
     goto cleanup;
   }
+
   errno = 0;
   cleanup();
   execlp(task_script_path, task_script_path, NULL);
   if (errno != 0) {
+    fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
     free(task_script_path);
     exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
   }
@@ -251,6 +757,9 @@ int run_task_as_user(const char * user, 
   return exit_code;
 
 cleanup:
+  if (job_dir != NULL) {
+    free(job_dir);
+  }
   if (task_script_path != NULL) {
     free(task_script_path);
   }
@@ -264,19 +773,25 @@ cleanup:
  * The function sends appropriate signal to the process group
  * specified by the task_pid.
  */
-
 int kill_user_task(const char *user, const char *task_pid, int sig) {
   int pid = 0;
 
   if(task_pid == NULL) {
     return INVALID_ARGUMENT_NUMBER;
   }
+
+#ifdef DEBUG
+  fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user);
+  fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid);
+  fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig);
+#endif
+
   pid = atoi(task_pid);
 
   if(pid <= 0) {
     return INVALID_TASK_PID;
   }
-  fclose(LOGFILE);
+
   fcloseall();
   if (change_user(user) != 0) {
     cleanup();
@@ -286,6 +801,7 @@ int kill_user_task(const char *user, con
   //Don't continue if the process-group is not alive anymore.
   if(kill(-pid,0) < 0) {
     errno = 0;
+    cleanup();
     return 0;
   }
 
@@ -300,4 +816,3 @@ int kill_user_task(const char *user, con
   cleanup();
   return 0;
 }
-
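
The octal modes quoted in the comments (2770 on attempt and job-work
directories, 2570 on job directories) correspond exactly to the symbolic
constants passed to secure_path() and change_mode(). A standalone sanity
check, not part of the patch, makes the mapping explicit:

    #include <sys/stat.h>
    #include <assert.h>

    int main(void) {
      /* attempt dirs and job-work dir: "chmod 2770" = setgid + rwx for
       * user and group */
      assert((S_ISGID | S_IRWXU | S_IRWXG) == 02770);
      /* job dirs: "chmod 2570" = setgid + r-x for user, rwx for group */
      assert((S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG) == 02570);
      /* files get the same bits without setgid: 770 and 570 */
      assert((S_IRWXU | S_IRWXG) == 0770);
      assert((S_IRUSR | S_IXUSR | S_IRWXG) == 0570);
      return 0;
    }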

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar  4 03:42:01 2011
@@ -28,14 +28,20 @@
 #include <sys/stat.h>
 #include <sys/signal.h>
 #include <getopt.h>
-#include<grp.h>
+#include <grp.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <fts.h>
+
 #include "configuration.h"
 
 //command definitions
 enum command {
+  INITIALIZE_JOB,
   LAUNCH_TASK_JVM,
+  INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
-  KILL_TASK_JVM
+  KILL_TASK_JVM,
 };
 
 enum errorcodes {
@@ -45,23 +51,33 @@ enum errorcodes {
   SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4
   INVALID_TT_ROOT, //5
   SETUID_OPER_FAILED, //6
-  INVALID_TASK_SCRIPT_PATH, //7
-  UNABLE_TO_EXECUTE_TASK_SCRIPT, //8
-  UNABLE_TO_KILL_TASK, //9
-  INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10
-  INVALID_TASK_PID, //11
-  ERROR_RESOLVING_FILE_PATH, //12
-  RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
-  UNABLE_TO_STAT_FILE, //14
-  FILE_NOT_OWNED_BY_TASKTRACKER //15
+  UNABLE_TO_EXECUTE_TASK_SCRIPT, //7
+  UNABLE_TO_KILL_TASK, //8
+  INVALID_TASK_PID, //9
+  ERROR_RESOLVING_FILE_PATH, //10
+  RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //11
+  UNABLE_TO_STAT_FILE, //12
+  FILE_NOT_OWNED_BY_TASKTRACKER, //13
+  PREPARE_ATTEMPT_DIRECTORIES_FAILED, //14
+  INITIALIZE_JOB_FAILED, //15
+  PREPARE_TASK_LOGS_FAILED, //16
+  INVALID_TT_LOG_DIR, //17
+  OUT_OF_MEMORY, //18
 };
 
+#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s"
+
+#define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
+
+#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
 
-#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
+#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 
 #define TT_SYS_DIR_KEY "mapred.local.dir"
 
-#define MAX_ITEMS 10
+#define TT_LOG_DIR_KEY "hadoop.log.dir"
 
 #ifndef HADOOP_CONF_DIR
   #define EXEC_PATTERN "/bin/task-controller"
@@ -72,10 +88,22 @@ extern struct passwd *user_detail;
 
 extern FILE *LOGFILE;
 
-void display_usage(FILE *stream);
+int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+    const char *tt_root);
+
+int initialize_task(const char *jobid, const char *taskid, const char *user);
 
-int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root);
+int initialize_job(const char *jobid, const char *user);
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
+int prepare_attempt_directory(const char *attempt_dir, const char *user);
+
+// The following functions are exposed for testing
+
+int check_variable_against_config(const char *config_key,
+    const char *passed_value);
+
 int get_user_details(const char *user);
+
+char *get_task_launcher_file(const char *job_dir, const char *attempt_dir);
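
Substituting into the new #define patterns gives the on-disk layout that the
controller initializes and secures (the ids below are placeholders, not
literal values):

    /*
     * job dir      = <tt-root>/taskTracker/jobcache/<job-id>
     * job work dir = <job-dir>/work
     * attempt dir  = <job-dir>/<attempt-id>
     * task script  = <job-dir>/<attempt-id>/taskjvm.sh
     * attempt logs = <hadoop.log.dir>/userlogs/<attempt-id>
     */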

Added: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=1077111&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c Fri Mar  4 03:42:01 2011
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../task-controller.h"
+
+#define HADOOP_CONF_DIR "/tmp"
+
+int write_config_file(char *file_name) {
+  FILE *file;
+  char const *str =
+      "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+
+  file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  fwrite(str, 1, strlen(str), file);
+  fclose(file);
+  return 0;
+}
+
+void test_check_variable_against_config() {
+
+  // Freed in the cleanup block below; initialized so early gotos are safe.
+  char *value = NULL;
+  char **values = NULL;
+  char *conf_dir = NULL;
+
+  // A temporary configuration directory
+  char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX";
+
+  // To accommodate "/conf/taskcontroller.cfg", plus one byte for the '\0'
+  char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg") + 1];
+
+  strcpy(template, conf_dir_templ);
+  char *temp_dir = mkdtemp(template);
+  if (temp_dir == NULL) {
+    printf("Couldn't create a temporary dir for conf.\n");
+    goto cleanup;
+  }
+
+  // Set the configuration directory
+  hadoop_conf_dir = strdup(temp_dir);
+
+  // create the configuration directory
+  strcat(template, "/conf");
+  conf_dir = strdup(template);
+  mkdir(conf_dir, S_IRWXU);
+
+  // create the configuration file
+  strcat(template, "/taskcontroller.cfg");
+  if (write_config_file(template) != 0) {
+    printf("Couldn't write the configuration file.\n");
+    goto cleanup;
+  }
+
+  // Test obtaining a value for a key from the config
+  char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
+      "/tmp/testing3", "/tmp/testing4" };
+  value = (char *) get_value("mapred.local.dir");
+  if (strcmp(value, "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4")
+      != 0) {
+    printf("Obtaining a value for a key from the config failed.\n");
+    goto cleanup;
+  }
+
+  // Test the parsing of a multiple valued key from the config
+  values = (char **) get_values("mapred.local.dir");
+  char **values_ptr = values;
+  int i = 0;
+  while (*values_ptr != NULL) {
+    printf(" value : %s\n", *values_ptr);
+    if (strcmp(*values_ptr, config_values[i++]) != 0) {
+      printf("Configured values are not read out properly. Test failed!");
+      goto cleanup;
+    }
+    values_ptr++;
+  }
+
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) {
+    printf("Configuration should not contain /tmp/testing5! \n");
+    goto cleanup;
+  }
+
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) {
+    printf("Configuration should contain /tmp/testing4! \n");
+    goto cleanup;
+  }
+
+  cleanup:
+  if (value != NULL) {
+    free(value);
+  }
+  if (values != NULL) {
+    free(values);
+  }
+  unlink(template);
+  if (conf_dir != NULL) {
+    rmdir(conf_dir);
+    free(conf_dir);
+  }
+  if (hadoop_conf_dir != NULL) {
+    // remove the directory before freeing the pointer that names it
+    rmdir(hadoop_conf_dir);
+    free(hadoop_conf_dir);
+  }
+}
+
+void test_get_job_directory() {
+  char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001");
+  printf("job_dir obtained is %s\n", job_dir);
+  int ret = 0;
+  if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) {
+    ret = -1;
+  }
+  free(job_dir);
+  assert(ret == 0);
+}
+
+void test_get_attempt_directory() {
+  char *attempt_dir = (char *) get_attempt_directory(
+      "/tmp/taskTracker/jobcache/job_200906101234_0001",
+      "attempt_200906112028_0001_m_000000_0");
+  printf("attempt_dir obtained is %s\n", attempt_dir);
+  int ret = 0;
+  if (strcmp(
+      attempt_dir,
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+      != 0) {
+    ret = -1;
+  }
+  free(attempt_dir);
+  assert(ret == 0);
+}
+
+void test_get_task_launcher_file() {
+  char *task_file = (char *) get_task_launcher_file(
+      "/tmp/taskTracker/jobcache/job_200906101234_0001",
+      "attempt_200906112028_0001_m_000000_0");
+  printf("task_file obtained is %s\n", task_file);
+  int ret = 0;
+  if (strcmp(
+      task_file,
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+      != 0) {
+    ret = -1;
+  }
+  free(task_file);
+  assert(ret == 0);
+}
+
+void test_get_task_log_dir() {
+  char *logdir = (char *) get_task_log_dir("/tmp/testing",
+      "attempt_200906112028_0001_m_000000_0");
+  printf("logdir obtained is %s\n", logdir);
+  int ret = 0;
+  if (strcmp(logdir,
+      "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+    ret = -1;
+  }
+  free(logdir);
+  assert(ret == 0);
+}
+
+int main(int argc, char **argv) {
+  printf("Starting tests\n");
+  LOGFILE = stdout;
+  test_check_variable_against_config();
+  test_get_job_directory();
+  test_get_attempt_directory();
+  test_get_task_launcher_file();
+  test_get_task_log_dir();
+  printf("Finished tests\n");
+  return 0;
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/build-contrib.xml Fri Mar  4 03:42:01 2011
@@ -234,7 +234,7 @@
       <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
       <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/> 
       <sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
-      <sysproperty key="taskcontroller-user" value="${taskcontroller-user}"/>
+      <sysproperty key="taskcontroller-ugi" value="${taskcontroller-ugi}"/>
       <classpath refid="test.classpath"/>
       <formatter type="${test.junit.output.format}" />
       <batchtest todir="${build.test}" unless="testcase">

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java Fri Mar  4 03:42:01 2011
@@ -65,7 +65,7 @@ public class TestStreamingAsDifferentUse
             "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") };
     StreamJob streamJob = new StreamJob(args, true);
     streamJob.setConf(myConf);
-    streamJob.go();
+    assertTrue("Job has not succeeded", streamJob.go() == 0);
     assertOwnerShip(outputPath);
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Fri Mar  4 03:42:01 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.util.StringUtils;
 
 import junit.framework.TestCase;
 
@@ -89,10 +90,14 @@ public class TestUlimit extends TestCase
       assertFalse("output not cleaned up", fs.exists(outputPath));
       mr.waitUntilIdle();
     } catch(IOException e) {
-      fail(e.toString());
+      fail(StringUtils.stringifyException(e));
     } finally {
-      mr.shutdown();
-      dfs.shutdown();
+      if (mr != null) {
+        mr.shutdown();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml Fri Mar  4 03:42:01 2011
@@ -509,11 +509,13 @@
             distribution. The task tracker uses this executable to 
             launch and kill tasks. The setuid executable switches to
             the user who has submitted the job and launches or kills
-            the tasks. Currently, this task controller 
-            opens up permissions to local files and directories used 
-            by the tasks such as the job jar files, distributed archive 
-            files, intermediate files and task log files. In future,
-            it is expected that stricter file permissions are used.
+            the tasks. For maximum security, this task controller 
+            sets up restricted permissions and user/group ownership of
+            local files and directories used by the tasks such as the
+            job jar files, intermediate files, and task log files. Currently,
+            permissions on distributed cache files are opened up to be
+            accessible by all users. In the future, stricter file
+            permissions are expected to be set for these files too.
             </td>
             </tr>
             </table>
@@ -555,18 +557,32 @@
             </p>
             
             <p>
-            The executable must be deployed as a setuid executable, by changing
-            the ownership to <em>root</em>, group ownership to that of tasktracker
-            and giving it permissions <em>4510</em>.Please take a note that,
-            group which owns task-controller should contain only tasktracker
-            as its memeber and not users who submit jobs.
+            The executable must have specific permissions as follows. It
+            should have <em>6050 or --Sr-s---</em> permissions,
+            user-owned by root (the superuser) and group-owned by a group
+            of which the TaskTracker's user is the sole member.
+            For example, suppose the TaskTracker runs as user
+            <em>mapred</em>, who is a member of the groups <em>users</em> and
+            <em>mapredGroup</em>, either of which may be the primary group.
+            Suppose further that <em>users</em> has both <em>mapred</em> and
+            another user <em>X</em> as its members, while <em>mapredGroup</em>
+            has only <em>mapred</em> as its member. In this setup, the
+            setuid/setgid executable should be set to
+            <em>6050 or --Sr-s---</em>, user-owned by <em>root</em> and
+            group-owned by <em>mapredGroup</em>, which has
+            only <em>mapred</em> as its member (and not <em>users</em>, which
+            also has <em>X</em> as a member besides <em>mapred</em>).
             </p>
             
             <p>The executable requires a configuration file called 
             <em>taskcontroller.cfg</em> to be
             present in the configuration directory passed to the ant target 
             mentioned above. If the binary was not built with a specific 
-            conf directory, the path defaults to <em>/path-to-binary/../conf</em>.
+            conf directory, the path defaults to
+            <em>/path-to-binary/../conf</em>. The configuration file must be
+            owned by the user running the TaskTracker (user <em>mapred</em> in
+            the above example), may be group-owned by any group, and must have
+            the permissions <em>0400 or r--------</em>.
             </p>
             
             <p>The executable requires following configuration items to be 
@@ -581,13 +597,20 @@
             validate paths passed to the setuid executable in order to prevent
             arbitrary paths being passed to it.</td>
             </tr>
+            <tr>
+            <td>hadoop.log.dir</td>
+            <td>Path to the hadoop log directory. Should be the same as the
+            value with which the TaskTracker is started. This is required to set proper
+            permissions on the log files so that they can be written to by the user's
+            tasks and read by the TaskTracker for serving on the web UI.</td>
+            </tr>
             </table>
 
             <p>
-            The LinuxTaskController requires that paths leading up to
+            The LinuxTaskController requires that the paths leading up to and including
             the directories specified in
-            <em>mapred.local.dir</em> and <em>hadoop.log.dir</em> to be 755
-            and directories themselves having 777 permissions.
+            <em>mapred.local.dir</em> and <em>hadoop.log.dir</em> be set to
+            755 permissions.
             </p>
             </section>
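
(Aside, not part of the patch: the ownership and mode requirements described
in this section -- a 6050/--Sr-s--- root-owned binary and a 0400 configuration
file owned by the TaskTracker's user -- can be sanity-checked with stat(2).
The following is a minimal, hypothetical C sketch under those assumptions;
check_perms, the hard-coded paths, and running the check as the TaskTracker
user are all illustrative assumptions, not code from this patch.)

    #include <stdio.h>
    #include <sys/stat.h>
    #include <unistd.h>

    /* Return 0 iff 'path' has exactly the expected mode bits and owner. */
    static int check_perms(const char *path, mode_t expected_mode,
                           uid_t expected_uid) {
      struct stat st;
      if (stat(path, &st) != 0) {
        perror(path);
        return -1;
      }
      if ((st.st_mode & 07777) != expected_mode) {
        fprintf(stderr, "%s: mode %o, expected %o\n", path,
                (unsigned) (st.st_mode & 07777), (unsigned) expected_mode);
        return -1;
      }
      if (st.st_uid != expected_uid) {
        fprintf(stderr, "%s: owner uid %ld, expected %ld\n", path,
                (long) st.st_uid, (long) expected_uid);
        return -1;
      }
      return 0;
    }

    int main(void) {
      int rc = 0;
      /* The setuid/setgid binary: mode 6050 (--Sr-s---), user-owned by root. */
      rc |= check_perms("/path-to-binary/task-controller", 06050, 0);
      /* taskcontroller.cfg: mode 0400, owned by the TaskTracker's user; this
       * assumes the check itself runs as that user, hence getuid(). */
      rc |= check_perms("/path-to-binary/../conf/taskcontroller.cfg", 0400,
                        getuid());
      return rc;
    }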
             

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:42:01 2011
@@ -154,6 +154,10 @@ class Child {
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.createSecretKey(jt.getPassword()));
 
+        // Set up the child's mapred-local-dir. The child is now sandboxed and
+        // can only see files under its own attempt directory.
+        TaskRunner.setupChildMapredLocalDirs(task, job);
+
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
@@ -165,8 +169,6 @@ class Child {
 
         task.setConf(job);
 
-        defaultConf.addResource(new Path(task.getJobFile()));
-
         // Initiate Java VM metrics
         JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
         // use job-specified working directory
@@ -191,7 +193,7 @@ class Child {
           task.taskCleanup(umbilical);
         }
       } catch (Exception e) {
-        LOG.info("Error cleaning up" + e);
+        LOG.info("Error cleaning up", e);
       }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 03:42:01 2011
@@ -49,6 +49,8 @@ class DefaultTaskController extends Task
    */
   void launchTaskJVM(TaskController.TaskControllerContext context) 
                                       throws IOException {
+    initializeTask(context);
+
     JvmEnv env = context.env;
     List<String> wrappedCommand = 
       TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
@@ -73,20 +75,13 @@ class DefaultTaskController extends Task
     // So this is a dummy method.
     return;
   }
-  
-
-  @Override
-  void setup() {
-    // nothing to setup
-    return;
-  }
 
   /*
    * No need to do anything here, as we don't need anything
    * extra beyond what the TaskTracker has already done.
    */
   @Override
-  void initializeJob(JobID jobId) {
+  void initializeJob(JobInitializationContext context) {
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar  4 03:42:01 2011
@@ -167,17 +167,24 @@ public class IsolationRunner {
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
     File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
     local.setWorkingDirectory(new Path(workDirName.toString()));
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
     
     // set up a classloader with the right classpath
-    ClassLoader classLoader = makeClassLoader(conf, workDirName);
+    ClassLoader classLoader =
+        makeClassLoader(conf, new File(workDirName.toString()));
     Thread.currentThread().setContextClassLoader(classLoader);
     conf.setClassLoader(classLoader);
     
+    // The split.dta file is used only by IsolationRunner. The file can now be
+    // on any of the configured local disks, so use LocalDirAllocator to find
+    // out where it is.
     Path localMetaSplit = 
-      new Path(new Path(jobFilename.toString()).getParent(), "split.info");
+        new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+            TaskTracker.getLocalSplitFile(taskId.getJobID().toString(), taskId
+                .toString()), conf);
     DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
     TaskSplitIndex splitIndex = new TaskSplitIndex();
     splitIndex.readFields(splitFile);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Mar  4 03:42:01 2011
@@ -1425,7 +1425,7 @@ public class JobConf extends Configurati
    * @return The localized job specific shared directory
    */
   public String getJobLocalDir() {
-    return get("job.local.dir");
+    return get(TaskTracker.JOB_LOCAL_DIR);
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 03:42:01 2011
@@ -112,7 +112,8 @@ class JvmManager {
     }
   }
 
-  public TaskInProgress getTaskForJvm(JVMId jvmId) {
+  public TaskInProgress getTaskForJvm(JVMId jvmId)
+      throws IOException {
     if (jvmId.isMapJVM()) {
       return mapJvmManager.getTaskForJvm(jvmId);
     } else {
@@ -177,7 +178,8 @@ class JvmManager {
       jvmIdToRunner.get(jvmId).setBusy(true);
     }
     
-    synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
+    synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
+        throws IOException {
       if (jvmToRunningTask.containsKey(jvmId)) {
         // In case of JVM reuse, tasks are returned to a previously launched
         // JVM via this method. However, when a new task is launched
@@ -185,15 +187,24 @@ class JvmManager {
         TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
         JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
         Task task = taskRunner.getTaskInProgress().getTask();
-        TaskControllerContext context = 
-          new TaskController.TaskControllerContext();
+
+        // Initialize task dirs
+        TaskControllerContext context =
+            new TaskController.TaskControllerContext();
         context.env = jvmRunner.env;
         context.task = task;
-        //If we are returning the same task as which the JVM was launched
-        //we don't initialize task once again.
-        if(!jvmRunner.env.conf.get("mapred.task.id").
-            equals(task.getTaskID().toString())) {
-          tracker.getTaskController().initializeTask(context);
+        // If we are returning the same task with which the JVM was launched,
+        // we don't initialize the task once again.
+        if (!jvmRunner.env.conf.get("mapred.task.id").equals(
+            task.getTaskID().toString())) {
+          try {
+            tracker.getTaskController().initializeTask(context);
+          } catch (IOException e) {
+            LOG.warn("Failed to initialize the new task "
+                + task.getTaskID().toString() + " to be given to JVM with id "
+                + jvmId);
+            throw e;
+          }
         }
 
         jvmRunner.taskGiven(task);
@@ -405,7 +416,6 @@ class JvmManager {
           //Launch the task controller to run task JVM
           initalContext.task = jvmToRunningTask.get(jvmId).getTask();
           initalContext.env = env;
-          tracker.getTaskController().initializeTask(initalContext);
           tracker.getTaskController().launchTaskJVM(initalContext);
         } catch (IOException ioe) {
           // do nothing
@@ -415,13 +425,13 @@ class JvmManager {
           if (shexec == null) {
             return;
           }
-          
+
           kill();
-          
+
           int exitCode = shexec.getExitCode();
           updateOnJvmExit(jvmId, exitCode);
-          LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
-              numTasksRan);
+          LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
+              + ". Number of tasks it ran: " + numTasksRan);
           try {
             // In case of jvm-reuse,
             //the task jvm cleans up the common workdir for every 
@@ -445,6 +455,8 @@ class JvmManager {
             initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
               .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
                   ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+            // Destroy the task jvm
             controller.destroyTaskJVM(initalContext);
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId


