hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject hadoop git commit: YARN-3365. Enhanced NodeManager to support using the 'tc' tool via container-executor for outbound network traffic control. Contributed by Sidharta Seethana.
Date Thu, 02 Apr 2015 23:55:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 ddb5bb8fc -> d8e17c58b


YARN-3365. Enhanced NodeManager to support using the 'tc' tool via container-executor for outbound network traffic control. Contributed by Sidharta Seethana.

(cherry picked from commit b21c72777ae664b08fd1a93b4f88fa43f2478d94)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8e17c58
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8e17c58
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8e17c58

Branch: refs/heads/branch-2
Commit: d8e17c58bcbff16c19bd2cba53a85baa7fec550b
Parents: ddb5bb8
Author: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Authored: Thu Apr 2 16:53:59 2015 -0700
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Thu Apr 2 16:55:00 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   4 +
 .../impl/container-executor.c                   | 100 +++-
 .../impl/container-executor.h                   |  34 +-
 .../main/native/container-executor/impl/main.c  | 465 +++++++++++++------
 .../nodemanager/TestLinuxContainerExecutor.java | 421 +++++++++++------
 5 files changed, 722 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8e17c58/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ae5436b..679b981 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -8,6 +8,10 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3345. Add non-exclusive node label API. (Wangda Tan via jianhe)
 
+    YARN-3365. Enhanced NodeManager to support using the 'tc' tool via
+    container-executor for outbound network traffic control. (Sidharta Seethana
+    via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8e17c58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
index 1c214c6..6727838 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
@@ -40,6 +40,12 @@ static const int DEFAULT_MIN_USERID = 1000;
 
 static const char* DEFAULT_BANNED_USERS[] = {"mapred", "hdfs", "bin", 0};
 
+//location of traffic control binary
+static const char* TC_BIN = "/sbin/tc";
+static const char* TC_MODIFY_STATE_OPTS [] = { "-b" , NULL};
+static const char* TC_READ_STATE_OPTS [] = { "-b", NULL};
+static const char* TC_READ_STATS_OPTS [] = { "-s",  "-b", NULL};
+
 //struct to store the user details
 struct passwd *user_detail = NULL;
 
@@ -291,27 +297,20 @@ static int write_exit_code_file(const char* exit_code_file, int exit_code) {
   return 0;
 }
 
-/**
- * Wait for the container process to exit and write the exit code to
- * the exit code file.
- * Returns the exit code of the container process.
- */
-static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+static int wait_and_get_exit_code(pid_t pid) {
   int child_status = -1;
   int exit_code = -1;
   int waitpid_result;
 
-  if (change_effective_user(nm_uid, nm_gid) != 0) {
-    return -1;
-  }
   do {
-    waitpid_result = waitpid(pid, &child_status, 0);
+      waitpid_result = waitpid(pid, &child_status, 0);
   } while (waitpid_result == -1 && errno == EINTR);
+
   if (waitpid_result < 0) {
-    fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
-        pid, strerror(errno));
+    fprintf(LOGFILE, "error waiting for process %d - %s\n", pid, strerror(errno));
     return -1;
   }
+
   if (WIFEXITED(child_status)) {
     exit_code = WEXITSTATUS(child_status);
   } else if (WIFSIGNALED(child_status)) {
@@ -319,9 +318,26 @@ static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
   } else {
     fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
   }
+
+  return exit_code;
+}
+
+/**
+ * Wait for the container process to exit and write the exit code to
+ * the exit code file.
+ * Returns the exit code of the container process.
+ */
+static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+  int exit_code = -1;
+
+  if (change_effective_user(nm_uid, nm_gid) != 0) {
+    return -1;
+  }
+  exit_code = wait_and_get_exit_code(pid);
   if (write_exit_code_file(exit_code_file, exit_code) < 0) {
     return -1;
   }
+
   return exit_code;
 }
 
@@ -1470,3 +1486,63 @@ int mount_cgroup(const char *pair, const char *hierarchy) {
 #endif
 }
 
+static int run_traffic_control(const char *opts[], char *command_file) {
+  const int max_tc_args = 16;
+  char *args[max_tc_args];
+  int i = 0, j = 0;
+
+  args[i++] = TC_BIN;
+  while (opts[j] != NULL && i < max_tc_args - 1) {
+    args[i] = opts[j];
+    ++i, ++j;
+  }
+  //too many args to tc
+  if (i == max_tc_args - 1) {
+    fprintf(LOGFILE, "too many args to tc");
+    return TRAFFIC_CONTROL_EXECUTION_FAILED;
+  }
+  args[i++] = command_file;
+  args[i] = 0;
+
+  pid_t child_pid = fork();
+  if (child_pid != 0) {
+    int exit_code = wait_and_get_exit_code(child_pid);
+    if (exit_code != 0) {
+      fprintf(LOGFILE, "failed to execute tc command!\n");
+      return TRAFFIC_CONTROL_EXECUTION_FAILED;
+    }
+    unlink(command_file);
+    return 0;
+  } else {
+    execv(TC_BIN, args);
+    //if we reach here, exec failed
+    fprintf(LOGFILE, "failed to execute tc command! error: %s\n", strerror(errno));
+    return TRAFFIC_CONTROL_EXECUTION_FAILED;
+  }
+}
+
+/**
+ * Run a batch of tc commands that modify interface configuration. command_file
+ * is deleted after being used.
+ */
+int traffic_control_modify_state(char *command_file) {
+  return run_traffic_control(TC_MODIFY_STATE_OPTS, command_file);
+}
+
+/**
+ * Run a batch of tc commands that read interface configuration. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process. command_file is deleted after being used.
+ */
+int traffic_control_read_state(char *command_file) {
+  return run_traffic_control(TC_READ_STATE_OPTS, command_file);
+}
+
+/**
+ * Run a batch of tc commands that read interface stats. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process. command_file is deleted after being used.
+ */
+int traffic_control_read_stats(char *command_file) {
+  return run_traffic_control(TC_READ_STATS_OPTS, command_file);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8e17c58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
index b1efd6a..43ef98d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
@@ -54,7 +54,20 @@ enum errorcodes {
   INVALID_CONFIG_FILE =  24,
   SETSID_OPER_FAILED = 25,
   WRITE_PIDFILE_FAILED = 26,
-  WRITE_CGROUP_FAILED = 27
+  WRITE_CGROUP_FAILED = 27,
+  TRAFFIC_CONTROL_EXECUTION_FAILED = 28
+};
+
+enum operations {
+  CHECK_SETUP = 1,
+  MOUNT_CGROUPS = 2,
+  TRAFFIC_CONTROL_MODIFY_STATE = 3,
+  TRAFFIC_CONTROL_READ_STATE = 4,
+  TRAFFIC_CONTROL_READ_STATS = 5,
+  RUN_AS_USER_INITIALIZE_CONTAINER = 6,
+  RUN_AS_USER_LAUNCH_CONTAINER = 7,
+  RUN_AS_USER_SIGNAL_CONTAINER = 8,
+  RUN_AS_USER_DELETE = 9
 };
 
 #define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
@@ -209,3 +222,22 @@ int check_dir(char* npath, mode_t st_mode, mode_t desired,
 
 int create_validate_dir(char* npath, mode_t perm, char* path,
    int finalComponent);
+
+/**
+ * Run a batch of tc commands that modify interface configuration
+ */
+int traffic_control_modify_state(char *command_file);
+
+/**
+ * Run a batch of tc commands that read interface configuration. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process.
+ */
+int traffic_control_read_state(char *command_file);
+
+/**
+ * Run a batch of tc commands that read interface stats. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process.
+ */
+int traffic_control_read_stats(char *command_file);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8e17c58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
index 9b5e784..63fbfe4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
@@ -42,84 +42,71 @@
   #error HADOOP_CONF_DIR must be defined
 #endif
 
-void display_usage(FILE *stream) {
-  fprintf(stream,
-          "Usage: container-executor --checksetup\n");
-  fprintf(stream,
-          "Usage: container-executor --mount-cgroups "\
-          "hierarchy controller=path...\n");
-  fprintf(stream,
-      "Usage: container-executor user yarn-user command command-args\n");
-  fprintf(stream, "Commands:\n");
-  fprintf(stream, "   initialize container: %2d appid tokens " \
-   "nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
-  fprintf(stream,
-      "   launch container:    %2d appid containerid workdir "\
-      "container-script tokens pidfile nm-local-dirs nm-log-dirs resources\n",
-	  LAUNCH_CONTAINER);
-  fprintf(stream, "   signal container:    %2d container-pid signal\n",
-	  SIGNAL_CONTAINER);
-  fprintf(stream, "   delete as user: %2d relative-path\n",
-	  DELETE_AS_USER);
+static void display_usage(FILE *stream) {
+  char *usage_template =
+      "Usage: container-executor --checksetup\n" \
+      "       container-executor --mount-cgroups <hierarchy> <controller=path>...\n" \
+      "       container-executor --tc-modify-state <command-file>\n" \
+      "       container-executor --tc-read-state <command-file>\n" \
+      "       container-executor --tc-read-stats <command-file>\n" \
+      "       container-executor <user> <yarn-user> <command> <command-args>\n"  \
+      "       where command and command-args: \n" \
+      "            initialize container:  %2d appid tokens nm-local-dirs nm-log-dirs cmd app...\n" \
+      "            launch container:      %2d appid containerid workdir container-script " \
+                              "tokens pidfile nm-local-dirs nm-log-dirs resources optional-tc-command-file\n" \
+      "            signal container:      %2d container-pid signal\n" \
+      "            delete as user:        %2d relative-path\n" ;
+
+
+  fprintf(stream, usage_template, INITIALIZE_CONTAINER, LAUNCH_CONTAINER,
+          SIGNAL_CONTAINER, DELETE_AS_USER);
 }
 
-int main(int argc, char **argv) {
-  int invalid_args = 0; 
-  int do_check_setup = 0;
-  int do_mount_cgroups = 0;
-  
-  LOGFILE = stdout;
-  ERRORFILE = stderr;
+/* Sets up log files for normal/error logging */
+static void open_log_files() {
+  if (LOGFILE == NULL) {
+    LOGFILE = stdout;
+  }
 
-  if (argc > 1) {
-    if (strcmp("--mount-cgroups", argv[1]) == 0) {
-      do_mount_cgroups = 1;
-    }
+  if (ERRORFILE == NULL) {
+    ERRORFILE = stderr;
   }
+}
 
-  // Minimum number of arguments required to run 
-  // the std. container-executor commands is 4
-  // 4 args not needed for checksetup option
-  if (argc < 4 && !do_mount_cgroups) {
-    invalid_args = 1;
-    if (argc == 2) {
-      const char *arg1 = argv[1];
-      if (strcmp("--checksetup", arg1) == 0) {
-        invalid_args = 0;
-        do_check_setup = 1;        
-      }      
-    }
+/* Flushes and closes log files */
+static void flush_and_close_log_files() {
+  if (LOGFILE != NULL) {
+    fflush(LOGFILE);
+    fclose(LOGFILE);
+    LOGFILE = NULL;
   }
   
-  if (invalid_args != 0) {
-    display_usage(stdout);
-    return INVALID_ARGUMENT_NUMBER;
+if (ERRORFILE != NULL) {
+    fflush(ERRORFILE);
+    fclose(ERRORFILE);
+    ERRORFILE = NULL;
   }
+}
 
-  int command;
-  const char * app_id = NULL;
-  const char * container_id = NULL;
-  const char * cred_file = NULL;
-  const char * script_file = NULL;
-  const char * current_dir = NULL;
-  const char * pid_file = NULL;
-
-  int exit_code = 0;
-
-  char * dir_to_be_deleted = NULL;
+/** Validates the current container-executor setup. Causes program exit
+in case of validation failures. Also sets up configuration / group information etc.
+This function is to be called in every invocation of container-executor, irrespective
+of whether an explicit checksetup operation is requested. */
 
+static void assert_valid_setup(char *current_executable) {
   char *executable_file = get_executable();
 
   char *orig_conf_file = HADOOP_CONF_DIR "/" CONF_FILENAME;
-  char *conf_file = resolve_config_path(orig_conf_file, argv[0]);
-  char *local_dirs, *log_dirs;
-  char *resources, *resources_key, *resources_value;
+  char *conf_file = resolve_config_path(orig_conf_file, current_executable);
 
   if (conf_file == NULL) {
     fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
+    flush_and_close_log_files();
     exit(INVALID_CONFIG_FILE);
   }
+
   if (check_configuration_permissions(conf_file) != 0) {
+    flush_and_close_log_files();
     exit(INVALID_CONFIG_FILE);
   }
   read_config(conf_file);
@@ -129,13 +116,14 @@ int main(int argc, char **argv) {
   char *nm_group = get_value(NM_GROUP_KEY);
   if (nm_group == NULL) {
     fprintf(ERRORFILE, "Can't get configured value for %s.\n", NM_GROUP_KEY);
+    flush_and_close_log_files();
     exit(INVALID_CONFIG_FILE);
   }
   struct group *group_info = getgrnam(nm_group);
   if (group_info == NULL) {
     fprintf(ERRORFILE, "Can't get group information for %s - %s.\n", nm_group,
             strerror(errno));
-    fflush(LOGFILE);
+    flush_and_close_log_files();
     exit(INVALID_CONFIG_FILE);
   }
   set_nm_uid(getuid(), group_info->gr_gid);
@@ -146,91 +134,162 @@ int main(int argc, char **argv) {
 
   if (check_executor_permissions(executable_file) != 0) {
     fprintf(ERRORFILE, "Invalid permissions on container-executor binary.\n");
-    return INVALID_CONTAINER_EXEC_PERMISSIONS;
+    flush_and_close_log_files();
+    exit(INVALID_CONTAINER_EXEC_PERMISSIONS);
   }
+}
+
+
+/* Used to store parsed input parameters for various operations */
+static struct {
+  char *cgroups_hierarchy;
+  char *traffic_control_command_file;
+  const char * run_as_user_name;
+  const char * yarn_user_name;
+  char *local_dirs;
+  char *log_dirs;
+  char *resources_key;
+  char *resources_value;
+  char **resources_values;
+  const char * app_id;
+  const char * container_id;
+  const char * cred_file;
+  const char * script_file;
+  const char * current_dir;
+  const char * pid_file;
+  const char *dir_to_be_deleted;
+  int container_pid;
+  int signal;
+} cmd_input;
+
+static int validate_run_as_user_commands(int argc, char **argv, int *operation);
+
+/* Validates that arguments used in the invocation are valid. In case of validation
+failure, an 'errorcode' is returned. In case of successful validation, a zero is
+returned and 'operation' is populated based on the operation being requested.
+Ideally, we should re-factor container-executor to use a more structured, command
+line parsing mechanism (e.g getopt). For the time being, we'll use this manual
+validation mechanism so that we don't have to change the invocation interface.
+*/
 
-  if (do_check_setup != 0) {
-    // basic setup checks done
-    // verified configs available and valid
-    // verified executor permissions
+static int validate_arguments(int argc, char **argv , int *operation) {
+  if (argc < 2) {
+    display_usage(stdout);
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (strcmp("--checksetup", argv[1]) == 0) {
+    *operation = CHECK_SETUP;
     return 0;
   }
 
-  if (do_mount_cgroups) {
+  if (strcmp("--mount-cgroups", argv[1]) == 0) {
+    if (argc < 4) {
+      display_usage(stdout);
+      return INVALID_ARGUMENT_NUMBER;
+    }
     optind++;
-    char *hierarchy = argv[optind++];
-    int result = 0;
+    cmd_input.cgroups_hierarchy = argv[optind++];
+    *operation = MOUNT_CGROUPS;
+    return 0;
+  }
 
-    while (optind < argc && result == 0) {
-      result = mount_cgroup(argv[optind++], hierarchy);
+  if (strcmp("--tc-modify-state", argv[1]) == 0) {
+    if (argc != 3) {
+      display_usage(stdout);
+      return INVALID_ARGUMENT_NUMBER;
     }
-
-    return result;
+    optind++;
+    cmd_input.traffic_control_command_file = argv[optind++];
+    *operation = TRAFFIC_CONTROL_MODIFY_STATE;
+    return 0;
   }
 
-  //checks done for user name
-  if (argv[optind] == NULL) {
-    fprintf(ERRORFILE, "Invalid user name.\n");
-    return INVALID_USER_NAME;
+  if (strcmp("--tc-read-state", argv[1]) == 0) {
+    if (argc != 3) {
+      display_usage(stdout);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    optind++;
+    cmd_input.traffic_control_command_file = argv[optind++];
+    *operation = TRAFFIC_CONTROL_READ_STATE;
+    return 0;
   }
 
-  int ret = set_user(argv[optind]);
-  if (ret != 0) {
-    return ret;
+  if (strcmp("--tc-read-stats", argv[1]) == 0) {
+    if (argc != 3) {
+      display_usage(stdout);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    optind++;
+    cmd_input.traffic_control_command_file = argv[optind++];
+    *operation = TRAFFIC_CONTROL_READ_STATS;
+    return 0;
   }
 
-  // this string is used for building pathnames, the
-  // process management is done based on the 'user_detail'
-  // global, which was set by 'set_user()' above
-  optind = optind + 1;
-  char *yarn_user_name = argv[optind];
-  if (yarn_user_name == NULL) {
-    fprintf(ERRORFILE, "Invalid yarn user name.\n");
-    return INVALID_USER_NAME;
+  /* Now we have to validate 'run as user' operations that don't use
+    a 'long option' - we should fix this at some point. The validation/argument
+    parsing here is extensive enough that it is done in a separate function */
+
+  return validate_run_as_user_commands(argc, argv, operation);
+}
+
+/* Parse/validate 'run as user' commands */
+static int validate_run_as_user_commands(int argc, char **argv, int *operation) {
+  /* We need at least the following arguments in order to proceed further :
+    <user>, <yarn-user>, <command> - i.e. argc should be at least 4 */
+
+  if (argc < 4) {
+    display_usage(stdout);
+    return INVALID_ARGUMENT_NUMBER;
   }
- 
-  optind = optind + 1;
-  command = atoi(argv[optind++]);
 
-  fprintf(LOGFILE, "main : command provided %d\n",command);
-  fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
-  fprintf(LOGFILE, "main : requested yarn user is %s\n", yarn_user_name);
+  cmd_input.run_as_user_name = argv[optind++];
+  cmd_input.yarn_user_name = argv[optind++];
+  int command = atoi(argv[optind++]);
+
+  fprintf(LOGFILE, "main : command provided %d\n", command);
+  fprintf(LOGFILE, "main : run as user is %s\n", cmd_input.run_as_user_name);
+  fprintf(LOGFILE, "main : requested yarn user is %s\n", cmd_input.yarn_user_name);
   fflush(LOGFILE);
 
   switch (command) {
   case INITIALIZE_CONTAINER:
     if (argc < 9) {
       fprintf(ERRORFILE, "Too few arguments (%d vs 9) for initialize container\n",
-	      argc);
+       argc);
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
     }
-    app_id = argv[optind++];
-    cred_file = argv[optind++];
-    local_dirs = argv[optind++];// good local dirs as a comma separated list
-    log_dirs = argv[optind++];// good log dirs as a comma separated list
-    exit_code = initialize_app(yarn_user_name, app_id, cred_file,
-                               extract_values(local_dirs),
-                               extract_values(log_dirs), argv + optind);
-    break;
+    cmd_input.app_id = argv[optind++];
+    cmd_input.cred_file = argv[optind++];
+    cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
+    cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
+
+    *operation = RUN_AS_USER_INITIALIZE_CONTAINER;
+    return 0;
+
   case LAUNCH_CONTAINER:
-    if (argc != 13) {
-      fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13) for launch container\n",
-	      argc);
+    //13 arguments, or 14 when the optional tc command file is supplied
+    if (!(argc == 13 || argc == 14)) {
+      fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13 or 14) for launch container\n",
+       argc);
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
     }
-    app_id = argv[optind++];
-    container_id = argv[optind++];
-    current_dir = argv[optind++];
-    script_file = argv[optind++];
-    cred_file = argv[optind++];
-    pid_file = argv[optind++];
-    local_dirs = argv[optind++];// good local dirs as a comma separated list
-    log_dirs = argv[optind++];// good log dirs as a comma separated list
-    resources = argv[optind++];// key,value pair describing resources
-    char *resources_key = malloc(strlen(resources));
-    char *resources_value = malloc(strlen(resources));
+
+    cmd_input.app_id = argv[optind++];
+    cmd_input.container_id = argv[optind++];
+    cmd_input.current_dir = argv[optind++];
+    cmd_input.script_file = argv[optind++];
+    cmd_input.cred_file = argv[optind++];
+    cmd_input.pid_file = argv[optind++];
+    cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
+    cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
+    char * resources = argv[optind++];// key,value pair describing resources
+    char * resources_key = malloc(strlen(resources));
+    char * resources_value = malloc(strlen(resources));
+
     if (get_kv_key(resources, resources_key, strlen(resources)) < 0 ||
         get_kv_value(resources, resources_value, strlen(resources)) < 0) {
         fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s",
@@ -240,51 +299,157 @@ int main(int argc, char **argv) {
         free(resources_value);
         return INVALID_ARGUMENT_NUMBER;
     }
-    char** resources_values = extract_values(resources_value);
-    exit_code = launch_container_as_user(yarn_user_name, app_id,
-                    container_id, current_dir, script_file, cred_file,
-                    pid_file, extract_values(local_dirs),
-                    extract_values(log_dirs), resources_key,
-                    resources_values);
-    free(resources_key);
-    free(resources_value);
-    break;
+
+    //network isolation through tc
+    if (argc == 14) {
+      cmd_input.traffic_control_command_file = argv[optind++];
+    }
+
+    cmd_input.resources_key = resources_key;
+    cmd_input.resources_value = resources_value;
+    cmd_input.resources_values = extract_values(resources_value);
+    *operation = RUN_AS_USER_LAUNCH_CONTAINER;
+    return 0;
+
   case SIGNAL_CONTAINER:
     if (argc != 6) {
       fprintf(ERRORFILE, "Wrong number of arguments (%d vs 6) for " \
           "signal container\n", argc);
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
-    } else {
-      char* end_ptr = NULL;
-      char* option = argv[optind++];
-      int container_pid = strtol(option, &end_ptr, 10);
-      if (option == end_ptr || *end_ptr != '\0') {
-        fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
-        fflush(ERRORFILE);
-        return INVALID_ARGUMENT_NUMBER;
-      }
-      option = argv[optind++];
-      int signal = strtol(option, &end_ptr, 10);
-      if (option == end_ptr || *end_ptr != '\0') {
-        fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
-        fflush(ERRORFILE);
-        return INVALID_ARGUMENT_NUMBER;
-      }
-      exit_code = signal_container_as_user(yarn_user_name, container_pid, signal);
     }
-    break;
+
+    char* end_ptr = NULL;
+    char* option = argv[optind++];
+    cmd_input.container_pid = strtol(option, &end_ptr, 10);
+    if (option == end_ptr || *end_ptr != '\0') {
+      fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    option = argv[optind++];
+    cmd_input.signal = strtol(option, &end_ptr, 10);
+    if (option == end_ptr || *end_ptr != '\0') {
+      fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+
+    *operation = RUN_AS_USER_SIGNAL_CONTAINER;
+    return 0;
+
   case DELETE_AS_USER:
-    dir_to_be_deleted = argv[optind++];
-    exit_code= delete_as_user(yarn_user_name, dir_to_be_deleted,
-                              argv + optind);
-    break;
+    cmd_input.dir_to_be_deleted = argv[optind++];
+    *operation = RUN_AS_USER_DELETE;
+    return 0;
   default:
     fprintf(ERRORFILE, "Invalid command %d not supported.",command);
     fflush(ERRORFILE);
-    exit_code = INVALID_COMMAND_PROVIDED;
+    return INVALID_COMMAND_PROVIDED;
   }
-  fclose(LOGFILE);
-  fclose(ERRORFILE);
+}
+
+int main(int argc, char **argv) {
+  open_log_files();
+  assert_valid_setup(argv[0]);
+
+  int operation;
+  int ret = validate_arguments(argc, argv, &operation);
+
+  if (ret != 0) {
+    flush_and_close_log_files();
+    return ret;
+  }
+
+  int exit_code = 0;
+
+  switch (operation) {
+  case CHECK_SETUP:
+    //we already did this
+    exit_code = 0;
+    break;
+  case MOUNT_CGROUPS:
+    exit_code = 0;
+
+    while (optind < argc && exit_code == 0) {
+      exit_code = mount_cgroup(argv[optind++], cmd_input.cgroups_hierarchy);
+    }
+
+    break;
+  case TRAFFIC_CONTROL_MODIFY_STATE:
+    exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
+    break;
+  case TRAFFIC_CONTROL_READ_STATE:
+    exit_code = traffic_control_read_state(cmd_input.traffic_control_command_file);
+    break;
+  case TRAFFIC_CONTROL_READ_STATS:
+    exit_code = traffic_control_read_stats(cmd_input.traffic_control_command_file);
+    break;
+  case RUN_AS_USER_INITIALIZE_CONTAINER:
+    exit_code = set_user(cmd_input.run_as_user_name);
+    if (exit_code != 0) {
+      break;
+    }
+
+    exit_code = initialize_app(cmd_input.yarn_user_name,
+                            cmd_input.app_id,
+                            cmd_input.cred_file,
+                            extract_values(cmd_input.local_dirs),
+                            extract_values(cmd_input.log_dirs),
+                            argv + optind);
+    break;
+  case RUN_AS_USER_LAUNCH_CONTAINER:
+    if (cmd_input.traffic_control_command_file != NULL) {
+      //apply tc rules before switching users and launching the container
+      exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
+      if( exit_code != 0) {
+        //failed to apply tc rules - break out before launching the container
+        break;
+      }
+    }
+
+    exit_code = set_user(cmd_input.run_as_user_name);
+    if (exit_code != 0) {
+      break;
+    }
+
+    exit_code = launch_container_as_user(cmd_input.yarn_user_name,
+                    cmd_input.app_id,
+                    cmd_input.container_id,
+                    cmd_input.current_dir,
+                    cmd_input.script_file,
+                    cmd_input.cred_file,
+                    cmd_input.pid_file,
+                    extract_values(cmd_input.local_dirs),
+                    extract_values(cmd_input.log_dirs),
+                    cmd_input.resources_key,
+                    cmd_input.resources_values);
+    free(cmd_input.resources_key);
+    free(cmd_input.resources_value);
+    free(cmd_input.resources_values);
+    break;
+  case RUN_AS_USER_SIGNAL_CONTAINER:
+    exit_code = set_user(cmd_input.run_as_user_name);
+    if (exit_code != 0) {
+      break;
+    }
+
+    exit_code = signal_container_as_user(cmd_input.yarn_user_name,
+                                  cmd_input.container_pid,
+                                  cmd_input.signal);
+    break;
+  case RUN_AS_USER_DELETE:
+    exit_code = set_user(cmd_input.run_as_user_name);
+    if (exit_code != 0) {
+      break;
+    }
+
+    exit_code = delete_as_user(cmd_input.yarn_user_name,
+                        cmd_input.dir_to_be_deleted,
+                        argv + optind);
+    break;
+  }
+
+  flush_and_close_log_files();
   return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8e17c58/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
index f837bbc..da47ddc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -32,13 +33,13 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -61,57 +62,88 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
 import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
 /**
- * This is intended to test the LinuxContainerExecutor code, but because of
- * some security restrictions this can only be done with some special setup
- * first.
- * <br><ol>
+ * This is intended to test the LinuxContainerExecutor code, but because of some
+ * security restrictions this can only be done with some special setup first. <br>
+ * <ol>
  * <li>Compile the code with container-executor.conf.dir set to the location you
- * want for testing.
- * <br><pre><code>
+ * want for testing. <br>
+ * 
+ * <pre>
+ * <code>
  * > mvn clean install -Pnative -Dcontainer-executor.conf.dir=/etc/hadoop
  *                          -DskipTests
- * </code></pre>
+ * </code>
+ * </pre>
  * 
  * <li>Set up <code>${container-executor.conf.dir}/container-executor.cfg</code>
  * container-executor.cfg needs to be owned by root and have in it the proper
- * config values.
- * <br><pre><code>
+ * config values. <br>
+ * 
+ * <pre>
+ * <code>
  * > cat /etc/hadoop/container-executor.cfg
  * yarn.nodemanager.linux-container-executor.group=mapred
  * #depending on the user id of the application.submitter option
  * min.user.id=1
  * > sudo chown root:root /etc/hadoop/container-executor.cfg
  * > sudo chmod 444 /etc/hadoop/container-executor.cfg
- * </code></pre>
+ * </code>
+ * </pre>
  * 
- * <li>Move the binary and set proper permissions on it. It needs to be owned 
- * by root, the group needs to be the group configured in container-executor.cfg, 
+ * <li>Move the binary and set proper permissions on it. It needs to be owned by
+ * root, the group needs to be the group configured in container-executor.cfg,
  * and it needs the setuid bit set. (The build will also overwrite it so you
- * need to move it to a place that you can support it. 
- * <br><pre><code>
+ * need to move it to a place that you can support it.) <br>
+ * 
+ * <pre>
+ * <code>
  * > cp ./hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/container-executor /tmp/
  * > sudo chown root:mapred /tmp/container-executor
- * > sudo chmod 4550 /tmp/container-executor
- * </code></pre>
+ * > sudo chmod 4050 /tmp/container-executor
+ * </code>
+ * </pre>
  * 
  * <li>Run the tests with the execution enabled (The user you run the tests as
- * needs to be part of the group from the config.
- * <br><pre><code>
+ * needs to be part of the group from the config.) <br>
+ * 
+ * <pre>
+ * <code>
  * mvn test -Dtest=TestLinuxContainerExecutor -Dapplication.submitter=nobody -Dcontainer-executor.path=/tmp/container-executor
- * </code></pre>
+ * </code>
+ * </pre>
+ *
+ * <li>The test suite also contains tests to test mounting of CGroups. By
+ * default, these tests are not run. To run them, add -Dcgroups.mount=&lt;mount-point&gt;.
+ * Please note that the test does not unmount the CGroups at the end of the test,
+ * since that requires root permissions. <br>
+ *
+ * <li>The tests that are run are sensitive to directory permissions. All parent
+ * directories must be searchable by the user that the tasks are run as. If you
+ * wish to run the tests in a different directory, please set it using
+ * -Dworkspace.dir
+ * 
  * </ol>
  */
 public class TestLinuxContainerExecutor {
   private static final Log LOG = LogFactory
-      .getLog(TestLinuxContainerExecutor.class);
-  
-  private static File workSpace = new File("target",
-      TestLinuxContainerExecutor.class.getName() + "-workSpace");
-  
+    .getLog(TestLinuxContainerExecutor.class);
+
+  private static File workSpace;
+  static {
+    String basedir = System.getProperty("workspace.dir");
+    if(basedir == null || basedir.isEmpty()) {
+      basedir = "target";
+    }
+    workSpace = new File(basedir,
+        TestLinuxContainerExecutor.class.getName() + "-workSpace");
+  }
+
   private LinuxContainerExecutor exec = null;
   private String appSubmitter = null;
   private LocalDirsHandlerService dirsHandler;
@@ -125,20 +157,26 @@ public class TestLinuxContainerExecutor {
     files.mkdir(workSpacePath, null, true);
     FileUtil.chmod(workSpace.getAbsolutePath(), "777");
     File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
-    files.mkdir(new Path(localDir.getAbsolutePath()),
-        new FsPermission("777"), false);
+    files.mkdir(new Path(localDir.getAbsolutePath()), new FsPermission("777"),
+      false);
     File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
-    files.mkdir(new Path(logDir.getAbsolutePath()),
-        new FsPermission("777"), false);
+    files.mkdir(new Path(logDir.getAbsolutePath()), new FsPermission("777"),
+      false);
     String exec_path = System.getProperty("container-executor.path");
-    if(exec_path != null && !exec_path.isEmpty()) {
+    if (exec_path != null && !exec_path.isEmpty()) {
       conf = new Configuration(false);
       conf.setClass("fs.AbstractFileSystem.file.impl",
         org.apache.hadoop.fs.local.LocalFs.class,
         org.apache.hadoop.fs.AbstractFileSystem.class);
-      conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "xuan");
-      LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
-          +"="+exec_path);
+
+      appSubmitter = System.getProperty("application.submitter");
+      if (appSubmitter == null || appSubmitter.isEmpty()) {
+        appSubmitter = "nobody";
+      }
+
+      conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, appSubmitter);
+      LOG.info("Setting " + YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
+          + "=" + exec_path);
       conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
       exec = new LinuxContainerExecutor();
       exec.setConf(conf);
@@ -146,34 +184,86 @@ public class TestLinuxContainerExecutor {
       conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
       dirsHandler = new LocalDirsHandlerService();
       dirsHandler.init(conf);
+      List<String> localDirs = dirsHandler.getLocalDirs();
+      for (String dir : localDirs) {
+        Path userDir = new Path(dir, ContainerLocalizer.USERCACHE);
+        files.mkdir(userDir, new FsPermission("777"), false);
+        // $local/filecache
+        Path fileDir = new Path(dir, ContainerLocalizer.FILECACHE);
+        files.mkdir(fileDir, new FsPermission("777"), false);
+      }
     }
-    appSubmitter = System.getProperty("application.submitter");
-    if(appSubmitter == null || appSubmitter.isEmpty()) {
-      appSubmitter = "nobody";
-    }
+
   }
 
   @After
   public void tearDown() throws Exception {
     FileContext.getLocalFSFileContext().delete(
-        new Path(workSpace.getAbsolutePath()), true);
+      new Path(workSpace.getAbsolutePath()), true);
+  }
+
+  private void cleanupUserAppCache(String user) throws Exception {
+    List<String> localDirs = dirsHandler.getLocalDirs();
+    for (String dir : localDirs) {
+      Path usercachedir = new Path(dir, ContainerLocalizer.USERCACHE);
+      Path userdir = new Path(usercachedir, user);
+      Path appcachedir = new Path(userdir, ContainerLocalizer.APPCACHE);
+      exec.deleteAsUser(user, appcachedir);
+      FileContext.getLocalFSFileContext().delete(usercachedir, true);
+    }
+  }
+
+  private void cleanupUserFileCache(String user) {
+    List<String> localDirs = dirsHandler.getLocalDirs();
+    for (String dir : localDirs) {
+      Path filecache = new Path(dir, ContainerLocalizer.FILECACHE);
+      Path filedir = new Path(filecache, user);
+      exec.deleteAsUser(user, filedir);
+    }
+  }
+
+  private void cleanupLogDirs(String user) {
+    List<String> logDirs = dirsHandler.getLogDirs();
+    for (String dir : logDirs) {
+      String appId = "APP_" + id;
+      String containerId = "CONTAINER_" + (id - 1);
+      Path appdir = new Path(dir, appId);
+      Path containerdir = new Path(appdir, containerId);
+      exec.deleteAsUser(user, containerdir);
+    }
+  }
+
+  private void cleanupAppFiles(String user) throws Exception {
+    cleanupUserAppCache(user);
+    cleanupUserFileCache(user);
+    cleanupLogDirs(user);
+
+    String[] files =
+        { "launch_container.sh", "container_tokens", "touch-file" };
+    Path ws = new Path(workSpace.toURI());
+    for (String file : files) {
+      File f = new File(workSpace, file);
+      if (f.exists()) {
+        exec.deleteAsUser(user, new Path(file), ws);
+      }
+    }
   }
 
   private boolean shouldRun() {
-    if(exec == null) {
+    if (exec == null) {
       LOG.warn("Not running test because container-executor.path is not set");
       return false;
     }
     return true;
   }
-  
-  private String writeScriptFile(String ... cmd) throws IOException {
+
+  private String writeScriptFile(String... cmd) throws IOException {
     File f = File.createTempFile("TestLinuxContainerExecutor", ".sh");
     f.deleteOnExit();
     PrintWriter p = new PrintWriter(new FileOutputStream(f));
     p.println("#!/bin/sh");
     p.print("exec");
-    for(String part: cmd) {
+    for (String part : cmd) {
       p.print(" '");
       p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
       p.print("'");
@@ -182,36 +272,36 @@ public class TestLinuxContainerExecutor {
     p.close();
     return f.getAbsolutePath();
   }
-  
+
   private int id = 0;
+
   private synchronized int getNextId() {
     id += 1;
     return id;
   }
-  
+
   private ContainerId getNextContainerId() {
     ContainerId cId = mock(ContainerId.class);
-    String id = "CONTAINER_"+getNextId();
+    String id = "CONTAINER_" + getNextId();
     when(cId.toString()).thenReturn(id);
     return cId;
   }
-  
 
-  private int runAndBlock(String ... cmd) throws IOException {
+  private int runAndBlock(String... cmd) throws IOException {
     return runAndBlock(getNextContainerId(), cmd);
   }
-  
-  private int runAndBlock(ContainerId cId, String ... cmd) throws IOException {
-    String appId = "APP_"+getNextId();
+
+  private int runAndBlock(ContainerId cId, String... cmd) throws IOException {
+    String appId = "APP_" + getNextId();
     Container container = mock(Container.class);
     ContainerLaunchContext context = mock(ContainerLaunchContext.class);
-    HashMap<String, String> env = new HashMap<String,String>();
+    HashMap<String, String> env = new HashMap<String, String>();
 
     when(container.getContainerId()).thenReturn(cId);
     when(container.getLaunchContext()).thenReturn(context);
 
     when(context.getEnvironment()).thenReturn(env);
-    
+
     String script = writeScriptFile(cmd);
 
     Path scriptPath = new Path(script);
@@ -221,46 +311,36 @@ public class TestLinuxContainerExecutor {
 
     exec.activateContainer(cId, pidFile);
     return exec.launchContainer(container, scriptPath, tokensPath,
-        appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
-        dirsHandler.getLogDirs());
+      appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
+      dirsHandler.getLogDirs());
   }
-  
+
   @Test
   public void testContainerLocalizer() throws Exception {
-    if (!shouldRun()) {
-      return;
-    }
-    List<String> localDirs = dirsHandler.getLocalDirs();
-    List<String> logDirs = dirsHandler.getLogDirs();
-    for (String localDir : localDirs) {
-      Path userDir =
-          new Path(localDir, ContainerLocalizer.USERCACHE);
-      files.mkdir(userDir, new FsPermission("777"), false);
-      // $local/filecache
-      Path fileDir =
-          new Path(localDir, ContainerLocalizer.FILECACHE);
-      files.mkdir(fileDir, new FsPermission("777"), false);
-    }
+
+    Assume.assumeTrue(shouldRun());
+
     String locId = "container_01_01";
     Path nmPrivateContainerTokensPath =
-        dirsHandler.getLocalPathForWrite(
-            ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
-              + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
-                  locId));
+        dirsHandler
+          .getLocalPathForWrite(ResourceLocalizationService.NM_PRIVATE_DIR
+              + Path.SEPARATOR
+              + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId));
     files.create(nmPrivateContainerTokensPath, EnumSet.of(CREATE, OVERWRITE));
     Configuration config = new YarnConfiguration(conf);
-    InetSocketAddress nmAddr = config.getSocketAddr(
-      YarnConfiguration.NM_BIND_HOST,
-      YarnConfiguration.NM_LOCALIZER_ADDRESS,
-      YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
-      YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+    InetSocketAddress nmAddr =
+        config.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
+          YarnConfiguration.NM_LOCALIZER_ADDRESS,
+          YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
+          YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
     String appId = "application_01_01";
     exec = new LinuxContainerExecutor() {
       @Override
-      public void buildMainArgs(List<String> command, String user, String appId,
-          String locId, InetSocketAddress nmAddr, List<String> localDirs) {
-        MockContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
-          localDirs);
+      public void buildMainArgs(List<String> command, String user,
+          String appId, String locId, InetSocketAddress nmAddr,
+          List<String> localDirs) {
+        MockContainerLocalizer.buildMainArgs(command, user, appId, locId,
+          nmAddr, localDirs);
       }
     };
     exec.setConf(conf);
@@ -277,44 +357,68 @@ public class TestLinuxContainerExecutor {
     files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
     exec.startLocalizer(nmPrivateContainerTokensPath2, nmAddr, appSubmitter,
       appId, locId2, dirsHandler);
+    cleanupUserAppCache(appSubmitter);
   }
-  
+
   @Test
-  public void testContainerLaunch() throws IOException {
-    if (!shouldRun()) {
-      return;
-    }
+  public void testContainerLaunch() throws Exception {
+    Assume.assumeTrue(shouldRun());
+    String expectedRunAsUser =
+        conf.get(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
+          YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
+
+    File touchFile = new File(workSpace, "touch-file");
+    int ret = runAndBlock("touch", touchFile.getAbsolutePath());
+
+    assertEquals(0, ret);
+    FileStatus fileStatus =
+        FileContext.getLocalFSFileContext().getFileStatus(
+          new Path(touchFile.getAbsolutePath()));
+    assertEquals(expectedRunAsUser, fileStatus.getOwner());
+    cleanupAppFiles(expectedRunAsUser);
 
+  }
+
+  @Test
+  public void testNonSecureRunAsSubmitter() throws Exception {
+    Assume.assumeTrue(shouldRun());
+    Assume.assumeFalse(UserGroupInformation.isSecurityEnabled());
+    String expectedRunAsUser = appSubmitter;
+    conf.set(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, "false");
+    exec.setConf(conf);
     File touchFile = new File(workSpace, "touch-file");
     int ret = runAndBlock("touch", touchFile.getAbsolutePath());
-    
+
     assertEquals(0, ret);
-    FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
+    FileStatus fileStatus =
+        FileContext.getLocalFSFileContext().getFileStatus(
           new Path(touchFile.getAbsolutePath()));
-    assertEquals(appSubmitter, fileStatus.getOwner());
+    assertEquals(expectedRunAsUser, fileStatus.getOwner());
+    cleanupAppFiles(expectedRunAsUser);
+    // reset conf
+    conf.unset(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS);
+    exec.setConf(conf);
   }
 
   @Test
   public void testContainerKill() throws Exception {
-    if (!shouldRun()) {
-      return;
-    }
-    
-    final ContainerId sleepId = getNextContainerId();   
+    Assume.assumeTrue(shouldRun());
+
+    final ContainerId sleepId = getNextContainerId();
     Thread t = new Thread() {
       public void run() {
         try {
           runAndBlock(sleepId, "sleep", "100");
         } catch (IOException e) {
-          LOG.warn("Caught exception while running sleep",e);
+          LOG.warn("Caught exception while running sleep", e);
         }
       };
     };
-    t.setDaemon(true); //If it does not exit we shouldn't block the test.
+    t.setDaemon(true); // If it does not exit we shouldn't block the test.
     t.start();
 
     assertTrue(t.isAlive());
-   
+
     String pid = null;
     int count = 10;
     while ((pid = exec.getProcessId(sleepId)) == null && count > 0) {
@@ -328,40 +432,77 @@ public class TestLinuxContainerExecutor {
     exec.signalContainer(appSubmitter, pid, Signal.TERM);
     LOG.info("sleeping for 100ms to let the sleep be killed");
     Thread.sleep(100);
-    
+
     assertFalse(t.isAlive());
+    cleanupAppFiles(appSubmitter);
+  }
+
+  @Test
+  public void testCGroups() throws Exception {
+    Assume.assumeTrue(shouldRun());
+    String cgroupsMount = System.getProperty("cgroups.mount");
+    Assume.assumeTrue((cgroupsMount != null) && !cgroupsMount.isEmpty());
+
+    assertTrue("Cgroups mount point does not exist", new File(
+        cgroupsMount).exists());
+    List<String> cgroupKVs = new ArrayList<>();
+
+    String hierarchy = "hadoop-yarn";
+    String[] controllers = { "cpu", "net_cls" };
+    for (String controller : controllers) {
+      cgroupKVs.add(controller + "=" + cgroupsMount + "/" + controller);
+      assertTrue(new File(cgroupsMount, controller).exists());
+    }
+
+    try {
+      exec.mountCgroups(cgroupKVs, hierarchy);
+      for (String controller : controllers) {
+        assertTrue(controller + " cgroup not mounted", new File(
+            cgroupsMount + "/" + controller + "/tasks").exists());
+        assertTrue(controller + " cgroup hierarchy not created",
+            new File(cgroupsMount + "/" + controller + "/" + hierarchy).exists());
+        assertTrue(controller + " cgroup hierarchy created incorrectly",
+            new File(cgroupsMount + "/" + controller + "/" + hierarchy
+                + "/tasks").exists());
+      }
+    } catch (IOException ie) {
+      fail("Couldn't mount cgroups " + ie.toString());
+      throw ie;
+    }
   }
 
   @Test
   public void testLocalUser() throws Exception {
+    Assume.assumeTrue(shouldRun());
     try {
-      //nonsecure default
+      // nonsecure default
       Configuration conf = new YarnConfiguration();
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-          "simple");
+        "simple");
       UserGroupInformation.setConfiguration(conf);
       LinuxContainerExecutor lce = new LinuxContainerExecutor();
       lce.setConf(conf);
-      Assert.assertEquals(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
+      Assert.assertEquals(
+          YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
           lce.getRunAsUser("foo"));
 
-      //nonsecure custom setting
+      // nonsecure custom setting
       conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "bar");
       lce = new LinuxContainerExecutor();
       lce.setConf(conf);
       Assert.assertEquals("bar", lce.getRunAsUser("foo"));
 
-      //nonsecure without limits
+      // nonsecure without limits
       conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "bar");
       conf.set(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, "false");
       lce = new LinuxContainerExecutor();
       lce.setConf(conf);
       Assert.assertEquals("foo", lce.getRunAsUser("foo"));
 
-      //secure
+      // secure
       conf = new YarnConfiguration();
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-          "kerberos");
+        "kerberos");
       UserGroupInformation.setConfiguration(conf);
       lce = new LinuxContainerExecutor();
       lce.setConf(conf);
@@ -369,49 +510,50 @@ public class TestLinuxContainerExecutor {
     } finally {
       Configuration conf = new YarnConfiguration();
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-          "simple");
+        "simple");
       UserGroupInformation.setConfiguration(conf);
     }
   }
 
   @Test
   public void testNonsecureUsernamePattern() throws Exception {
+    Assume.assumeTrue(shouldRun());
     try {
-      //nonsecure default
+      // nonsecure default
       Configuration conf = new YarnConfiguration();
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-          "simple");
+        "simple");
       UserGroupInformation.setConfiguration(conf);
       LinuxContainerExecutor lce = new LinuxContainerExecutor();
       lce.setConf(conf);
       lce.verifyUsernamePattern("foo");
       try {
         lce.verifyUsernamePattern("foo/x");
-        Assert.fail();
+        fail();
       } catch (IllegalArgumentException ex) {
-        //NOP        
+        // NOP
       } catch (Throwable ex) {
-        Assert.fail(ex.toString());
+        fail(ex.toString());
       }
-      
-      //nonsecure custom setting
+
+      // nonsecure custom setting
       conf.set(YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY, "foo");
       lce = new LinuxContainerExecutor();
       lce.setConf(conf);
       lce.verifyUsernamePattern("foo");
       try {
         lce.verifyUsernamePattern("bar");
-        Assert.fail();
+        fail();
       } catch (IllegalArgumentException ex) {
-        //NOP        
+        // NOP
       } catch (Throwable ex) {
-        Assert.fail(ex.toString());
+        fail(ex.toString());
       }
 
-      //secure, pattern matching does not kick in.
+      // secure, pattern matching does not kick in.
       conf = new YarnConfiguration();
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-          "kerberos");
+        "kerberos");
       UserGroupInformation.setConfiguration(conf);
       lce = new LinuxContainerExecutor();
       lce.setConf(conf);
@@ -420,13 +562,14 @@ public class TestLinuxContainerExecutor {
     } finally {
       Configuration conf = new YarnConfiguration();
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-          "simple");
+        "simple");
       UserGroupInformation.setConfiguration(conf);
     }
   }
 
-  @Test(timeout=10000)
+  @Test(timeout = 10000)
   public void testPostExecuteAfterReacquisition() throws Exception {
+    Assume.assumeTrue(shouldRun());
     // make up some bogus container ID
     ApplicationId appId = ApplicationId.newInstance(12345, 67890);
     ApplicationAttemptId attemptId =
@@ -435,7 +578,7 @@ public class TestLinuxContainerExecutor {
 
     Configuration conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
-        TestResourceHandler.class, LCEResourcesHandler.class);
+      TestResourceHandler.class, LCEResourcesHandler.class);
     LinuxContainerExecutor lce = new LinuxContainerExecutor();
     lce.setConf(conf);
     try {
@@ -444,7 +587,7 @@ public class TestLinuxContainerExecutor {
       // expected if LCE isn't setup right, but not necessary for this test
     }
     lce.reacquireContainer("foouser", cid);
-    Assert.assertTrue("postExec not called after reacquisition",
+    assertTrue("postExec not called after reacquisition",
         TestResourceHandler.postExecContainers.contains(cid));
   }
 


Mime
View raw message