hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077679 [2/6] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/core/org/apache/hadoop/fs/ src/core/org/apa...
Date Fri, 04 Mar 2011 04:43:35 GMT
Added: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1077679&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/test/test-task-controller.c (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/test/test-task-controller.c Fri Mar  4 04:43:33 2011
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "configuration.h"
+#include "task-controller.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#define TEST_ROOT "/tmp/test-task-controller"
+#define DONT_TOUCH_FILE "dont-touch-me"
+
+static char* username = NULL;
+
+/**
+ * Run a command as the current effective user and require it to succeed.
+ * It can't use system, since bash seems to copy the real user id into the
+ * effective id.
+ *
+ * The command line is split on single spaces (no quoting or escaping is
+ * understood) and passed to execvp in a forked child. The calling process
+ * exits with status 1 if the fork fails, if the child can't be waited for,
+ * or if the child doesn't exit normally with status 0.
+ *
+ * @param cmd the space separated command line to run
+ */
+void run(const char *cmd) {
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    // the command never ran, so this is as fatal as the command failing
+    printf("FAIL: failed to fork - %s\n", strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    char *cmd_copy = strdup(cmd);
+    if (cmd_copy == NULL) {
+      printf("FAIL: out of memory copying command %s\n", cmd);
+      exit(42);
+    }
+    // the number of spaces plus one is an upper bound on the token count
+    char *ptr;
+    int words = 1;
+    for (ptr = strchr(cmd_copy, ' '); ptr; ptr = strchr(ptr + 1, ' ')) {
+      words += 1;
+    }
+    // one extra slot for the NULL that terminates the argument vector
+    char **argv = malloc(sizeof(char *) * (words + 1));
+    if (argv == NULL) {
+      printf("FAIL: out of memory building arguments for %s\n", cmd);
+      exit(42);
+    }
+    int i = 0;
+    for (ptr = strtok(cmd_copy, " "); ptr != NULL; ptr = strtok(NULL, " ")) {
+      argv[i++] = ptr;
+    }
+    argv[i] = NULL;
+    if (argv[0] == NULL) {
+      printf("FAIL: no command given to run\n");
+      exit(42);
+    }
+    // execvp only returns if it failed
+    execvp(argv[0], argv);
+    printf("FAIL: exec failed in child %s - %s\n", cmd, strerror(errno));
+    exit(42);
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) <= 0) {
+      printf("FAIL: failed waiting for child process %s pid %d - %s\n",
+             cmd, child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: process %s pid %d did not exit\n", cmd, child);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: process %s pid %d exited with error status %d\n", cmd,
+             child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
+/**
+ * Write the configuration file used by the tests.
+ * It sets mapred.local.dir to the four directories TEST_ROOT/local-1
+ * through TEST_ROOT/local-4 and hadoop.log.dir to TEST_ROOT/logs.
+ *
+ * @param file_name the path of the file to create or overwrite
+ * @return 0 on success, EXIT_FAILURE if the file can't be opened or written
+ */
+int write_config_file(char *file_name) {
+  FILE *file;
+  file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  fprintf(file, "mapred.local.dir=" TEST_ROOT "/local-1");
+  int i;
+  for(i=2; i < 5; ++i) {
+    fprintf(file, "," TEST_ROOT "/local-%d", i);
+  }
+  fprintf(file, "\n");
+  fprintf(file, "hadoop.log.dir=" TEST_ROOT "/logs\n");
+  // the output is buffered, so a write error may only be visible in the
+  // stream's error flag or when fclose flushes the remaining data
+  int failed = ferror(file);
+  if (fclose(file) != 0) {
+    failed = 1;
+  }
+  if (failed) {
+    printf("Failed to write %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  return 0;
+}
+
+/**
+ * Create each directory listed under mapred.local.dir together with its
+ * taskTracker subdirectory, both with mode 0755.
+ * Exits with status 1 if the list is missing or a directory can't be made.
+ */
+void create_tt_roots() {
+  char** tt_roots = get_values("mapred.local.dir");
+  // NOTE(review): assumes get_values reports a missing key as NULL - confirm
+  if (tt_roots == NULL) {
+    printf("FAIL: no values found for mapred.local.dir\n");
+    exit(1);
+  }
+  char buffer[100000];
+  char** tt_root;
+  for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) {
+    if (mkdir(*tt_root, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", *tt_root,
+             strerror(errno));
+      exit(1);
+    }
+    // bound the write so that an overlong configured path is reported
+    // instead of overrunning the buffer
+    int len = snprintf(buffer, sizeof(buffer), "%s/taskTracker", *tt_root);
+    if (len < 0 || (size_t) len >= sizeof(buffer)) {
+      printf("FAIL: path under %s is too long\n", *tt_root);
+      exit(1);
+    }
+    if (mkdir(buffer, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", buffer,
+             strerror(errno));
+      exit(1);
+    }
+  }
+  free_values(tt_roots);
+}
+
+/**
+ * Check that get_user_directory builds <root>/taskTracker/<user>.
+ * Exits with status 1 on a mismatch.
+ */
+void test_get_user_directory() {
+  char *user_dir = get_user_directory("/tmp", "user");
+  char *expected = "/tmp/taskTracker/user";
+  if (strcmp(user_dir, expected) != 0) {
+    // the arguments follow the order of the words in the message
+    printf("test_get_user_directory expected %s got %s\n", expected, user_dir);
+    exit(1);
+  }
+  free(user_dir);
+}
+
+/**
+ * Check that get_job_directory builds
+ * <root>/taskTracker/<user>/jobcache/<job>.
+ * Exits with status 1 on a mismatch.
+ */
+void test_get_job_directory() {
+  char *expected = "/tmp/taskTracker/user/jobcache/job_200906101234_0001";
+  char *job_dir = get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  if (strcmp(job_dir, expected) != 0) {
+    // say what went wrong, like the other path tests do
+    printf("test_get_job_directory expected %s got %s\n", expected, job_dir);
+    exit(1);
+  }
+  free(job_dir);
+}
+
+/**
+ * Check that get_attempt_work_directory builds
+ * <root>/taskTracker/<user>/jobcache/<job>/<attempt>/work.
+ * Exits with status 1 on a mismatch.
+ */
+void test_get_attempt_directory() {
+  char *attempt_dir = get_attempt_work_directory("/tmp", "owen", "job_1",
+                                                 "attempt_1");
+  char *expected = "/tmp/taskTracker/owen/jobcache/job_1/attempt_1/work";
+  if (strcmp(attempt_dir, expected) != 0) {
+    printf("Fail get_attempt_work_directory got %s expected %s\n",
+           attempt_dir, expected);
+    // without this the mismatch is printed but the run still succeeds
+    exit(1);
+  }
+  free(attempt_dir);
+}
+
+/**
+ * Check that get_task_launcher_file places taskjvm.sh directly inside the
+ * job directory. Exits with status 1 on a mismatch.
+ */
+void test_get_task_launcher_file() {
+  char *wanted = "/tmp/taskTracker/user/jobcache/job_200906101234_0001"
+                 "/taskjvm.sh";
+  char *parent = get_job_directory("/tmp", "user", "job_200906101234_0001");
+  char *launcher = get_task_launcher_file(parent);
+  int differs = strcmp(launcher, wanted);
+  if (differs) {
+    printf("failure to match expected task file %s vs %s\n", launcher,
+           wanted);
+    exit(1);
+  }
+  free(parent);
+  free(launcher);
+}
+
+/**
+ * Check that get_job_log_directory puts a job's logs under
+ * TEST_ROOT/logs/userlogs/<job>. Exits with status 1 on a mismatch.
+ */
+void test_get_job_log_dir() {
+  char *actual = get_job_log_directory("job_200906101234_0001");
+  char *wanted = TEST_ROOT "/logs/userlogs/job_200906101234_0001";
+  int differs = strcmp(actual, wanted);
+  if (differs) {
+    printf("Fail get_job_log_dir got %s expected %s\n", actual, wanted);
+    exit(1);
+  }
+  free(actual);
+}
+
+/**
+ * Check that get_job_log_directory also handles a <job>/<task> path,
+ * giving TEST_ROOT/logs/userlogs/<job>/<task>.
+ * Exits with status 1 on a mismatch.
+ */
+void test_get_task_log_dir() {
+  char *logdir = get_job_log_directory("job_5/task_4");
+  char *expected = TEST_ROOT "/logs/userlogs/job_5/task_4";
+  if (strcmp(logdir, expected) != 0) {
+    // the arguments follow the order of the words in the message
+    printf("FAIL: get_task_log_dir expected %s got %s\n", expected, logdir);
+    // without this the mismatch is printed but the run still succeeds
+    exit(1);
+  }
+  free(logdir);
+}
+
+/**
+ * Check that check_user accepts the user the tests run as and returns
+ * NULL for lp, root and mapred. Exits with status 1 otherwise.
+ */
+void test_check_user() {
+  printf("\nTesting test_check_user\n");
+  struct passwd *info = check_user(username);
+  if (!info) {
+    printf("FAIL: failed check for user %s\n", username);
+    exit(1);
+  }
+  free(info);
+
+  // users that must be turned away, each paired with the complaint that
+  // is printed if check_user lets it through
+  char *refused[] = {"lp", "root", "mapred"};
+  const char *complaint[] = {
+    "FAIL: failed check for system user lp\n",
+    "FAIL: failed check for system user root\n",
+    "FAIL: failed check for hadoop user mapred\n"
+  };
+  size_t n;
+  for (n = 0; n < sizeof(refused) / sizeof(refused[0]); ++n) {
+    if (check_user(refused[n]) != NULL) {
+      fputs(complaint[n], stdout);
+      exit(1);
+    }
+  }
+}
+
+/**
+ * Check that check_configuration_permissions returns 0 for /etc/passwd
+ * and non-zero for TEST_ROOT. Exits with status 1 otherwise.
+ */
+void test_check_configuration_permissions() {
+  printf("\nTesting check_configuration_permissions\n");
+  int passwd_result = check_configuration_permissions("/etc/passwd");
+  if (passwd_result != 0) {
+    printf("FAIL: failed permission check on /etc/passwd\n");
+    exit(1);
+  }
+  int root_result = check_configuration_permissions(TEST_ROOT);
+  if (!root_result) {
+    printf("FAIL: failed permission check on %s\n", TEST_ROOT);
+    exit(1);
+  }
+}
+
+/**
+ * Fill the work directory of job_1/task_1 with awkward entries (a soft and
+ * a hard link to a canary file, a dot file, a file and a directory without
+ * permissions), delete the task with delete_as_user and check that the
+ * task directory is gone while the job directory and the canary remain.
+ * Exits with status 1 on any failure.
+ */
+void test_delete_task() {
+  if (initialize_user(username) != 0) {
+    printf("FAIL: failed to initialized user %s\n", username);
+    exit(1);
+  }
+  char* job_path = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
+  char* canary = get_job_directory(TEST_ROOT "/local-2", username,
+                                   DONT_TOUCH_FILE);
+  char* task_path = get_attempt_work_directory(TEST_ROOT "/local-2",
+                                               username, "job_1", "task_1");
+  char cmd[100000];
+
+  // a deep tree inside the task directory, and the canary next to the job
+  sprintf(cmd, "mkdir -p %s/who/let/the/dogs/out/who/who", task_path);
+  run(cmd);
+  sprintf(cmd, "touch %s", canary);
+  run(cmd);
+
+  // the task directory points at the canary through a soft link ...
+  sprintf(cmd, "ln -s %s %s/who/softlink", canary, task_path);
+  run(cmd);
+  // ... and through a hard link
+  sprintf(cmd, "ln %s %s/who/hardlink", canary, task_path);
+  run(cmd);
+  // a hidden file
+  sprintf(cmd, "touch %s/who/let/.dotfile", task_path);
+  run(cmd);
+  // a file that nobody may read or write
+  sprintf(cmd, "touch %s/who/let/protect", task_path);
+  run(cmd);
+  sprintf(cmd, "chmod 000 %s/who/let/protect", task_path);
+  run(cmd);
+  // finally lock the directory that holds them
+  sprintf(cmd, "chmod 000 %s/who/let", task_path);
+  run(cmd);
+
+  // remove the task directory
+  int rc = delete_as_user(username, "jobcache/job_1/task_1");
+  if (rc != 0) {
+    printf("FAIL: return code from delete_as_user is %d\n", rc);
+    exit(1);
+  }
+
+  // the task directory must no longer be there
+  if (!access(task_path, R_OK)) {
+    printf("FAIL: failed to delete the directory - %s\n", task_path);
+    exit(1);
+  }
+  // the enclosing job directory must have survived
+  if (access(job_path, R_OK)) {
+    printf("FAIL: accidently deleted the directory - %s\n", job_path);
+    exit(1);
+  }
+  // and so must the canary that the links pointed at
+  if (access(canary, R_OK)) {
+    printf("FAIL: accidently deleted file %s\n", canary);
+    exit(1);
+  }
+
+  // tidy up the job directory
+  sprintf(cmd, "chmod -R 700 %s", job_path);
+  run(cmd);
+  sprintf(cmd, "rm -fr %s", job_path);
+  run(cmd);
+  free(job_path);
+  free(task_path);
+  free(canary);
+}
+
+/**
+ * Fill the work directory of job_2/task_1 with awkward entries (a soft and
+ * a hard link to a canary file, a dot file, a file and a directory without
+ * permissions), delete the whole job with delete_as_user and check that
+ * both the task and job directories are gone while the canary remains.
+ * Exits with status 1 on any failure.
+ */
+void test_delete_job() {
+  char* job_path = get_job_directory(TEST_ROOT "/local-2", username, "job_2");
+  char* canary = get_job_directory(TEST_ROOT "/local-2", username,
+                                   DONT_TOUCH_FILE);
+  char* task_path = get_attempt_work_directory(TEST_ROOT "/local-2",
+                                               username, "job_2", "task_1");
+  char cmd[100000];
+
+  // a deep tree inside the task directory, and the canary next to the job
+  sprintf(cmd, "mkdir -p %s/who/let/the/dogs/out/who/who", task_path);
+  run(cmd);
+  sprintf(cmd, "touch %s", canary);
+  run(cmd);
+
+  // the task directory points at the canary through a soft link ...
+  sprintf(cmd, "ln -s %s %s/who/softlink", canary, task_path);
+  run(cmd);
+  // ... and through a hard link
+  sprintf(cmd, "ln %s %s/who/hardlink", canary, task_path);
+  run(cmd);
+  // a hidden file
+  sprintf(cmd, "touch %s/who/let/.dotfile", task_path);
+  run(cmd);
+  // a file that nobody may read or write
+  sprintf(cmd, "touch %s/who/let/protect", task_path);
+  run(cmd);
+  sprintf(cmd, "chmod 000 %s/who/let/protect", task_path);
+  run(cmd);
+  // finally lock the directory that holds them
+  sprintf(cmd, "chmod 000 %s/who/let", task_path);
+  run(cmd);
+
+  // remove the whole job directory
+  int rc = delete_as_user(username, "jobcache/job_2");
+  if (rc != 0) {
+    printf("FAIL: return code from delete_as_user is %d\n", rc);
+    exit(1);
+  }
+
+  // the task directory must no longer be there
+  if (!access(task_path, R_OK)) {
+    printf("FAIL: failed to delete the directory - %s\n", task_path);
+    exit(1);
+  }
+  // neither must the job directory
+  if (!access(job_path, R_OK)) {
+    printf("FAIL: didn't delete the directory - %s\n", job_path);
+    exit(1);
+  }
+  // but the canary that the links pointed at must have survived
+  if (access(canary, R_OK)) {
+    printf("FAIL: accidently deleted file %s\n", canary);
+    exit(1);
+  }
+  free(job_path);
+  free(task_path);
+  free(canary);
+}
+
+
+/**
+ * Create a job directory under local-1, call delete_as_user with an empty
+ * relative path and check that the user's directory is gone while local-1
+ * itself is still there. Exits with status 1 on any failure.
+ */
+void test_delete_user() {
+  printf("\nTesting delete_user\n");
+  char* job_path = get_job_directory(TEST_ROOT "/local-1", username, "job_3");
+  if (mkdirs(job_path, 0700) != 0) {
+    exit(1);
+  }
+  char user_root[100000];
+  sprintf(user_root, TEST_ROOT "/local-1/taskTracker/%s", username);
+  if (access(user_root, R_OK)) {
+    printf("FAIL: directory missing before test\n");
+    exit(1);
+  }
+  int rc = delete_as_user(username, "");
+  if (rc != 0) {
+    exit(1);
+  }
+  if (!access(user_root, R_OK)) {
+    printf("FAIL: directory not deleted\n");
+    exit(1);
+  }
+  // the root that held the user directory must be left alone
+  if (access(TEST_ROOT "/local-1", R_OK)) {
+    printf("FAIL: local-1 directory does not exist\n");
+    exit(1);
+  }
+  free(job_path);
+}
+
+/**
+ * Create the log directories for job_1 and job_1/task_2, then check that
+ * delete_log_directory removes the task log directory without touching
+ * the job log directory, and afterwards removes the job log directory.
+ * Exits with status 1 on any failure.
+ */
+void test_delete_log_directory() {
+  printf("\nTesting delete_log_directory\n");
+  char *job_log_dir = get_job_log_directory("job_1");
+  if (job_log_dir == NULL) {
+    exit(1);
+  }
+  if (create_directory_for_user(job_log_dir) != 0) {
+    exit(1);
+  }
+  free(job_log_dir);
+  char *task_log_dir = get_job_log_directory("job_1/task_2");
+  if (task_log_dir == NULL) {
+    exit(1);
+  }
+  if (mkdirs(task_log_dir, 0700) != 0) {
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) != 0) {
+    printf("FAIL: can't access task directory - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (delete_log_directory("job_1/task_2") != 0) {
+    printf("FAIL: can't delete task directory\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) == 0) {
+    printf("FAIL: task directory not deleted\n");
+    exit(1);
+  }
+  // deleting the task logs must leave the job log directory in place
+  if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) != 0) {
+    printf("FAIL: job directory deleted with the task directory - %s\n",
+           strerror(errno));
+    exit(1);
+  }
+  if (delete_log_directory("job_1") != 0) {
+    printf("FAIL: can't delete job directory\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) == 0) {
+    printf("FAIL: job directory not deleted\n");
+    exit(1);
+  }
+  free(task_log_dir);
+}
+
+/**
+ * Run one test function in a forked child and wait for it.
+ * The calling process exits with status 1 if the fork or wait fails or if
+ * the child doesn't exit normally with status 0.
+ *
+ * @param test_name the name printed before the test starts
+ * @param func the test to call in the child
+ */
+void run_test_in_child(const char* test_name, void (*func)()) {
+  printf("\nRunning test %s in child process\n", test_name);
+  // flush first so that buffered output isn't written twice after the fork
+  fflush(stdout);
+  fflush(stderr);
+  pid_t pid = fork();
+  if (pid == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  }
+  if (pid == 0) {
+    // child: a test that returns has passed
+    func();
+    exit(0);
+  }
+  // parent: collect and judge the child's result
+  int wait_status = 0;
+  if (waitpid(pid, &wait_status, 0) == -1) {
+    printf("FAIL: waitpid %d failed - %s\n", pid, strerror(errno));
+    exit(1);
+  }
+  if (!WIFEXITED(wait_status)) {
+    printf("FAIL: child %d didn't exit - %d\n", pid, wait_status);
+    exit(1);
+  }
+  int code = WEXITSTATUS(wait_status);
+  if (code != 0) {
+    printf("FAIL: child %d exited with bad status %d\n", pid, code);
+    exit(1);
+  }
+}
+
+/**
+ * Fork a child that switches to the test user and sleeps, send it SIGQUIT
+ * through signal_user_task and check that it was killed by that signal.
+ * Exits with status 1 on any failure.
+ */
+void test_signal_task() {
+  printf("\nTesting signal_task\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t pid = fork();
+  if (pid == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  }
+  if (pid == 0) {
+    // child: become the test user and wait to be signalled
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  }
+  // parent: signal the child and see how it ended
+  printf("Child task launched as %d\n", pid);
+  if (signal_user_task(username, pid, SIGQUIT) != 0) {
+    exit(1);
+  }
+  int wait_status = 0;
+  if (waitpid(pid, &wait_status, 0) == -1) {
+    printf("FAIL: waitpid failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (!WIFSIGNALED(wait_status)) {
+    printf("FAIL: child wasn't signalled - %d\n", wait_status);
+    exit(1);
+  }
+  int killer = WTERMSIG(wait_status);
+  if (killer != SIGQUIT) {
+    printf("FAIL: child was killed with %d instead of %d\n",
+           killer, SIGQUIT);
+    exit(1);
+  }
+}
+
+/**
+ * Fork a child that calls setpgrp, switches to the test user and sleeps,
+ * send it SIGKILL through signal_user_task and check that it was killed by
+ * that signal. Exits with status 1 on any failure.
+ */
+void test_signal_task_group() {
+  printf("\nTesting group signal_task\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t pid = fork();
+  switch (pid) {
+  case -1:
+    printf("FAIL: fork failed\n");
+    exit(1);
+  case 0:
+    // child: move to its own process group before changing user
+    setpgrp();
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  default:
+    break;
+  }
+  // parent: signal the child and see how it ended
+  printf("Child task launched as %d\n", pid);
+  if (signal_user_task(username, pid, SIGKILL) != 0) {
+    exit(1);
+  }
+  int wait_status = 0;
+  if (waitpid(pid, &wait_status, 0) == -1) {
+    printf("FAIL: waitpid failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (!WIFSIGNALED(wait_status)) {
+    printf("FAIL: child wasn't signalled - %d\n", wait_status);
+    exit(1);
+  }
+  int killer = WTERMSIG(wait_status);
+  if (killer != SIGKILL) {
+    printf("FAIL: child was killed with %d instead of %d\n",
+           killer, SIGKILL);
+    exit(1);
+  }
+}
+
+/**
+ * Write a credentials file and a job.xml as root, run initialize_job for
+ * job_4 in a forked child with "touch my-touch-file" as the final program,
+ * and check that the job log directory, the job directory, the jobToken
+ * file and the touched file all exist afterwards.
+ * Only called when the tests run as root. Exits with status 1 on failure.
+ */
+void test_init_job() {
+  printf("\nTesting init job\n");
+  if (seteuid(0) != 0) {
+    printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  FILE* creds = fopen(TEST_ROOT "/creds.txt", "w");
+  if (creds == NULL) {
+    printf("FAIL: failed to create credentials file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(creds, "secret key\n") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(creds) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  FILE* job_xml = fopen(TEST_ROOT "/job.xml", "w");
+  if (job_xml == NULL) {
+    printf("FAIL: failed to create job file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(job_xml, "<jobconf/>\n") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(job_xml) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (seteuid(user_detail->pw_uid) != 0) {
+    printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+    exit(1);
+  }
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork process for init_job - %s\n",
+           strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    char *final_pgm[] = {"touch", "my-touch-file", 0};
+    if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt",
+                       TEST_ROOT "/job.xml", final_pgm) != 0) {
+      printf("FAIL: failed in child\n");
+      exit(42);
+    }
+    // should never return
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) <= 0) {
+    printf("FAIL: failed waiting for process %d - %s\n", child,
+           strerror(errno));
+    exit(1);
+  }
+  // a child that failed or was killed is reported here rather than as a
+  // missing file further down
+  if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+    printf("FAIL: init job process %d ended with bad status %d\n", child,
+           status);
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_4", R_OK) != 0) {
+    printf("FAIL: failed to create job log directory\n");
+    exit(1);
+  }
+  char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_4");
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job directory %s\n", job_dir);
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/jobToken", job_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create credentials %s\n", buffer);
+    exit(1);
+  }
+  sprintf(buffer, "%s/my-touch-file", job_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create touch file %s\n", buffer);
+    exit(1);
+  }
+  free(job_dir);
+  job_dir = get_job_log_directory("job_4");
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job log directory %s\n", job_dir);
+    exit(1);
+  }
+  free(job_dir);
+}
+
+/**
+ * Write a small bash script that touches "foobar", run it for
+ * job_4/task_1 through run_task_as_user in a forked child, and check that
+ * the task log directory, the task work directory and the touched file
+ * all exist afterwards.
+ * Only called when the tests run as root. Exits with status 1 on failure.
+ */
+void test_run_task() {
+  printf("\nTesting run task\n");
+  if (seteuid(0) != 0) {
+    printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  const char* script_name = TEST_ROOT "/task-script";
+  FILE* script = fopen(script_name, "w");
+  if (script == NULL) {
+    printf("FAIL: failed to create script file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (seteuid(user_detail->pw_uid) != 0) {
+    printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(script, "#!/bin/bash\n"
+                     "touch foobar\n"
+                     "exit 0") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(script) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  fflush(stdout);
+  fflush(stderr);
+  char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1",
+                                              username, "job_4", "task_1");
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork process for run_task - %s\n",
+           strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    if (run_task_as_user(username, "job_4", "task_1",
+                         task_dir, script_name) != 0) {
+      printf("FAIL: failed in child\n");
+      exit(42);
+    }
+    // should never return
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) <= 0) {
+    printf("FAIL: failed waiting for process %d - %s\n", child,
+           strerror(errno));
+    exit(1);
+  }
+  // the script ends with "exit 0", so any other outcome is a failure
+  if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+    printf("FAIL: run task process %d ended with bad status %d\n", child,
+           status);
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_4/task_1", R_OK) != 0) {
+    printf("FAIL: failed to create task log directory\n");
+    exit(1);
+  }
+  if (access(task_dir, R_OK) != 0) {
+    printf("FAIL: failed to create task directory %s\n", task_dir);
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/foobar", task_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create touch file %s\n", buffer);
+    exit(1);
+  }
+  free(task_dir);
+  task_dir = get_job_log_directory("job_4/task_1");
+  if (access(task_dir, R_OK) != 0) {
+    printf("FAIL: failed to create task log directory %s\n", task_dir);
+    exit(1);
+  }
+  free(task_dir);
+}
+
+/**
+ * Test driver for the task-controller.
+ * Rebuilds TEST_ROOT, writes and loads a test configuration, picks the
+ * user to test as and runs every test in turn. When started as root with
+ * exactly one argument, that argument is the user to test as; otherwise
+ * the tests run as the invoking user. The init job and run task tests
+ * only run as root.
+ *
+ * @return 0 when all of the tests pass; a failing test exits with a
+ *         non-zero status before this function returns
+ */
+int main(int argc, char **argv) {
+  LOGFILE = stdout;
+  int my_username = 0;
+
+  // clean up any junk from previous run; the result is ignored because
+  // there may be nothing to clean up
+  system("chmod -R u=rwx " TEST_ROOT "; rm -fr " TEST_ROOT);
+
+  if (mkdirs(TEST_ROOT "/logs/userlogs", 0755) != 0) {
+    exit(1);
+  }
+
+  if (write_config_file(TEST_ROOT "/test.cfg") != 0) {
+    exit(1);
+  }
+  read_config(TEST_ROOT "/test.cfg");
+
+  create_tt_roots();
+
+  if (getuid() == 0 && argc == 2) {
+    username = argv[1];
+  } else {
+    // getpwuid returns NULL when the uid has no passwd entry
+    struct passwd *me = getpwuid(getuid());
+    if (me == NULL) {
+      printf("FAIL: can't find a user name for uid %d\n", (int) getuid());
+      exit(1);
+    }
+    username = strdup(me->pw_name);
+    if (username == NULL) {
+      printf("FAIL: out of memory copying the user name\n");
+      exit(1);
+    }
+    my_username = 1;
+  }
+  set_tasktracker_uid(geteuid(), getegid());
+
+  if (set_user(username)) {
+    exit(1);
+  }
+
+  printf("\nStarting tests\n");
+
+  printf("\nTesting get_user_directory()\n");
+  test_get_user_directory();
+
+  printf("\nTesting get_job_directory()\n");
+  test_get_job_directory();
+
+  printf("\nTesting get_attempt_directory()\n");
+  test_get_attempt_directory();
+
+  printf("\nTesting get_task_launcher_file()\n");
+  test_get_task_launcher_file();
+
+  printf("\nTesting get_job_log_dir()\n");
+  test_get_job_log_dir();
+
+  test_check_configuration_permissions();
+
+  printf("\nTesting get_task_log_dir()\n");
+  test_get_task_log_dir();
+
+  printf("\nTesting delete_task()\n");
+  test_delete_task();
+
+  printf("\nTesting delete_job()\n");
+  test_delete_job();
+
+  test_delete_user();
+
+  test_check_user();
+
+  test_delete_log_directory();
+
+  // the tests that change user need to be run in a child process, so that
+  // when they change user they don't give up our privs
+  run_test_in_child("test_signal_task", test_signal_task);
+  run_test_in_child("test_signal_task_group", test_signal_task_group);
+
+  // init job and run task can't be run if you aren't testing as root
+  if (getuid() == 0) {
+    // these tests do internal forks so that the change_owner and execs
+    // don't mess up our process.
+    test_init_job();
+    test_run_task();
+  }
+
+  // the result is ignored on purpose: this can't succeed when the tests
+  // weren't started as root
+  (void) seteuid(0);
+  run("rm -fr " TEST_ROOT);
+  printf("\nFinished tests\n");
+
+  // username only owns its memory when it was copied from the passwd entry
+  if (my_username) {
+    free(username);
+  }
+  free_configurations();
+  return 0;
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java Fri Mar  4 04:43:33 2011
@@ -142,6 +142,23 @@ public class LocalDirAllocator {
     return context.getLocalPathToRead(pathStr, conf);
   }
 
+  /**
+   * Look up the given path under each of the roots and return the ones
+   * that currently exist.
+   * @param pathStr the path underneath the roots
+   * @param conf the configuration to look up the roots in
+   * @return all of the paths that exist under any of the roots
+   * @throws IOException passed on from the per-context lookup
+   */
+  public Iterable<Path> getAllLocalPathsToRead(String pathStr, 
+                                               Configuration conf
+                                               ) throws IOException {
+    final AllocatorPerContext perContext;
+    // only the context lookup is done while holding this object's lock
+    synchronized (this) {
+      perContext = obtainContext(contextCfgItemName);
+    }
+    return perContext.getAllLocalPathsToRead(pathStr, conf);
+  }
+
   /** Creates a temporary file in the local FS. Pass size as -1 if not known 
    *  apriori. We round-robin over the set of disks (via the configured dirs) 
    *  and select the first complete path which has enough space. A file is
@@ -210,7 +227,8 @@ public class LocalDirAllocator {
     /** This method gets called everytime before any read/write to make sure
      * that any change to localDirs is reflected immediately.
      */
-    private void confChanged(Configuration conf) throws IOException {
+    private synchronized void confChanged(Configuration conf
+                                          ) throws IOException {
       String newLocalDirs = conf.get(contextCfgItemName);
       if (!newLocalDirs.equals(savedLocalDirs)) {
         localDirs = conf.getStrings(contextCfgItemName);
@@ -270,17 +288,6 @@ public class LocalDirAllocator {
       return dirNumLastAccessed;
     }
     
-    /** Get a path from the local FS. This method should be used if the size of 
-     *  the file is not known a priori. 
-     *  
-     *  It will use roulette selection, picking directories
-     *  with probability proportional to their available space. 
-     */
-    public synchronized Path getLocalPathForWrite(String path, 
-        Configuration conf) throws IOException {
-      return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
-    }
-
     /** Get a path from the local FS. If size is known, we go
      *  round-robin over the set of disks (via the configured dirs) and return
      *  the first complete path which has enough space.
@@ -395,6 +402,76 @@ public class LocalDirAllocator {
       " the configured local directories");
     }
 
+    /**
+     * A read-only iterator over the copies of a relative path that exist
+     * under a list of root directories. It always holds the next existing
+     * path, found ahead of time by {@link #advance()}, so hasNext() does no
+     * file system work. iterator() returns this same object, so it can
+     * only be iterated over once.
+     */
+    private static 
+    class PathIterator implements Iterator<Path>, Iterable<Path> {
+      private final FileSystem fs;
+      private final String pathStr;
+      // index of the next root directory to look at
+      private int i = 0;
+      private final String[] rootDirs;
+      // the path that next() will return, or null when there are no more
+      private Path next = null;
+
+      /**
+       * @param fs the file system used to test for existence
+       * @param pathStr the path to look for underneath each root
+       * @param rootDirs the roots to search, in order
+       * @throws IOException if the first existence check fails
+       */
+      private PathIterator(FileSystem fs, String pathStr, String[] rootDirs
+                           ) throws IOException {
+        this.fs = fs;
+        this.pathStr = pathStr;
+        this.rootDirs = rootDirs;
+        advance();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next != null;
+      }
+
+      /**
+       * Move to the next root under which the path exists, leaving next
+       * as null once every root has been tried.
+       */
+      private void advance() throws IOException {
+        while (i < rootDirs.length) {
+          next = new Path(rootDirs[i++], pathStr);
+          if (fs.exists(next)) {
+            return;
+          }
+        }
+        next = null;
+      }
+
+      /**
+       * @return the next existing path
+       * @throws java.util.NoSuchElementException if there are no more paths
+       * @throws RuntimeException if looking for the following path fails
+       */
+      @Override
+      public Path next() {
+        // the Iterator contract asks for an exception, not null, at the end
+        if (next == null) {
+          throw new java.util.NoSuchElementException("no more paths for " +
+                                                     pathStr);
+        }
+        Path result = next;
+        try {
+          advance();
+        } catch (IOException ie) {
+          throw new RuntimeException("Can't check existance of " + next, ie);
+        }
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("read only iterator");
+      }
+
+      @Override
+      public Iterator<Path> iterator() {
+        return this;
+      }
+    }
+
+    /**
+     * Get all of the paths that currently exist in the working directories.
+     * @param pathStr the path underneath the roots
+     * @param conf the configuration to look up the roots in
+     * @return all of the paths that exist under any of the roots
+     * @throws IOException
+     */
+    synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
+                                                       Configuration conf
+                                                       ) throws IOException {
+      confChanged(conf);
+      if (pathStr.startsWith("/")) {
+        pathStr = pathStr.substring(1);
+      }
+      return new PathIterator(localFS, pathStr, localDirs);
+    }
+
     /** We search through all the configured dirs for the file's existence
      *  and return true when we find one 
      */

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java Fri Mar  4 04:43:33 2011
@@ -33,8 +33,20 @@ import org.apache.hadoop.util.Shell.Shel
 public class ProcessTree {
 
   private static final Log LOG = LogFactory.getLog(ProcessTree.class);
-
-  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+  
+  /**
+   * The constants for the signals, each carrying its signal number.
+   */
+  public static enum Signal {
+    QUIT(3), KILL(9), TERM(15);
+    // set once in the constructor; final so that a constant's number
+    // can't be changed afterwards
+    private final int value;
+    private Signal(int value) {
+      this.value = value;
+    }
+    /**
+     * @return the signal number of this constant
+     */
+    public int getValue() {
+      return value;
+    }
+  }
 
   public static final boolean isSetsidAvailable = isSetsidSupported();
   private static boolean isSetsidSupported() {
@@ -49,95 +61,8 @@ public class ProcessTree {
       setsidSupported = false;
     } finally { // handle the exit code
       LOG.info("setsid exited with exit code " + shexec.getExitCode());
-      return setsidSupported;
-    }
-  }
-
-  /**
-   * Destroy the process-tree.
-   * @param pid process id of the root process of the subtree of processes
-   *            to be killed
-   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
-   *                               after sending SIGTERM
-   * @param isProcessGroup pid is a process group leader or not
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  public static void destroy(String pid, long sleeptimeBeforeSigkill,
-                             boolean isProcessGroup, boolean inBackground) {
-    if(isProcessGroup) {
-      destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
-    }
-    else {
-      //TODO: Destroy all the processes in the subtree in this case also.
-      // For the time being, killing only the root process.
-      destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
-    }
-  }
-
-  /** Destroy the process.
-   * @param pid Process id of to-be-killed-process
-   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
-   *                               after sending SIGTERM
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
-                                    boolean inBackground) {
-    terminateProcess(pid);
-    sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
-  }
-
-  /** Destroy the process group.
-   * @param pgrpId Process group id of to-be-killed-processes
-   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
-   *                               after sending SIGTERM
-   * @param inBackground Process group is to be killed in the back ground with
-   *                     a separate thread
-   */
-  protected static void destroyProcessGroup(String pgrpId,
-                       long sleeptimeBeforeSigkill, boolean inBackground) {
-    terminateProcessGroup(pgrpId);
-    sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
-  }
-
-  /**
-   * Sends terminate signal to the process, allowing it to gracefully exit.
-   * 
-   * @param pid pid of the process to be sent SIGTERM
-   */
-  public static void terminateProcess(String pid) {
-    ShellCommandExecutor shexec = null;
-    try {
-      String[] args = { "kill", pid };
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (IOException ioe) {
-      LOG.warn("Error executing shell command " + ioe);
-    } finally {
-      LOG.info("Killing process " + pid +
-               " with SIGTERM. Exit code " + shexec.getExitCode());
-    }
-  }
-
-  /**
-   * Sends terminate signal to all the process belonging to the passed process
-   * group, allowing the group to gracefully exit.
-   * 
-   * @param pgrpId process group id
-   */
-  public static void terminateProcessGroup(String pgrpId) {
-    ShellCommandExecutor shexec = null;
-    try {
-      String[] args = { "kill", "--", "-" + pgrpId };
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (IOException ioe) {
-      LOG.warn("Error executing shell command " + ioe);
-    } finally {
-      LOG.info("Killing all processes in the process group " + pgrpId +
-               " with SIGTERM. Exit code " + shexec.getExitCode());
     }
+    return setsidSupported;
   }
 
   /**
@@ -160,83 +85,63 @@ public class ProcessTree {
         LOG.warn("Thread sleep is interrupted.");
       }
       if(isProcessGroup) {
-        killProcessGroup(pid);
+        killProcessGroup(pid, Signal.KILL);
       } else {
-        killProcess(pid);
+        killProcess(pid, Signal.KILL);
       }
     }  
   }
   
-
-  /** Kills the process(OR process group) by sending the signal SIGKILL
-   * @param pid Process id(OR process group id) of to-be-deleted-process
-   * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
-   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
-   *                               after sending SIGTERM
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  private static void sigKill(String pid, boolean isProcessGroup,
-                        long sleeptimeBeforeSigkill, boolean inBackground) {
-
-    if(inBackground) { // use a separate thread for killing
-      SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
-                                                      sleeptimeBeforeSigkill);
-      sigKillThread.setDaemon(true);
-      sigKillThread.start();
-    }
-    else {
-      sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
-    }
-  }
-
   /**
-   * Sends kill signal to process, forcefully terminating the process.
+   * Sends the given signal to the process, if it is still alive.
    * 
    * @param pid process id
+   * @param signal the signal to send
    */
-  public static void killProcess(String pid) {
+  public static void killProcess(String pid, Signal signal) {
 
     //If process tree is not alive then return immediately.
     if(!ProcessTree.isAlive(pid)) {
       return;
     }
-    String[] args = { "kill", "-9", pid };
+    String[] args = { "kill", "-" + signal.getValue(), pid };
     ShellCommandExecutor shexec = new ShellCommandExecutor(args);
     try {
       shexec.execute();
     } catch (IOException e) {
-      LOG.warn("Error sending SIGKILL to process "+ pid + " ."+ 
+      LOG.warn("Error sending signal " + signal + " to process "+ pid + " ."+ 
           StringUtils.stringifyException(e));
     } finally {
-      LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
-          + shexec.getExitCode());
+      LOG.info("Killing process " + pid + " with signal " + signal + 
+               ". Exit code " + shexec.getExitCode());
     }
   }
 
   /**
-   * Sends kill signal to all process belonging to same process group,
+   * Sends the signal to all processes belonging to the same process group,
    * forcefully terminating the process group.
    * 
    * @param pgrpId process group id
+   * @param signal the signal to send
    */
-  public static void killProcessGroup(String pgrpId) {
+  public static void killProcessGroup(String pgrpId, Signal signal) {
 
     //If process tree is not alive then return immediately.
     if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
       return;
     }
 
-    String[] args = { "kill", "-9", "-"+pgrpId };
+    String[] args = { "kill", "-" + signal.getValue() , "-"+pgrpId };
     ShellCommandExecutor shexec = new ShellCommandExecutor(args);
     try {
       shexec.execute();
     } catch (IOException e) {
-      LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+ 
+      LOG.warn("Error sending signal " + signal + " to process group "+ 
+               pgrpId + " ."+ 
           StringUtils.stringifyException(e));
     } finally {
-      LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
-          + shexec.getExitCode());
+      LOG.info("Killing process group" + pgrpId + " with signal " + signal + 
+               ". Exit code " + shexec.getExitCode());
     }
   }
   
@@ -297,7 +202,7 @@ public class ProcessTree {
     private String pid = null;
     private boolean isProcessGroup = false;
 
-    private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+    private final long sleepTimeBeforeSigKill;
 
     private SigKillThread(String pid, boolean isProcessGroup, long interval) {
       this.pid = pid;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Fri Mar  4 04:43:33 2011
@@ -29,15 +29,11 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.Arrays;
 import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.util.Shell.ExitCodeException;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
  */
@@ -47,8 +43,6 @@ public class ProcfsBasedProcessTree exte
       .getLog(ProcfsBasedProcessTree.class);
 
   private static final String PROCFS = "/proc/";
-  public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
-  private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
   private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
       .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
 
@@ -61,21 +55,14 @@ public class ProcfsBasedProcessTree exte
   
   private Integer pid = -1;
 
-  private boolean setsidUsed = false;
-  
-  private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
-  
   private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
 
   public ProcfsBasedProcessTree(String pid) {
-    this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+    this(pid, false);
   }
   
-  public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
-      long sigkillInterval) {
+  public ProcfsBasedProcessTree(String pid, boolean setsidUsed) {
     this(pid,PROCFS);
-    this.setsidUsed = setsidUsed;
-    sleeptimeBeforeSigkill = sigkillInterval; 
   }
 
   public ProcfsBasedProcessTree(String pid, String procfsDir) {
@@ -84,17 +71,6 @@ public class ProcfsBasedProcessTree exte
   }
   
   /**
-   * Sets SIGKILL interval
-   * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
-   *                  String, boolean, long)} instead
-   * @param interval The time to wait before sending SIGKILL
-   *                 after sending SIGTERM
-   */
-  public void setSigKillInterval(long interval) {
-    sleepTimeBeforeSigKill = interval;
-  }
-
-  /**
    * Checks if the ProcfsBasedProcessTree is available on this system.
    * 
    * @return true if ProcfsBasedProcessTree is available. False otherwise.
@@ -218,81 +194,6 @@ public class ProcfsBasedProcessTree exte
     return false;
   }
 
-  /** Verify that the given process id is same as its process group id.
-   * @param pidStr Process id of the to-be-verified-process
-   */
-  private static boolean assertPidPgrpidForMatch(String pidStr) {
-    Integer pId = Integer.parseInt(pidStr);
-    // Get information for this process
-    ProcessInfo pInfo = new ProcessInfo(pId);
-    pInfo = constructProcessInfo(pInfo);
-    //make sure that pId and its pgrpId match
-    if (!pInfo.getPgrpId().equals(pId)) {
-      LOG.warn("Unexpected: Process with PID " + pId +
-               " is not a process group leader.");
-      return false;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(pId + " is a process group leader, as expected.");
-    }
-    return true;
-  }
-
-  /** Make sure that the given pid is a process group leader and then
-   * destroy the process group.
-   * @param pgrpId   Process group id of to-be-killed-processes
-   * @param interval The time to wait before sending SIGKILL
-   *                 after sending SIGTERM
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
-                       boolean inBackground)
-         throws IOException {
-    // Make sure that the pid given is a process group leader
-    if (!assertPidPgrpidForMatch(pgrpId)) {
-      throw new IOException("Process with PID " + pgrpId  +
-                          " is not a process group leader.");
-    }
-    destroyProcessGroup(pgrpId, interval, inBackground);
-  }
-
-  /**
-   * Destroy the process-tree.
-   */
-  public void destroy() {
-    destroy(true);
-  }
-  
-  /**
-   * Destroy the process-tree.
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  public void destroy(boolean inBackground) {
-    LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
-    if (pid == -1) {
-      return;
-    }
-    if (isAlive(pid.toString())) {
-      if (isSetsidAvailable && setsidUsed) {
-        // In this case, we know that pid got created using setsid. So kill the
-        // whole processGroup.
-        try {
-          assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
-                              inBackground);
-        } catch (IOException e) {
-          LOG.warn(StringUtils.stringifyException(e));
-        }
-      }
-      else {
-        //TODO: Destroy all the processes in the subtree in this case also.
-        // For the time being, killing only the root process.
-        destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
-      }
-    }
-  }
-
   private static final String PROCESSTREE_DUMP_FORMAT =
       "\t|- %d %d %d %d %s %d %s\n";
 
@@ -383,15 +284,6 @@ public class ProcfsBasedProcessTree exte
   }
 
   /**
-   * 
-   * Construct the ProcessInfo using the process' PID and procfs and return the
-   * same. Returns null on failing to read from procfs,
-   */
-  private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
-    return constructProcessInfo(pinfo, PROCFS);
-  }
-
-  /**
    * Construct the ProcessInfo using the process' PID and procfs rooted at the
    * specified directory and return the same. It is provided mainly to assist
    * testing purposes.
@@ -449,58 +341,6 @@ public class ProcfsBasedProcessTree exte
   }
   
   /**
-   * Is the process with PID pid still alive?
-   */
-  private boolean isAlive(Integer pid) {
-    // This method assumes that isAlive is called on a pid that was alive not
-    // too long ago, and hence assumes no chance of pid-wrapping-around.
-    ShellCommandExecutor shexec = null;
-    try {
-      String[] args = { "kill", "-0", pid.toString() };
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (ExitCodeException ee) {
-      return false;
-    } catch (IOException ioe) {
-      LOG.warn("Error executing shell command "
-          + Arrays.toString(shexec.getExecString()) + ioe);
-      return false;
-    }
-    return (shexec.getExitCode() == 0 ? true : false);
-  }
-
-  /**
-   * Helper thread class that kills process-tree with SIGKILL in background
-   */
-  private class SigKillThread extends Thread {
-
-    public void run() {
-      this.setName(this.getClass().getName() + "-" + String.valueOf(pid));
-      ShellCommandExecutor shexec = null;
-
-      try {
-        // Sleep for some time before sending SIGKILL
-        Thread.sleep(sleepTimeBeforeSigKill);
-      } catch (InterruptedException i) {
-        LOG.warn("Thread sleep is interrupted.");
-      }
-
-      // Kill the root process with SIGKILL if it is still alive
-      if (ProcfsBasedProcessTree.this.isAlive(pid)) {
-        try {
-          String[] args = { "kill", "-9", pid.toString() };
-          shexec = new ShellCommandExecutor(args);
-          shexec.execute();
-        } catch (IOException ioe) {
-          LOG.warn("Error executing shell command " + ioe);
-        } finally {
-          LOG.info("Killing " + pid + " with SIGKILL. Exit code "
-              + shexec.getExitCode());
-        }
-      }
-    }
-  }
-  /**
    * Returns a string printing PIDs of process present in the
    * ProcfsBasedProcessTree. Output format : [pid pid ..]
    */
@@ -563,13 +403,6 @@ public class ProcfsBasedProcessTree exte
       return age;
     }
     
-    public boolean isParent(ProcessInfo p) {
-      if (pid.equals(p.getPpid())) {
-        return true;
-      }
-      return false;
-    }
-
     public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
         Integer sessionId, Long vmem) {
       this.name = name;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar  4 04:43:33 2011
@@ -21,13 +21,9 @@ package org.apache.hadoop.filecache;
 import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 
 import java.net.URI;
 
@@ -176,179 +172,8 @@ public class DistributedCache {
   public static final String CACHE_SYMLINK = "mapred.create.symlink";
   
   /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copy it from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
-   * @param conf The Confguration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the files/archives
-   * @param fileStatus The file status on the dfs.
-   * @param isArchive if the cache is an archive or a file. In case it is an
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred is
-   *  returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  public static Path getLocalCache(URI cache, Configuration conf, 
-                                   Path baseDir, FileStatus fileStatus,
-                                   boolean isArchive, long confFileStamp,
-                                   Path currentWorkDir) 
-      throws IOException {
-    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
-        confFileStamp, currentWorkDir, true);
-  }
-
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copy it from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
-   * @param conf The Confguration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the files/archives
-   * @param fileStatus The file status on the dfs.
-   * @param isArchive if the cache is an archive or a file. In case it is an
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred is
-   *  returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @param honorSymLinkConf if this is false, then the symlinks are not
-   * created even if conf says so (this is required for an optimization in task
-   * launches
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  public static Path getLocalCache(URI cache, Configuration conf, 
-      Path baseDir, FileStatus fileStatus,
-      boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-
-    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
-            confFileStamp, currentWorkDir, honorSymLinkConf, false);
-  }
-
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copy it from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
-   * @param conf The Confguration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the files/archives
-   * @param isArchive if the cache is an archive or a file. In case it is an 
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will 
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred 
-   *  is returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  public static Path getLocalCache(URI cache, Configuration conf, 
-                                   Path baseDir, boolean isArchive,
-                                   long confFileStamp, Path currentWorkDir) 
-      throws IOException {
-    return getLocalCache(cache, conf, 
-                         baseDir, null, isArchive,
-                         confFileStamp, currentWorkDir);
-  }
-
-  /**
-   * This is the opposite of getlocalcache. When you are done with
-   * using the cache, you need to release the cache
-   * @param cache The cache URI to be released
-   * @param conf configuration which contains the filesystem the cache 
-   * is contained in.
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  public static void releaseCache(URI cache, Configuration conf)
-      throws IOException {
-	// find the timestamp of the uri
-    URI[] archives = DistributedCache.getCacheArchives(conf);
-    URI[] files = DistributedCache.getCacheFiles(conf);
-    String[] archivesTimestamps =
-          DistributedCache.getArchiveTimestamps(conf);
-    String[] filesTimestamps =
-          DistributedCache.getFileTimestamps(conf);
-    String timestamp = null;
-    if (archives != null) {
-      for (int i = 0; i < archives.length; i++) {
-        if (archives[i].equals(cache)) {
-          timestamp = archivesTimestamps[i];
-          break;
-        }
-      }
-    }
-    if (timestamp == null && files != null) {
-      for (int i = 0; i < files.length; i++) {
-        if (files[i].equals(cache)) {
-          timestamp = filesTimestamps[i];
-          break;
-        }
-      }
-    }
-    if (timestamp == null) {
-      throw new IOException("TimeStamp of the uri couldnot be found");
-    }
-    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .releaseCache(cache, conf, Long.parseLong(timestamp), 
-            TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
-  }
-  
-  /**
-   * Returns the relative path of the dir this cache will be localized in
-   * relative path that this cache will be localized in. For
-   * hdfs://hostname:port/absolute_path -- the relative path is
-   * hostname/absolute path -- if it is just /absolute_path -- then the
-   * relative path is hostname of DFS this mapred cluster is running
-   * on/absolute_path
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  @Deprecated
-  public static String makeRelative(URI cache, Configuration conf)
-      throws IOException {
-    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .makeRelative(cache, conf);
-  }
-
-  /**
-   * Returns {@link FileStatus} of a given cache file on hdfs.
+   * Returns {@link FileStatus} of a given cache file on hdfs. Internal to 
+   * MapReduce.
    * @param conf configuration
    * @param cache cache file 
    * @return <code>FileStatus</code> of a given cache file on hdfs
@@ -357,13 +182,11 @@ public class DistributedCache {
   public static FileStatus getFileStatus(Configuration conf, URI cache)
     throws IOException {
     FileSystem fileSystem = FileSystem.get(cache, conf);
-    Path filePath = new Path(cache.getPath());
-
-    return fileSystem.getFileStatus(filePath);
+    return fileSystem.getFileStatus(new Path(cache.getPath()));
   }
   
   /**
-   * Returns mtime of a given cache file on hdfs.
+   * Returns mtime of a given cache file on hdfs. Internal to MapReduce.
    * @param conf configuration
    * @param cache cache file 
    * @return mtime of a given cache file on hdfs
@@ -388,17 +211,6 @@ public class DistributedCache {
     TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
   }
   
-  private static String getFileSysName(URI url) {
-    String fsname = url.getScheme();
-    if ("hdfs".equals(fsname)) {
-      String host = url.getHost();
-      int port = url.getPort();
-      return (port == (-1)) ? host : (host + ":" + port);
-    } else {
-      return null;
-    }
-  }
-
   /**
    * Set the configuration with the given set of archives. Intended
    * to be used by user code.
@@ -421,11 +233,22 @@ public class DistributedCache {
     conf.set(CACHE_FILES, sfiles);
   }
 
+  private static Path[] parsePaths(String[] strs) {
+    if (strs == null) {
+      return null;
+    }
+    Path[] result = new Path[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = new Path(strs[i]);
+    }
+    return result;
+  }
+
   /**
    * Get cache archives set in the Configuration.  Used by
    * internal DistributedCache and MapReduce code.
    * @param conf The configuration which contains the archives
-   * @return A URI array of the caches set in the Configuration
+   * @return An array of the caches set in the Configuration
    * @throws IOException
    */
   public static URI[] getCacheArchives(Configuration conf) throws IOException {
@@ -436,7 +259,7 @@ public class DistributedCache {
    * Get cache files set in the Configuration.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which contains the files
-   * @return A URI array of the files set in the Configuration
+   * @return An array of the files set in the Configuration
    * @throws IOException
    */
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
@@ -469,26 +292,41 @@ public class DistributedCache {
   }
 
   /**
+   * Parse a list of strings into longs.
+   * @param strs the list of strings to parse
+   * @return an array of the parsed longs, same length as strs; null if strs is null
+   */
+  private static long[] parseTimestamps(String[] strs) {
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+
+  /**
    * Get the timestamps of the archives.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of timestamps 
+   * @return a long array of timestamps 
    * @throws IOException
    */
-  public static String[] getArchiveTimestamps(Configuration conf) {
-    return conf.getStrings(CACHE_ARCHIVES_TIMESTAMPS);
+  public static long[] getArchiveTimestamps(Configuration conf) {
+    return parseTimestamps(conf.getStrings(CACHE_ARCHIVES_TIMESTAMPS));
   }
 
-
   /**
    * Get the timestamps of the files.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of timestamps 
+   * @return a long array of timestamps 
    * @throws IOException
    */
-  public static String[] getFileTimestamps(Configuration conf) {
-    return conf.getStrings(CACHE_FILES_TIMESTAMPS);
+  public static long[] getFileTimestamps(Configuration conf) {
+    return parseTimestamps(conf.getStrings(CACHE_FILES_TIMESTAMPS));
   }
 
   /**
@@ -532,6 +370,30 @@ public class DistributedCache {
   public static void setLocalFiles(Configuration conf, String str) {
     conf.set(CACHE_LOCALFILES, str);
   }
+  
+  /**
+   * Add an archive that has been localized to the conf.  Used
+   * by internal DistributedCache code.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local archives
+   */
+  public static void addLocalArchives(Configuration conf, String str) {
+    String archives = conf.get(CACHE_LOCALARCHIVES);
+    conf.set(CACHE_LOCALARCHIVES, archives == null ? str
+        : archives + "," + str);
+  }
+
+  /**
+   * Add a file that has been localized to the conf.  Used
+   * by internal DistributedCache code.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local files
+   */
+  public static void addLocalFiles(Configuration conf, String str) {
+    String files = conf.get(CACHE_LOCALFILES);
+    conf.set(CACHE_LOCALFILES, files == null ? str
+        : files + "," + str);
+  }
 
   /**
    * Add a archives to be localized to the conf.  Intended to
@@ -606,7 +468,7 @@ public class DistributedCache {
     String classpath = conf.get("mapred.job.classpath.files");
     if (classpath == null)
       return null;
-    ArrayList list = Collections.list(new StringTokenizer(classpath, System
+    ArrayList<Object> list = Collections.list(new StringTokenizer(classpath, System
                                                           .getProperty("path.separator")));
     Path[] paths = new Path[list.size()];
     for (int i = 0; i < list.size(); i++) {
@@ -665,7 +527,7 @@ public class DistributedCache {
     String classpath = conf.get("mapred.job.classpath.archives");
     if (classpath == null)
       return null;
-    ArrayList list = Collections.list(new StringTokenizer(classpath, System
+    ArrayList<Object> list = Collections.list(new StringTokenizer(classpath, System
                                                           .getProperty("path.separator")));
     Path[] paths = new Path[list.size()];
     for (int i = 0; i < list.size(); i++) {
@@ -747,15 +609,4 @@ public class DistributedCache {
     return true;
   }
 
-  /**
-   * Clear the entire contents of the cache and delete the backing files. This
-   * should only be used when the server is reinitializing, because the users
-   * are going to lose their files.
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  public static void purgeCache(Configuration conf) throws IOException {
-    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .purgeCache();
-  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Fri Mar  4 04:43:33 2011
@@ -30,19 +30,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobLocalizer;
 
 /**
  * Helper class of {@link TrackerDistributedCacheManager} that represents
- * the cached files of a single task.  This class is used
- * by TaskRunner/LocalJobRunner to parse out the job configuration
- * and setup the local caches.
+ * the cached files of a single job.
  * 
  * <b>This class is internal to Hadoop, and should not be treated as a public
  * interface.</b>
@@ -52,7 +48,7 @@ public class TaskDistributedCacheManager
   private final Configuration taskConf;
   private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
   private final List<String> classPaths = new ArrayList<String>();
- 
+  
   private boolean setupCalled = false;
 
   /**
@@ -94,7 +90,7 @@ public class TaskDistributedCacheManager
      * files.
      */
     private static List<CacheFile> makeCacheFiles(URI[] uris, 
-        String[] timestamps, String cacheVisibilities[], Path[] paths, 
+        long[] timestamps, boolean cacheVisibilities[], Path[] paths, 
         FileType type) throws IOException {
       List<CacheFile> ret = new ArrayList<CacheFile>();
       if (uris != null) {
@@ -110,9 +106,8 @@ public class TaskDistributedCacheManager
         for (int i = 0; i < uris.length; ++i) {
           URI u = uris[i];
           boolean isClassPath = (null != classPaths.get(u.getPath()));
-          long t = Long.parseLong(timestamps[i]);
-          ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
-              t, isClassPath));
+          ret.add(new CacheFile(u, type, cacheVisibilities[i],
+              timestamps[i], isClassPath));
         }
       }
       return ret;
@@ -148,36 +143,37 @@ public class TaskDistributedCacheManager
   }
 
   /**
-   * Retrieve files into the local cache and updates the task configuration 
-   * (which has been passed in via the constructor).
+   * Retrieve public distributed cache files into the local cache and updates
+   * the task configuration (which has been passed in via the constructor).
+   * The private distributed cache is just looked at, and the paths where the
+   * files/archives should go are decided here. The actual localization is
+   * done by {@link JobLocalizer}.
    * 
    * It is the caller's responsibility to re-write the task configuration XML
    * file, if necessary.
    */
-  public void setup(LocalDirAllocator lDirAlloc, File workDir, 
-      String privateCacheSubdir, String publicCacheSubDir) throws IOException {
+  public void setupCache(String publicCacheSubdir, String privateCacheSubdir) 
+  throws IOException {
     setupCalled = true;
-    
-    if (cacheFiles.isEmpty()) {
-      return;
-    }
-
     ArrayList<Path> localArchives = new ArrayList<Path>();
     ArrayList<Path> localFiles = new ArrayList<Path>();
-    Path workdirPath = new Path(workDir.getAbsolutePath());
 
     for (CacheFile cacheFile : cacheFiles) {
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-      String cacheSubdir = publicCacheSubDir;
-      if (!cacheFile.isPublic) {
-        cacheSubdir = privateCacheSubdir;
-      }
-      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
-          cacheSubdir, fileStatus, 
-          cacheFile.type == CacheFile.FileType.ARCHIVE,
-          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
+      Path p;
+      if (cacheFile.isPublic) {
+        p = distributedCacheManager.getLocalCache(uri, taskConf,
+            publicCacheSubdir, fileStatus, 
+            cacheFile.type == CacheFile.FileType.ARCHIVE,
+            cacheFile.timestamp, cacheFile.isPublic);
+      } else {
+        p = distributedCacheManager.getLocalCache(uri, taskConf,
+            privateCacheSubdir, fileStatus, 
+            cacheFile.type == CacheFile.FileType.ARCHIVE,
+            cacheFile.timestamp, cacheFile.isPublic);
+      }
       cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
@@ -192,11 +188,11 @@ public class TaskDistributedCacheManager
 
     // Update the configuration object with localized data.
     if (!localArchives.isEmpty()) {
-      DistributedCache.setLocalArchives(taskConf, 
+      DistributedCache.addLocalArchives(taskConf, 
         stringifyPathList(localArchives));
     }
     if (!localFiles.isEmpty()) {
-      DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+      DistributedCache.addLocalFiles(taskConf, stringifyPathList(localFiles));
     }
 
   }
@@ -232,6 +228,28 @@ public class TaskDistributedCacheManager
     }
     return classPaths;
   }
+  
+  private List<String> formClasspath(Path[] paths, URI[] uris, 
+                                     Path[] localizedFiles) {
+    if (uris == null) {
+      return new ArrayList<String>();
+    }
+    Map<String, Path> clMap = new HashMap<String, Path>();
+    List<String> classPaths = new ArrayList<String>();
+    if (paths != null) {
+      for (Path p : paths) {
+        clMap.put(p.toUri().getPath().toString(), p);
+      }
+    }
+    for (int i = 0; i < uris.length; ++i) {
+      URI u = uris[i];
+      boolean isClassPath = (null != clMap.get(u.getPath()));
+      if (isClassPath) {
+        classPaths.add(localizedFiles[i].toString());
+      }
+    }
+    return classPaths;
+  }
 
   /**
    * Releases the cached files/archives, so that space
@@ -246,6 +264,16 @@ public class TaskDistributedCacheManager
     }
   }
 
+  public void setSizes(long[] sizes) throws IOException {
+    int i = 0;
+    for (CacheFile c: cacheFiles) {
+      if (!c.isPublic) {
+        distributedCacheManager.setSize(c.uri, taskConf, c.timestamp, c.owner, 
+                                        sizes[i++]);
+      }
+    }
+  }
+
   /**
    * Creates a class loader that includes the designated
    * files and archives.



Mime
View raw message