hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077322 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ c++/task-controller/tests/ mapred/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ mapred/org/apache/hadoop/mapreduce/se...
Date Fri, 04 Mar 2011 04:03:24 GMT
Author: omalley
Date: Fri Mar  4 04:03:23 2011
New Revision: 1077322

URL: http://svn.apache.org/viewvc?rev=1077322&view=rev
Log:
commit 3d86e34d8f754bbda13cbaa6d18fe223d708a20a
Author: Vinod Kumar <vinodkv@yahoo-inc.com>
Date:   Wed Mar 17 11:37:43 2010 +0530

    MAPREDUCE-927 from https://issues.apache.org/jira/secure/attachment/12439009/patch-927-5-dist.txt

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar  4 04:03:23 2011
@@ -212,9 +212,17 @@ char *get_task_dir_path(const char *tt_r
 /**
  * Get the log directory for the given attempt.
  */
-char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
-  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
-      attempt_id);
+char *get_task_log_dir(const char *log_dir, const char *job_id, 
+    const char *attempt_id) {
+  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 3, log_dir,
+      job_id, attempt_id);
+}
+
+/**
+ * Get the log directory for the given job.
+ */
+char *get_job_log_dir(const char *log_dir, const char *job_id) {
+  return concatenate(JOB_LOG_DIR_PATTERN, "job_log_dir", 2, log_dir, job_id);
 }
 
 /**
@@ -379,7 +387,8 @@ static int secure_path(const char *path,
       continue;
     }
 
-    if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
+    if (should_check_ownership && 
+        (check_ownership(entry->fts_path, uid, gid) != 0)) {
       fprintf(LOGFILE,
           "Invalid file path. %s not user/group owned by the tasktracker.\n",
           entry->fts_path);
@@ -498,15 +507,61 @@ int prepare_attempt_directories(const ch
 }
 
 /**
+ * Function to prepare the job log dir for the child. It gives the user
+ * ownership of the job's log-dir to the user and group ownership to the
+ * user running tasktracker.
+ *     *  sudo chown user:mapred log-dir/userlogs/$jobid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
+ *     *  sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ */
+int prepare_job_logs(const char *log_dir, const char *job_id,
+    mode_t permissions) {
+
+  char *job_log_dir = get_job_log_dir(log_dir, job_id);
+  if (job_log_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't get job log directory %s.\n", job_log_dir);
+    return -1;
+  }
+
+  struct stat filestat;
+  if (stat(job_log_dir, &filestat) != 0) {
+    if (errno == ENOENT) {
+#ifdef DEBUG
+      fprintf(LOGFILE, "job_log_dir %s doesn't exist. Not doing anything.\n",
+          job_log_dir);
+#endif
+      free(job_log_dir);
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the job log dir %s\n", job_log_dir);
+      free(job_log_dir);
+      return -1;
+    }
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+  if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+      permissions, S_ISGID | permissions, 1) != 0) {
+    fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
+    free(job_log_dir);
+    return -1;
+  }
+  free(job_log_dir);
+  return 0;
+}
+
+/**
  * Function to prepare the task logs for the child. It gives the user
  * ownership of the attempt's log-dir to the user and group ownership to the
  * user running tasktracker.
- *     *  sudo chown user:mapred log-dir/userlogs/$attemptid
- *     *  sudo chmod -R 2770 log-dir/userlogs/$attemptid
+ *     *  sudo chown user:mapred log-dir/userlogs/$jobid/$attemptid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid/$attemptid
  */
-int prepare_task_logs(const char *log_dir, const char *task_id) {
+int prepare_task_logs(const char *log_dir, const char *job_id, 
+    const char *task_id) {
 
-  char *task_log_dir = get_task_log_dir(log_dir, task_id);
+  char *task_log_dir = get_task_log_dir(log_dir, job_id, task_id);
   if (task_log_dir == NULL) {
     fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir);
     return -1;
@@ -524,10 +579,12 @@ int prepare_task_logs(const char *log_di
       fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
           task_log_dir);
 #endif
+      free(task_log_dir);
       return 0;
     } else {
       // stat failed because of something else!
       fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir);
+      free(task_log_dir);
       return -1;
     }
   }
@@ -537,8 +594,10 @@ int prepare_task_logs(const char *log_di
       S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
     // setgid on dirs but not files, 770. As of now, there are no files though
     fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+    free(task_log_dir);
     return -1;
   }
+  free(task_log_dir);
   return 0;
 }
 
@@ -557,8 +616,9 @@ int get_user_details(const char *user) {
 
 /*
  * Function to check if the TaskTracker actually owns the file.
+ * Or it has right ownership already. 
   */
-int check_ownership(char *path) {
+int check_ownership(char *path, uid_t uid, gid_t gid) {
   struct stat filestat;
   if (stat(path, &filestat) != 0) {
     return UNABLE_TO_STAT_FILE;
@@ -566,8 +626,10 @@ int check_ownership(char *path) {
   // check user/group. User should be TaskTracker user, group can either be
   // TaskTracker's primary group or the special group to which binary's
   // permissions are set.
-  if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
-      != filestat.st_gid)) {
+  // Or it can be the user/group owned by uid and gid passed. 
+  if ((getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
+      != filestat.st_gid)) &&
+      ((uid != filestat.st_uid) || (gid != filestat.st_gid))) {
     return FILE_NOT_OWNED_BY_TASKTRACKER;
   }
   return 0;
@@ -668,10 +730,13 @@ int initialize_user(const char *user) {
  * Function to prepare the job directories for the task JVM.
  * We do the following:
  *     *  sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid
+ *     *  sudo chown user:mapred -R logs/userlogs/$jobid 
  *     *  if user is not $tt_user,
  *     *    sudo chmod 2570 -R taskTracker/$user/jobcache/$jobid
+ *     *    sudo chmod 2570 -R logs/userlogs/$jobid
  *     *  else // user is tt_user
  *     *    sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid
+ *     *    sudo chmod 2770 -R logs/userlogs/$jobid
  *     *
  *     *  For any user, sudo chmod 2770 taskTracker/$user/jobcache/$jobid/work
  */
@@ -783,11 +848,32 @@ int initialize_job(const char *jobid, co
   }
   free(local_dir);
   free(full_local_dir_str);
-  cleanup();
+  int exit_code = 0;
   if (failed) {
-    return INITIALIZE_JOB_FAILED;
+    exit_code = INITIALIZE_JOB_FAILED;
+    goto cleanup;
   }
-  return 0;
+
+  char *log_dir = (char *) get_value(TT_LOG_DIR_KEY);
+  if (log_dir == NULL) {
+    fprintf(LOGFILE, "Log directory is not configured.\n");
+    exit_code = INVALID_TT_LOG_DIR;
+    goto cleanup;
+  }
+
+  if (prepare_job_logs(log_dir, jobid, permissions) != 0) {
+    fprintf(LOGFILE, "Couldn't prepare job logs directory %s for %s.\n",
+        log_dir, jobid);
+    exit_code = PREPARE_JOB_LOGS_FAILED;
+  }
+
+  cleanup:
+  // free configurations
+  cleanup();
+  if (log_dir != NULL) {
+    free(log_dir);
+  }
+  return exit_code;
 }
 
 /**
@@ -891,7 +977,7 @@ int initialize_task(const char *jobid, c
     goto cleanup;
   }
 
-  if (prepare_task_logs(log_dir, taskid) != 0) {
+  if (prepare_task_logs(log_dir, jobid, taskid) != 0) {
     fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
         log_dir, taskid);
     exit_code = PREPARE_TASK_LOGS_FAILED;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar  4 04:03:23 2011
@@ -70,7 +70,8 @@ enum errorcodes {
   INITIALIZE_DISTCACHEFILE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
   UNABLE_TO_BUILD_PATH, //21
-  INVALID_TASKCONTROLLER_PERMISSIONS //22
+  INVALID_TASKCONTROLLER_PERMISSIONS, //22
+  PREPARE_JOB_LOGS_FAILED, //23
 };
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -83,7 +84,9 @@ enum errorcodes {
 
 #define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
 
-#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c Fri Mar  4 04:03:23 2011
@@ -171,13 +171,26 @@ void test_get_task_launcher_file() {
   assert(ret == 0);
 }
 
+void test_get_job_log_dir() {
+  char *logdir = (char *) get_job_log_dir("/tmp/testing",
+    "job_200906101234_0001");
+  printf("logdir obtained is %s\n", logdir);
+  int ret = 0;
+  if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) {
+    ret = -1;
+  }
+  free(logdir);
+  assert(ret == 0);
+}
+
 void test_get_task_log_dir() {
   char *logdir = (char *) get_task_log_dir("/tmp/testing",
-      "attempt_200906112028_0001_m_000000_0");
+    "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
   printf("logdir obtained is %s\n", logdir);
   int ret = 0;
   if (strcmp(logdir,
-      "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+      "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+      != 0) {
     ret = -1;
   }
   free(logdir);
@@ -203,6 +216,9 @@ int main(int argc, char **argv) {
   printf("\nTesting get_task_launcher_file()\n");
   test_get_task_launcher_file();
 
+  printf("\nTesting get_job_log_dir()\n");
+  test_get_job_log_dir();
+
   printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 04:03:23 2011
@@ -631,7 +631,7 @@
   <name>mapred.userlog.retain.hours</name>
   <value>24</value>
   <description>The maximum time, in hours, for which the user-logs are to be 
-          retained.
+               retained after the job completion.
   </description>
 </property>
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 04:03:23 2011
@@ -193,7 +193,6 @@ class Child {
 
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);
-        TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
 
         task.setConf(job);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 04:03:23 2011
@@ -32,6 +32,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.JvmFinishedEvent;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -419,9 +421,7 @@ class JvmManager {
       }
       public void run() {
         runChild(env);
-
-        // Post-JVM-exit logs processing. Truncate the logs.
-        truncateJVMLogs();
+        jvmFinished();
       }
 
       public void runChild(JvmEnv env) {
@@ -481,11 +481,12 @@ class JvmManager {
         }
       }
 
-      // Post-JVM-exit logs processing. Truncate the logs.
-      private void truncateJVMLogs() {
+      // Post-JVM-exit logs processing. inform user log manager
+      private void jvmFinished() {
         Task firstTask = initalContext.task;
-        tracker.getTaskLogsMonitor().addProcessForLogTruncation(
-            firstTask.getTaskID(), tasksGiven);
+        JvmFinishedEvent jfe = new JvmFinishedEvent(new JVMInfo(firstTask
+            .getTaskID(), tasksGiven));
+        tracker.getUserLogManager().addLogEvent(jfe);
       }
 
       public void taskRan() {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar  4 04:03:23 2011
@@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -42,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.log4j.Appender;
@@ -71,7 +71,7 @@ public class TaskLog {
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(getBaseDir(taskid.toString()), filter.toString());
+    return new File(getAttemptDir(taskid.toString()), filter.toString());
   }
 
   /**
@@ -94,7 +94,7 @@ public class TaskLog {
           + ie);
       return null;
     }
-    return new File(getBaseDir(l.location), filter.toString());
+    return new File(getAttemptDir(l.location), filter.toString());
   }
 
   /**
@@ -108,7 +108,7 @@ public class TaskLog {
    */
   static String getRealTaskLogFilePath(String location, LogName filter)
       throws IOException {
-    return FileUtil.makeShellPath(new File(getBaseDir(location),
+    return FileUtil.makeShellPath(new File(getAttemptDir(location),
         filter.toString()));
   }
 
@@ -144,7 +144,7 @@ public class TaskLog {
     for (LogName filter : new LogName[] { LogName.DEBUGOUT, LogName.PROFILE }) {
       LogFileDetail l = new LogFileDetail();
       l.location = loc;
-      l.length = new File(getBaseDir(l.location), filter.toString()).length();
+      l.length = new File(getAttemptDir(l.location), filter.toString()).length();
       l.start = 0;
       allLogsFileDetails.put(filter, l);
     }
@@ -166,7 +166,7 @@ public class TaskLog {
   }
   
   private static File getTmpIndexFile(String taskid) {
-    return new File(getBaseDir(taskid), "log.tmp");
+    return new File(getAttemptDir(taskid), "log.tmp");
   }
   public static File getIndexFile(String taskid) {
     return getIndexFile(taskid, false);
@@ -174,9 +174,9 @@ public class TaskLog {
   
   public static File getIndexFile(String taskid, boolean isCleanup) {
     if (isCleanup) {
-      return new File(getBaseDir(taskid), "log.index.cleanup");
+      return new File(getAttemptDir(taskid), "log.index.cleanup");
     } else {
-      return new File(getBaseDir(taskid), "log.index");
+      return new File(getAttemptDir(taskid), "log.index");
     }
   }
 
@@ -184,8 +184,9 @@ public class TaskLog {
     return System.getProperty("hadoop.log.dir");
   }
 
-  static File getBaseDir(String taskid) {
-    return new File(LOG_DIR, taskid);
+  static File getAttemptDir(String taskid) {
+    return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
+        taskid);
   }
 
   static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
@@ -317,39 +318,6 @@ public class TaskLog {
     }
   }
 
-  private static class TaskLogsPurgeFilter implements FileFilter {
-    long purgeTimeStamp;
-  
-    TaskLogsPurgeFilter(long purgeTimeStamp) {
-      this.purgeTimeStamp = purgeTimeStamp;
-    }
-
-    public boolean accept(File file) {
-      LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
-      return file.lastModified() < purgeTimeStamp;
-    }
-  }
-
-  /**
-   * Purge old user logs.
-   * 
-   * @throws IOException
-   */
-  public static synchronized void cleanup(int logsRetainHours
-                                          ) throws IOException {
-    // Purge logs of tasks on this tasktracker if their  
-    // mtime has exceeded "mapred.task.log.retain" hours
-    long purgeTimeStamp = System.currentTimeMillis() - 
-                            (logsRetainHours*60L*60*1000);
-    File[] oldTaskLogs = LOG_DIR.listFiles
-                           (new TaskLogsPurgeFilter(purgeTimeStamp));
-    if (oldTaskLogs != null) {
-      for (int i=0; i < oldTaskLogs.length; ++i) {
-        FileUtil.fullyDelete(oldTaskLogs[i]);
-      }
-    }
-  }
-
   static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
@@ -390,7 +358,7 @@ public class TaskLog {
       start += fileDetail.start;
       end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(new File(getBaseDir(fileDetail.location), 
+      file = new FileInputStream(new File(getAttemptDir(fileDetail.location), 
           kind.toString()));
       // skip upto start
       long pos = 0;
@@ -696,4 +664,14 @@ public class TaskLog {
     return LOG_DIR;
   }
   
+  /**
+   * Get the user log directory for the job jobid.
+   * 
+   * @param jobid
+   * @return user log directory for the job
+   */
+  public static File getJobDir(JobID jobid) {
+    return new File(getUserLogDir(), jobid.toString());
+  }
+
 } // TaskLog

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java Fri Mar  4 04:03:23 2011
@@ -141,7 +141,7 @@ public class TaskLogServlet extends Http
    */
   static Configuration getConfFromJobACLsFile(String attemptIdStr) {
     Configuration conf = new Configuration(false);
-    conf.addResource(new Path(TaskLog.getBaseDir(attemptIdStr).toString(),
+    conf.addResource(new Path(TaskLog.getAttemptDir(attemptIdStr).toString(),
         TaskRunner.jobACLsFile));
     return conf;
   }

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+
+/**
+ * The class for truncating the user logs. 
+ * Should be used only by {@link UserLogManager}. 
+ *
+ */
+public class TaskLogsTruncater {
+  static final Log LOG = LogFactory.getLog(TaskLogsTruncater.class);
+
+  static final String MAP_USERLOG_RETAIN_SIZE =
+    "mapreduce.cluster.map.userlog.retain-size";
+  static final String REDUCE_USERLOG_RETAIN_SIZE =
+    "mapreduce.cluster.reduce.userlog.retain-size";
+  static final int DEFAULT_RETAIN_SIZE = -1;
+  
+  long mapRetainSize, reduceRetainSize;
+
+  public TaskLogsTruncater(Configuration conf) {
+    mapRetainSize = conf.getLong(MAP_USERLOG_RETAIN_SIZE, DEFAULT_RETAIN_SIZE);
+    reduceRetainSize = conf.getLong(REDUCE_USERLOG_RETAIN_SIZE,
+        DEFAULT_RETAIN_SIZE);
+    LOG.info("Initializing logs' truncater with mapRetainSize=" + mapRetainSize
+        + " and reduceRetainSize=" + reduceRetainSize);
+
+  }
+
+  private static final int DEFAULT_BUFFER_SIZE = 4 * 1024;
+
+  static final int MINIMUM_RETAIN_SIZE_FOR_TRUNCATION = 0;
+
+  /**
+   * Process the removed task's logs. This involves truncating them to
+   * retainSize.
+   */
+  public void truncateLogs(JVMInfo lInfo) {
+    TaskAttemptID firstAttempt = TaskAttemptID.downgrade(lInfo
+        .getFirstAttemptID());
+
+    // Read the log-file details for all the attempts that ran in this JVM
+    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
+    try {
+      taskLogFileDetails = getAllLogsFileDetails(lInfo.getAllAttempts());
+    } catch (IOException e) {
+      LOG.warn(
+          "Exception in truncateLogs while getting allLogsFileDetails()."
+              + " Ignoring the truncation of logs of this process.", e);
+      return;
+    }
+
+    // set this boolean to true if any of the log files is truncated
+    boolean indexModified = false;
+
+    Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails =
+        new HashMap<Task, Map<LogName, LogFileDetail>>();
+    // Make a copy of original indices into updated indices 
+    for (LogName logName : LogName.values()) {
+      copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+          updatedTaskLogFileDetails, logName);
+    }
+
+    File attemptLogDir = TaskLog.getAttemptDir(firstAttempt.toString());
+
+    FileWriter tmpFileWriter;
+    FileReader logFileReader;
+    // Now truncate file by file
+    logNameLoop: for (LogName logName : LogName.values()) {
+
+      File logFile = TaskLog.getTaskLogFile(firstAttempt, logName);
+
+      // //// Optimization: if no task is over limit, just skip truncation-code
+      if (logFile.exists()
+          && !isTruncationNeeded(lInfo, taskLogFileDetails, logName)) {
+        LOG.debug("Truncation is not needed for "
+            + logFile.getAbsolutePath());
+        continue;
+      }
+      // //// End of optimization
+
+      // Truncation is needed for this log-file. Go ahead now.
+      File tmpFile = new File(attemptLogDir, "truncate.tmp");
+      try {
+        tmpFileWriter = new FileWriter(tmpFile);
+      } catch (IOException ioe) {
+        LOG.warn("Cannot open " + tmpFile.getAbsolutePath()
+            + " for writing truncated log-file "
+            + logFile.getAbsolutePath()
+            + ". Continuing with other log files. ", ioe);
+        continue;
+      }
+
+      try {
+        logFileReader = new FileReader(logFile);
+      } catch (FileNotFoundException fe) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cannot open " + logFile.getAbsolutePath()
+              + " for reading. Continuing with other log files");
+        }
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+        continue;
+      }
+
+      long newCurrentOffset = 0;
+      // Process each attempt from the ordered list passed.
+      for (Task task : lInfo.getAllAttempts()) {
+
+        // Truncate the log files of this task-attempt so that only the last
+        // retainSize many bytes of this log file is retained and the log
+        // file is reduced in size saving disk space.
+        long retainSize =
+            (task.isMapTask() ? mapRetainSize : reduceRetainSize);
+        LogFileDetail newLogFileDetail = null;
+        try {
+          newLogFileDetail =
+              truncateALogFileOfAnAttempt(task.getTaskID(),
+                  taskLogFileDetails.get(task).get(logName), retainSize,
+                  tmpFileWriter, logFileReader, logName);
+        } catch (IOException ioe) {
+          LOG.warn("Cannot truncate the log file "
+              + logFile.getAbsolutePath()
+              + ". Caught exception while handling " + task.getTaskID(),
+              ioe);
+          // revert back updatedTaskLogFileDetails
+          copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+              updatedTaskLogFileDetails, logName);
+          if (!tmpFile.delete()) {
+            LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+          }
+          continue logNameLoop;
+        }
+
+        // Track information for updating the index file properly.
+        // Index files don't track DEBUGOUT and PROFILE logs, so skip'em.
+        if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+          if (!updatedTaskLogFileDetails.containsKey(task)) {
+            updatedTaskLogFileDetails.put(task,
+                new HashMap<LogName, LogFileDetail>());
+          }
+          // newLogFileDetail already has the location and length set, just
+          // set the start offset now.
+          newLogFileDetail.start = newCurrentOffset;
+          updatedTaskLogFileDetails.get(task).put(logName, newLogFileDetail);
+          newCurrentOffset += newLogFileDetail.length;
+          indexModified = true; // set the flag
+        }
+      }
+
+      try {
+        tmpFileWriter.close();
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't close the tmp file " + tmpFile.getAbsolutePath()
+            + ". Deleting it.", ioe);
+        copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+            updatedTaskLogFileDetails, logName);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+        continue;
+      }
+
+      if (!tmpFile.renameTo(logFile)) {
+        // If the tmpFile cannot be renamed revert back
+        // updatedTaskLogFileDetails to maintain the consistency of the
+        // original log file
+        copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+            updatedTaskLogFileDetails, logName);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+      }
+    }
+
+    if (indexModified) {
+      // Update the index files
+      updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
+    }
+  }
+
+  /**
+   * @param lInfo
+   * @param taskLogFileDetails
+   * @param updatedTaskLogFileDetails
+   * @param logName
+   */
+  private void copyOriginalIndexFileInfo(JVMInfo lInfo,
+      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
+      LogName logName) {
+    if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+      for (Task task : lInfo.getAllAttempts()) {
+        if (!updatedTaskLogFileDetails.containsKey(task)) {
+          updatedTaskLogFileDetails.put(task,
+              new HashMap<LogName, LogFileDetail>());
+        }
+        updatedTaskLogFileDetails.get(task).put(logName,
+            taskLogFileDetails.get(task).get(logName));
+      }
+    }
+  }
+
+  /**
+   * Get the logFileDetails of all the list of attempts passed.
+   * 
+   * @param allAttempts the list of attempts whose log-file details are needed
+   * @return a map of task to the log-file detail
+   * @throws IOException
+   */
+  private Map<Task, Map<LogName, LogFileDetail>> getAllLogsFileDetails(
+      final List<Task> allAttempts) throws IOException {
+    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails =
+        new HashMap<Task, Map<LogName, LogFileDetail>>();
+    for (Task task : allAttempts) {
+      Map<LogName, LogFileDetail> allLogsFileDetails;
+      allLogsFileDetails =
+          TaskLog.getAllLogsFileDetails(task.getTaskID(),
+              task.isTaskCleanupTask());
+      taskLogFileDetails.put(task, allLogsFileDetails);
+    }
+    return taskLogFileDetails;
+  }
+
+  /**
+   * Check if truncation of logs is needed for the given jvmInfo. If all the
+   * tasks that ran in a JVM are within the log-limits, then truncation is not
+   * needed. Otherwise it is needed.
+   * 
+   * @param lInfo
+   * @param taskLogFileDetails
+   * @param logName
+   * @return true if truncation is needed, false otherwise
+   */
+  private boolean isTruncationNeeded(JVMInfo lInfo,
+      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+      LogName logName) {
+    boolean truncationNeeded = false;
+    LogFileDetail logFileDetail = null;
+    for (Task task : lInfo.getAllAttempts()) {
+      long taskRetainSize =
+          (task.isMapTask() ? mapRetainSize : reduceRetainSize);
+      Map<LogName, LogFileDetail> allLogsFileDetails =
+          taskLogFileDetails.get(task);
+      logFileDetail = allLogsFileDetails.get(logName);
+      if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
+          && logFileDetail.length > taskRetainSize) {
+        truncationNeeded = true;
+        break;
+      }
+    }
+    return truncationNeeded;
+  }
+
+  /**
+   * Truncate the log file of this task-attempt so that only the last retainSize
+   * many bytes of each log file is retained and the log file is reduced in size
+   * saving disk space.
+   * 
+   * @param taskID Task whose logs need to be truncated
+   * @param oldLogFileDetail contains the original log details for the attempt
+   * @param taskRetainSize retain-size
+   * @param tmpFileWriter New log file to write to. Already opened in append
+   *          mode.
+   * @param logFileReader Original log file to read from.
+   * @return the {@link LogFileDetail} describing the truncated log file
+   * @throws IOException
+   */
+  private LogFileDetail truncateALogFileOfAnAttempt(
+      final TaskAttemptID taskID, final LogFileDetail oldLogFileDetail,
+      final long taskRetainSize, final FileWriter tmpFileWriter,
+      final FileReader logFileReader,
+      final LogName logName) throws IOException {
+    LogFileDetail newLogFileDetail = new LogFileDetail();
+
+    // ///////////// Truncate log file ///////////////////////
+
+    // New location of log file is same as the old
+    newLogFileDetail.location = oldLogFileDetail.location;
+    if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
+        && oldLogFileDetail.length > taskRetainSize) {
+      LOG.info("Truncating " + logName + " logs for " + taskID + " from "
+          + oldLogFileDetail.length + "bytes to " + taskRetainSize
+          + "bytes.");
+      newLogFileDetail.length = taskRetainSize;
+    } else {
+      LOG.debug("No truncation needed for " + logName + " logs for " + taskID
+          + " length is " + oldLogFileDetail.length + " retain size "
+          + taskRetainSize + "bytes.");
+      newLogFileDetail.length = oldLogFileDetail.length;
+    }
+    long charsSkipped =
+        logFileReader.skip(oldLogFileDetail.length
+            - newLogFileDetail.length);
+    if (charsSkipped != oldLogFileDetail.length - newLogFileDetail.length) {
+      throw new IOException("Erroneously skipped " + charsSkipped
+          + " instead of the expected "
+          + (oldLogFileDetail.length - newLogFileDetail.length)
+          + " while truncating " + logName + " logs for " + taskID );
+    }
+    long alreadyRead = 0;
+    while (alreadyRead < newLogFileDetail.length) {
+      char tmpBuf[]; // Temporary buffer to read logs
+      if (newLogFileDetail.length - alreadyRead >= DEFAULT_BUFFER_SIZE) {
+        tmpBuf = new char[DEFAULT_BUFFER_SIZE];
+      } else {
+        tmpBuf = new char[(int) (newLogFileDetail.length - alreadyRead)];
+      }
+      int bytesRead = logFileReader.read(tmpBuf);
+      if (bytesRead < 0) {
+        break;
+      } else {
+        alreadyRead += bytesRead;
+      }
+      tmpFileWriter.write(tmpBuf);
+    }
+    // ////// End of truncating log file ///////////////////////
+
+    return newLogFileDetail;
+  }
+
+  /**
+   * Truncation of logs is done. Now sync the index files to reflect the
+   * truncated sizes.
+   * 
+   * @param firstAttempt
+   * @param updatedTaskLogFileDetails
+   */
+  private void updateIndicesAfterLogTruncation(TaskAttemptID firstAttempt,
+      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
+    for (Entry<Task, Map<LogName, LogFileDetail>> entry : 
+                                updatedTaskLogFileDetails.entrySet()) {
+      Task task = entry.getKey();
+      Map<LogName, LogFileDetail> logFileDetails = entry.getValue();
+      Map<LogName, Long[]> logLengths = new HashMap<LogName, Long[]>();
+      // set current and previous lengths
+      for (LogName logName : TaskLog.LOGS_TRACKED_BY_INDEX_FILES) {
+        logLengths.put(logName, new Long[] { Long.valueOf(0L),
+            Long.valueOf(0L) });
+        LogFileDetail lfd = logFileDetails.get(logName);
+        if (lfd != null) {
+          // Set previous lengths
+          logLengths.get(logName)[0] = Long.valueOf(lfd.start);
+          // Set current lengths
+          logLengths.get(logName)[1] = Long.valueOf(lfd.start + lfd.length);
+        }
+      }
+      try {
+        TaskLog.writeToIndexFile(firstAttempt, task.getTaskID(),
+            task.isTaskCleanupTask(), logLengths);
+      } catch (IOException ioe) {
+        LOG.warn("Exception encountered while updating index file of task "
+            + task.getTaskID()
+            + ". Ignoring and continuing with other tasks.", ioe);
+      }
+    }
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 04:03:23 2011
@@ -54,7 +54,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.*;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -128,11 +129,6 @@ public class TaskTracker 
   static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
      "mapred.tasktracker.pmem.reserved";
 
-  static final String MAP_USERLOG_RETAIN_SIZE =
-      "mapreduce.cluster.map.userlog.retain-size";
-  static final String REDUCE_USERLOG_RETAIN_SIZE =
-      "mapreduce.cluster.reduce.userlog.retain-size";
-
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
@@ -276,7 +272,7 @@ public class TaskTracker 
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
-  private TaskLogsMonitor taskLogsMonitor;
+  private UserLogManager userLogManager;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
@@ -456,12 +452,12 @@ public class TaskTracker 
     }
   }
 
-  TaskLogsMonitor getTaskLogsMonitor() {
-    return this.taskLogsMonitor;
+  UserLogManager getUserLogManager() {
+    return this.userLogManager;
   }
 
-  void setTaskLogsMonitor(TaskLogsMonitor t) {
-    this.taskLogsMonitor = t;
+  void setUserLogManager(UserLogManager u) {
+    this.userLogManager = u;
   }
 
   public static String getUserDir(String user) {
@@ -717,9 +713,7 @@ public class TaskTracker 
 
     initializeMemoryManagement();
 
-    setTaskLogsMonitor(new TaskLogsMonitor(getMapUserLogRetainSize(),
-        getReduceUserLogRetainSize()));
-    getTaskLogsMonitor().start();
+    getUserLogManager().clearOldUserLogs(fConf);
 
     setIndexCache(new IndexCache(this.fConf));
 
@@ -953,10 +947,8 @@ public class TaskTracker 
                               new LocalDirAllocator("mapred.local.dir");
 
   // intialize the job directory
-  @SuppressWarnings("unchecked")
-  private void localizeJob(TaskInProgress tip) 
+  RunningJob localizeJob(TaskInProgress tip) 
   throws IOException, InterruptedException {
-    Path localJarFile = null;
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
@@ -967,7 +959,8 @@ public class TaskTracker 
     synchronized (rjob) {
       if (!rjob.localized) {
         JobConf localJobConf = localizeJobFiles(t, rjob);
-        
+        // initialize job log directory
+        initializeJobLogDir(jobId);
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
         // should be the last call after every other directory/file to be
@@ -985,7 +978,7 @@ public class TaskTracker 
         rjob.localized = true;
       }
     }
-    launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
+    return rjob;
   }
 
   /**
@@ -1062,6 +1055,15 @@ public class TaskTracker 
     return localJobConf;
   }
 
+  // create job userlog dir
+  void initializeJobLogDir(JobID jobId) {
+    // remove it from tasklog cleanup thread first,
+    // it might be added there because of tasktracker reinit or restart
+    JobStartedEvent jse = new JobStartedEvent(jobId);
+    getUserLogManager().addLogEvent(jse);
+    localizer.initializeJobLogDir(jobId);
+  }
+
   /**
    * Download the job configuration file from the FS.
    *
@@ -1183,10 +1185,6 @@ public class TaskTracker 
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
 
-    // All tasks are killed. So, they are removed from TaskLog monitoring also.
-    // Interrupt the monitor.
-    getTaskLogsMonitor().interrupt();
-
     jvmManager.stop();
     
     // shutdown RPC connections
@@ -1261,7 +1259,8 @@ public class TaskTracker 
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
-    
+    // create user log manager
+    setUserLogManager(new UserLogManager(conf));
     // Initialize the jobACLSManager
     jobACLsManager = new TaskTrackerJobACLsManager(this);
     initialize();
@@ -1620,22 +1619,6 @@ public class TaskTracker 
     return heartbeatResponse;
   }
 
-  long getMapUserLogRetainSize() {
-    return fConf.getLong(MAP_USERLOG_RETAIN_SIZE, -1);
-  }
-
-  void setMapUserLogRetainSize(long retainSize) {
-    fConf.setLong(MAP_USERLOG_RETAIN_SIZE, retainSize);
-  }
-
-  long getReduceUserLogRetainSize() {
-    return fConf.getLong(REDUCE_USERLOG_RETAIN_SIZE, -1);
-  }
-
-  void setReduceUserLogRetainSize(long retainSize) {
-    fConf.setLong(REDUCE_USERLOG_RETAIN_SIZE, retainSize);
-  }
-
   /**
    * Return the total virtual memory available on this TaskTracker.
    * @return total size of virtual memory.
@@ -1778,7 +1761,7 @@ public class TaskTracker 
    * @param action The action with the job
    * @throws IOException
    */
-  private synchronized void purgeJob(KillJobAction action) throws IOException {
+  synchronized void purgeJob(KillJobAction action) throws IOException {
     JobID jobId = action.getJobID();
     LOG.info("Received 'KillJobAction' for job: " + jobId);
     RunningJob rjob = null;
@@ -1803,6 +1786,13 @@ public class TaskTracker 
         if (!rjob.keepJobFiles) {
           removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
         }
+        // add job to user log manager
+        long now = System.currentTimeMillis();
+        JobCompletedEvent jca = new JobCompletedEvent(rjob
+            .getJobID(), now, UserLogCleaner.getUserlogRetainHours(rjob
+            .getJobConf()));
+        getUserLogManager().addLogEvent(jca);
+
         // Remove this job 
         rjob.tasks.clear();
       }
@@ -2157,7 +2147,8 @@ public class TaskTracker 
    */
   void startNewTask(TaskInProgress tip) {
     try {
-      localizeJob(tip);
+      RunningJob rjob = localizeJob(tip);
+      launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -2217,6 +2208,7 @@ public class TaskTracker 
    */
   public void run() {
     try {
+      getUserLogManager().start();
       startCleanupThreads();
       boolean denied = false;
       while (running && !shuttingDown && !denied) {
@@ -2711,8 +2703,9 @@ public class TaskTracker 
 
               // Debug-command is run. Do the post-debug-script-exit debug-logs
               // processing. Truncate the logs.
-              getTaskLogsMonitor().addProcessForLogTruncation(
-                  task.getTaskID(), Arrays.asList(task));
+              JvmFinishedEvent jvmFinished = new JvmFinishedEvent(
+                  new JVMInfo(task.getTaskID(), Arrays.asList(task)));
+              getUserLogManager().addLogEvent(jvmFinished);
             }
           }
           taskStatus.setProgress(0.0f);
@@ -3259,6 +3252,10 @@ public class TaskTracker 
     FetchStatus getFetchStatus() {
       return f;
     }
+
+    JobConf getJobConf() {
+      return jobConf;
+    }
   }
 
   /**

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+
+/**
+ * This is used only in UserLogManager, to manage cleanup of user logs.
+ */
+public class UserLogCleaner extends Thread {
+  private static final Log LOG = LogFactory.getLog(UserLogCleaner.class);
+  static final String USERLOGCLEANUP_SLEEPTIME = 
+    "mapreduce.tasktracker.userlogcleanup.sleeptime";
+  static final int DEFAULT_USER_LOG_RETAIN_HOURS = 24; // 1 day
+  static final long DEFAULT_THREAD_SLEEP_TIME = 1000 * 60 * 60; // 1 hour
+
+  private UserLogManager userLogManager;
+  private Map<JobID, Long> completedJobs = Collections
+      .synchronizedMap(new HashMap<JobID, Long>());
+  private final long threadSleepTime;
+  private CleanupQueue cleanupQueue;
+
+  private Clock clock;
+  private FileSystem localFs;
+
+  public UserLogCleaner(UserLogManager userLogManager, Configuration conf)
+      throws IOException {
+    this.userLogManager = userLogManager;
+    threadSleepTime = conf.getLong(USERLOGCLEANUP_SLEEPTIME,
+        DEFAULT_THREAD_SLEEP_TIME);
+    cleanupQueue = new CleanupQueue();
+    localFs = FileSystem.getLocal(conf);
+    setClock(new Clock());
+    setDaemon(true);
+  }
+
+  void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  Clock getClock() {
+    return this.clock;
+  }
+
+  CleanupQueue getCleanupQueue() {
+    return cleanupQueue;
+  }
+
+  void setCleanupQueue(CleanupQueue cleanupQueue) {
+    this.cleanupQueue = cleanupQueue;
+  }
+
+  @Override
+  public void run() {
+    // This thread wakes up after every threadSleepTime interval
+    // and deletes if there are any old logs.
+    while (true) {
+      try {
+        // sleep
+        Thread.sleep(threadSleepTime);
+        processCompletedJobs();
+      } catch (Throwable e) {
+        LOG.warn(getClass().getSimpleName()
+            + " encountered an exception while monitoring :", e);
+        LOG.info("Ingoring the exception and continuing monitoring.");
+      }
+    }
+  }
+
+  void processCompletedJobs() throws IOException {
+    long now = clock.getTime();
+    // iterate through completedJobs and remove old logs.
+    synchronized (completedJobs) {
+      Iterator<Entry<JobID, Long>> completedJobIter = completedJobs.entrySet()
+          .iterator();
+      while (completedJobIter.hasNext()) {
+        Entry<JobID, Long> entry = completedJobIter.next();
+        // see if the job is old enough
+        if (entry.getValue().longValue() <= now) {
+          // add the job for deletion
+          userLogManager.addLogEvent(new DeleteJobEvent(entry.getKey()));
+          completedJobIter.remove();
+        }
+      }
+    }
+  }
+
+  public void deleteJobLogs(JobID jobid) throws IOException {
+    deleteLogPath(TaskLog.getJobDir(jobid).getAbsolutePath());
+  }
+
+  /**
+   * Clears all the logs in userlog directory.
+   * 
+   * Adds the job directories for deletion with default retain hours. Deletes
+   * all other directories, if any. This is usually called on reinit/restart of
+   * the TaskTracker
+   * 
+   * @param conf
+   * @throws IOException
+   */
+  public void clearOldUserLogs(Configuration conf) throws IOException {
+    File userLogDir = TaskLog.getUserLogDir();
+    if (userLogDir.exists()) {
+      String[] logDirs = userLogDir.list();
+      if (logDirs.length > 0) {
+        // add all the job log dirs to the user-log manager for cleanup.
+        long now = clock.getTime();
+        for (String logDir : logDirs) {
+          JobID jobid = null;
+          try {
+            jobid = JobID.forName(logDir);
+          } catch (IllegalArgumentException ie) {
+            // if the directory is not a jobid, delete it immediately
+            deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+            continue;
+          }
+          // add the job log directory for deletion with default retain hours,
+          // if it is not already added
+          if (!completedJobs.containsKey(jobid)) {
+            JobCompletedEvent jce = new JobCompletedEvent(jobid, now,
+                getUserlogRetainHours(conf));
+            userLogManager.addLogEvent(jce);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * If the configuration is null or user-log retain hours is not configured,
+   * the retain hours are {@value UserLogCleaner#DEFAULT_USER_LOG_RETAIN_HOURS}
+   */
+  static int getUserlogRetainHours(Configuration conf) {
+    return (conf == null ? DEFAULT_USER_LOG_RETAIN_HOURS : conf.getInt(
+        JobContext.USER_LOG_RETAIN_HOURS, DEFAULT_USER_LOG_RETAIN_HOURS));
+  }
+
+  /**
+   * Adds job user-log directory to cleanup thread to delete logs after user-log
+   * retain hours.
+   * 
+   * @param jobCompletionTime
+   *          job completion time in millis
+   * @param retainHours
+   *          the user-log retain hours for the job
+   * @param jobid
+   *          JobID for which user logs should be deleted
+   */
+  public void markJobLogsForDeletion(long jobCompletionTime, int retainHours,
+      org.apache.hadoop.mapreduce.JobID jobid) {
+    long retainTimeStamp = jobCompletionTime + (retainHours * 1000L * 60L * 60L);
+    LOG.info("Adding " + jobid + " for user-log deletion with retainTimeStamp:"
+        + retainTimeStamp);
+    completedJobs.put(jobid, Long.valueOf(retainTimeStamp));
+  }
+
+  /**
+   * Remove job from user log deletion.
+   * 
+   * @param jobid
+   */
+  public void unmarkJobFromLogDeletion(JobID jobid) {
+    if (completedJobs.remove(jobid) != null) {
+      LOG.info("Removing " + jobid + " from user-log deletion");
+    }
+  }
+
+  /**
+   * Deletes the log path.
+   * 
+   * This path will be removed through {@link CleanupQueue}
+   * 
+   * @param logPath
+   * @throws IOException
+   */
+  private void deleteLogPath(String logPath) throws IOException {
+    LOG.info("Deleting user log path " + logPath);
+    PathDeletionContext context = new PathDeletionContext(localFs, logPath);
+    cleanupQueue.addToQueue(context);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar  4 04:03:23 2011
@@ -64,6 +64,8 @@ public class JobContext {
   
   public static final String JOB_CANCEL_DELEGATION_TOKEN = 
     "mapreduce.job.complete.cancel.delegation.tokens";
+  public static final String USER_LOG_RETAIN_HOURS = 
+    "mapred.userlog.retain.hours";
   
   protected UserGroupInformation ugi;
   

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker;
+
+import java.util.List;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Value object describing a task JVM: the first task attempt launched in
+ * the JVM and the full list of attempts that ran in it. Carried by
+ * {@code JvmFinishedEvent} so user logs of the JVM can be processed
+ * (e.g. truncated) after it exits.
+ */
+public class JVMInfo {
+  private TaskAttemptID firstAttemptID; 
+  private List<Task> allAttempts;
+
+  /**
+   * @param firstAttemptID the first task attempt started in this JVM
+   * @param allAttempts all task attempts that ran in this JVM
+   */
+  public JVMInfo(TaskAttemptID firstAttemptID, List<Task> allAttempts) {
+    this.firstAttemptID = firstAttemptID;
+    this.allAttempts = allAttempts;
+  }
+  
+  /** @return the first task attempt started in this JVM */
+  public TaskAttemptID getFirstAttemptID() {
+    return firstAttemptID;
+  }
+
+  /** @return all task attempts that ran in this JVM */
+  public List<Task> getAllAttempts() {
+    return allAttempts;
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar  4 04:03:23 2011
@@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.JobID;
 
 /**
  * 
@@ -358,4 +359,22 @@ public class Localizer {
           + attemptId.toString());
     }
   }
+
+  /**
+   * Create job log directory and set appropriate permissions for the directory.
+   * 
+   * Failure to create the directory is logged as a warning and swallowed;
+   * permissions are only (re)set when the directory exists.
+   * 
+   * @param jobId the job whose user-log directory should be initialized
+   */
+  public void initializeJobLogDir(JobID jobId) {
+    File jobUserLogDir = TaskLog.getJobDir(jobId);
+    if (!jobUserLogDir.exists()) {
+      boolean ret = jobUserLogDir.mkdirs();
+      if (!ret) {
+        LOG.warn("Could not create job user log directory: " + jobUserLogDir);
+        return;
+      }
+    }
+    // Restrict the directory to its owner (700, per the constant's name).
+    Localizer.PermissionsHandler.setPermissions(jobUserLogDir,
+        Localizer.PermissionsHandler.sevenZeroZero);
+  }
 }

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is a {@link UserLogEvent} sent when job logs should be deleted.
+ */
+public class DeleteJobEvent extends UserLogEvent {
+  private JobID jobid;  // job whose user-log directory is to be deleted
+
+  /**
+   * Create the event to delete job log directory.
+   * 
+   * @param jobid
+   *          The {@link JobID} whose logs should be deleted.
+   */
+  public DeleteJobEvent(JobID jobid) {
+    super(EventType.DELETE_JOB);
+    this.jobid = jobid;
+  }
+
+  /**
+   * Get the jobid.
+   * 
+   * @return object of {@link JobID}
+   */
+  public JobID getJobID() {
+    return jobid;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is a {@link UserLogEvent} sent when the job completes
+ */
+public class JobCompletedEvent extends UserLogEvent {
+  private JobID jobid;             // the completed job
+  private long jobCompletionTime;  // completion time-stamp in millis
+  private int retainHours;         // how many hours to retain the job's logs
+
+  /**
+   * Create the event for job completion.
+   * 
+   * @param jobid
+   *          The completed {@link JobID} .
+   * @param jobCompletionTime
+   *          The job completion time.
+   * @param retainHours
+   *          The number of hours for which the job logs should be retained
+   */
+  public JobCompletedEvent(JobID jobid, long jobCompletionTime,
+      int retainHours) {
+    super(EventType.JOB_COMPLETED);
+    this.jobid = jobid;
+    this.jobCompletionTime = jobCompletionTime;
+    this.retainHours = retainHours;
+  }
+
+  /**
+   * Get the job id.
+   * 
+   * @return object of {@link JobID}
+   */
+  public JobID getJobID() {
+    return jobid;
+  }
+
+  /**
+   * Get the job completion time-stamp in milli-seconds.
+   * 
+   * @return job completion time.
+   */
+  public long getJobCompletionTime() {
+    return jobCompletionTime;
+  }
+
+  /**
+   * Get the number of hours for which job logs should be retained.
+   * 
+   * @return retainHours
+   */
+  public int getRetainHours() {
+    return retainHours;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is a {@link UserLogEvent} sent when the job starts.
+ */
+public class JobStartedEvent extends UserLogEvent {
+  private JobID jobid;  // the started job
+
+  /**
+   * Create the event to inform the job has started.
+   * 
+   * @param jobid
+   *          The {@link JobID} which started
+   */
+  public JobStartedEvent(JobID jobid) {
+    super(EventType.JOB_STARTED);
+    this.jobid = jobid;
+  }
+
+  /**
+   * Get the job id.
+   * 
+   * @return object of {@link JobID}
+   */
+  public JobID getJobID() {
+    return jobid;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+
+/**
+ * This is a {@link UserLogEvent} sent when the jvm finishes.
+ */
+public class JvmFinishedEvent extends UserLogEvent {
+  private JVMInfo jvmInfo;  // describes the finished JVM and its task attempts
+
+  /**
+   * Create the event to inform that the jvm has finished.
+   * 
+   * @param jvmInfo
+   *          The finished {@link JVMInfo}
+   */
+  public JvmFinishedEvent(JVMInfo jvmInfo) {
+    super(EventType.JVM_FINISHED);
+    this.jvmInfo = jvmInfo;
+  }
+
+  /**
+   * Get the jvm info.
+   * 
+   * @return object of {@link JVMInfo}
+   */
+  public JVMInfo getJvmInfo() {
+    return jvmInfo;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * A directive from the various components of {@link TaskTracker} to the
+ * {@link UserLogManager} to inform about an event.
+ */
+public abstract class UserLogEvent {
+  
+  public enum EventType {
+    JVM_FINISHED,   // a task JVM exited; its logs are handed to the truncater
+    JOB_STARTED,    // a job started; cancel any pending log deletion for it
+    JOB_COMPLETED,  // a job finished; schedule log deletion after retain hours
+    DELETE_JOB,     // delete the job's logs now
+  };
+
+  private EventType eventType;
+  
+  protected UserLogEvent(EventType eventType) {
+    this.eventType = eventType;
+  }
+  
+  /**
+   * Return the {@link EventType}.
+   * @return the {@link EventType}.
+   */
+  public EventType getEventType() {
+    return eventType;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskLogsTruncater;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.UserLogCleaner;
+
+/**
+ * This manages user logs on the {@link TaskTracker}.
+ */
+public class UserLogManager {
+  private static final Log LOG = LogFactory.getLog(UserLogManager.class);
+  // Unbounded FIFO of pending user-log events. Producers call
+  // addLogEvent(); the single consumer is the monitorLogEvents thread.
+  private BlockingQueue<UserLogEvent> userLogEvents = 
+    new LinkedBlockingQueue<UserLogEvent>();
+  private TaskLogsTruncater taskLogsTruncater;
+  private UserLogCleaner userLogCleaner;
+
+  // Daemon event-loop thread: blocks on the queue and dispatches each event.
+  // NOTE(review): catching Exception also swallows InterruptedException from
+  // take() without re-interrupting, so an interrupt cannot stop this loop;
+  // shutdown relies on the thread being a daemon (set in the constructor).
+  private Thread monitorLogEvents = new Thread() {
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          monitor();
+        } catch (Exception e) {
+          LOG.warn("Exception while monitoring user log events", e);
+        }
+      }
+    }
+  };
+
+  /**
+   * Create the user log manager to manage user logs on {@link TaskTracker}.
+   * 
+   * It should be explicitly started using {@link #start()} to start functioning
+   * 
+   * @param conf
+   *          The {@link Configuration}
+   * 
+   * @throws IOException
+   */
+  public UserLogManager(Configuration conf) throws IOException {
+    taskLogsTruncater = new TaskLogsTruncater(conf);
+    userLogCleaner = new UserLogCleaner(this, conf);
+    monitorLogEvents.setDaemon(true);
+  }
+
+  /**
+   * Starts managing the logs
+   */
+  public void start() {
+    userLogCleaner.start();
+    monitorLogEvents.start();
+  }
+
+  /**
+   * Block until the next {@link UserLogEvent} arrives and process it.
+   * Protected so subclasses can override the wait/dispatch step.
+   */
+  protected void monitor() throws Exception {
+    UserLogEvent event = userLogEvents.take();
+    processEvent(event);
+  }
+
+  /**
+   * Dispatch an event to its handler based on its concrete type;
+   * unknown event types are logged and dropped.
+   */
+  protected void processEvent(UserLogEvent event) throws IOException {
+    if (event instanceof JvmFinishedEvent) {
+      doJvmFinishedAction((JvmFinishedEvent) event);
+    } else if (event instanceof JobCompletedEvent) {
+      doJobCompletedAction((JobCompletedEvent) event);
+    } else if (event instanceof JobStartedEvent) {
+      doJobStartedAction((JobStartedEvent) event);
+    } else if (event instanceof DeleteJobEvent) {
+      doDeleteJobAction((DeleteJobEvent) event);
+    } else { 
+      LOG.warn("Unknown event " + event.getEventType() + " passed.");
+    }
+  }
+
+  /**
+   * Called during TaskTracker restart/re-init.
+   * 
+   * @param conf
+   *          TT's conf
+   * @throws IOException
+   */
+  public void clearOldUserLogs(Configuration conf)
+      throws IOException {
+    userLogCleaner.clearOldUserLogs(conf);
+  }
+
+  // Hand the finished JVM's logs to the truncater.
+  private void doJvmFinishedAction(JvmFinishedEvent event) {
+    taskLogsTruncater.truncateLogs(event.getJvmInfo());
+  }
+
+  // Undo any pending log deletion for this job (it is active again).
+  private void doJobStartedAction(JobStartedEvent event) {
+    userLogCleaner.unmarkJobFromLogDeletion(event.getJobID());
+  }
+
+  // Schedule the completed job's logs for deletion after its retain hours.
+  private void doJobCompletedAction(JobCompletedEvent event) {
+    userLogCleaner.markJobLogsForDeletion(event.getJobCompletionTime(), event
+        .getRetainHours(), event.getJobID());
+  }
+
+  // Delete the job's logs immediately via the cleaner.
+  private void doDeleteJobAction(DeleteJobEvent event) throws IOException {
+    userLogCleaner.deleteJobLogs(event.getJobID());
+  }
+
+  /**
+   * Add the {@link UserLogEvent} for processing.
+   * 
+   * @param event the event to enqueue; processed asynchronously
+   */
+  public void addLogEvent(UserLogEvent event) {
+    userLogEvents.add(event);
+  }
+
+  /**
+   * Get {@link UserLogCleaner}.
+   * 
+   * This method is called only from unit tests.
+   * 
+   * @return {@link UserLogCleaner}
+   */
+  public UserLogCleaner getUserLogCleaner() {
+    return userLogCleaner;
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar  4 04:03:23 2011
@@ -77,9 +77,6 @@ public class TestLocalizationWithLinuxTa
     String user = ugi.split(",")[0];
     jobConf.setUser(user);
     File jobConfFile = uploadJobConf(jobConf);
-    // Create the task again to change the job-user
-    task =
-      new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
     task.setConf(jobConf);
     task.setUser(user);
     taskTrackerUserName = UserGroupInformation.getLoginUser()
@@ -208,6 +205,11 @@ public class TestLocalizationWithLinuxTa
       checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task
           .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
     }
+
+    // check job user-log directory permissions
+    File jobLogDir = TaskLog.getJobDir(jobId);
+    checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
   }
 
   @Override



Mime
View raw message