Author: omalley
Date: Fri Mar 4 04:03:23 2011
New Revision: 1077322
URL: http://svn.apache.org/viewvc?rev=1077322&view=rev
Log:
commit 3d86e34d8f754bbda13cbaa6d18fe223d708a20a
Author: Vinod Kumar <vinodkv@yahoo-inc.com>
Date: Wed Mar 17 11:37:43 2010 +0530
MAPREDUCE-927 from https://issues.apache.org/jira/secure/attachment/12439009/patch-927-5-dist.txt
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 04:03:23 2011
@@ -212,9 +212,17 @@ char *get_task_dir_path(const char *tt_r
/**
* Get the log directory for the given attempt.
*/
-char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
- return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
- attempt_id);
+char *get_task_log_dir(const char *log_dir, const char *job_id,
+                       const char *attempt_id) {
+  // Expands ATTEMPT_LOG_DIR_PATTERN with the three path components.
+  char *path = concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 3,
+                           log_dir, job_id, attempt_id);
+  return path;
+}
+
+/**
+ * Get the log directory for the given job, built from JOB_LOG_DIR_PATTERN
+ * as <log_dir>/userlogs/<job_id>. Callers free() the returned string.
+ */
+char *get_job_log_dir(const char *log_dir, const char *job_id) {
+  char *path = concatenate(JOB_LOG_DIR_PATTERN, "job_log_dir", 2,
+                           log_dir, job_id);
+  return path;
}
/**
@@ -379,7 +387,8 @@ static int secure_path(const char *path,
continue;
}
- if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
+ if (should_check_ownership &&
+ (check_ownership(entry->fts_path, uid, gid) != 0)) {
fprintf(LOGFILE,
"Invalid file path. %s not user/group owned by the tasktracker.\n",
entry->fts_path);
@@ -498,15 +507,61 @@ int prepare_attempt_directories(const ch
}
/**
+ * Prepare the job's log directory, log-dir/userlogs/$jobid, for the child.
+ * The directory is handed to the job's user (user_detail->pw_uid) and its
+ * group is set to the task-controller's effective group (getegid()):
+ *   * sudo chown user:mapred log-dir/userlogs/$jobid
+ *   * sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
+ *   * sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ * A job log directory that does not exist is not an error; nothing is done.
+ *
+ * Returns 0 on success or when the directory does not exist, -1 otherwise.
+ */
+int prepare_job_logs(const char *log_dir, const char *job_id,
+                     mode_t permissions) {
+  char *job_log_dir = get_job_log_dir(log_dir, job_id);
+  if (job_log_dir == NULL) {
+    // job_log_dir is NULL here; passing it to "%s" would be undefined.
+    fprintf(LOGFILE, "Couldn't get job log directory.\n");
+    return -1;
+  }
+
+  struct stat filestat;
+  if (stat(job_log_dir, &filestat) != 0) {
+    if (errno == ENOENT) {
+#ifdef DEBUG
+      fprintf(LOGFILE, "job_log_dir %s doesn't exist. Not doing anything.\n",
+              job_log_dir);
+#endif
+      free(job_log_dir);
+      return 0;
+    }
+    // stat failed because of something else!
+    fprintf(LOGFILE, "Failed to stat the job log dir %s\n", job_log_dir);
+    free(job_log_dir);
+    return -1;
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+  // Files get `permissions`; directories additionally get the setgid bit.
+  if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+                  permissions, S_ISGID | permissions, 1) != 0) {
+    fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
+    free(job_log_dir);
+    return -1;
+  }
+  free(job_log_dir);
+  return 0;
+}
+
+/**
* Function to prepare the task logs for the child. It gives the user
* ownership of the attempt's log-dir to the user and group ownership to the
* user running tasktracker.
- * * sudo chown user:mapred log-dir/userlogs/$attemptid
- * * sudo chmod -R 2770 log-dir/userlogs/$attemptid
+ * * sudo chown user:mapred log-dir/userlogs/$jobid/$attemptid
+ * * sudo chmod -R 2770 log-dir/userlogs/$jobid/$attemptid
*/
-int prepare_task_logs(const char *log_dir, const char *task_id) {
+int prepare_task_logs(const char *log_dir, const char *job_id,
+ const char *task_id) {
- char *task_log_dir = get_task_log_dir(log_dir, task_id);
+ char *task_log_dir = get_task_log_dir(log_dir, job_id, task_id);
if (task_log_dir == NULL) {
fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir);
return -1;
@@ -524,10 +579,12 @@ int prepare_task_logs(const char *log_di
fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
task_log_dir);
#endif
+ free(task_log_dir);
return 0;
} else {
// stat failed because of something else!
fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir);
+ free(task_log_dir);
return -1;
}
}
@@ -537,8 +594,10 @@ int prepare_task_logs(const char *log_di
S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
// setgid on dirs but not files, 770. As of now, there are no files though
fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+ free(task_log_dir);
return -1;
}
+ free(task_log_dir);
return 0;
}
@@ -557,8 +616,9 @@ int get_user_details(const char *user) {
/*
* Function to check if the TaskTracker actually owns the file.
+ * Or it has right ownership already.
*/
-int check_ownership(char *path) {
+int check_ownership(char *path, uid_t uid, gid_t gid) {
struct stat filestat;
if (stat(path, &filestat) != 0) {
return UNABLE_TO_STAT_FILE;
@@ -566,8 +626,10 @@ int check_ownership(char *path) {
// check user/group. User should be TaskTracker user, group can either be
// TaskTracker's primary group or the special group to which binary's
// permissions are set.
- if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
- != filestat.st_gid)) {
+ // Or it can be the user/group owned by uid and gid passed.
+ if ((getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
+ != filestat.st_gid)) &&
+ ((uid != filestat.st_uid) || (gid != filestat.st_gid))) {
return FILE_NOT_OWNED_BY_TASKTRACKER;
}
return 0;
@@ -668,10 +730,13 @@ int initialize_user(const char *user) {
* Function to prepare the job directories for the task JVM.
* We do the following:
* * sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid
+ * * sudo chown user:mapred -R logs/userlogs/$jobid
* * if user is not $tt_user,
* * sudo chmod 2570 -R taskTracker/$user/jobcache/$jobid
+ * * sudo chmod 2570 -R logs/userlogs/$jobid
* * else // user is tt_user
* * sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid
+ * * sudo chmod 2770 -R logs/userlogs/$jobid
* *
* * For any user, sudo chmod 2770 taskTracker/$user/jobcache/$jobid/work
*/
@@ -783,11 +848,32 @@ int initialize_job(const char *jobid, co
}
free(local_dir);
free(full_local_dir_str);
- cleanup();
+ int exit_code = 0;
if (failed) {
- return INITIALIZE_JOB_FAILED;
+ exit_code = INITIALIZE_JOB_FAILED;
+ goto cleanup;
}
- return 0;
+
+ char *log_dir = (char *) get_value(TT_LOG_DIR_KEY);
+ if (log_dir == NULL) {
+ fprintf(LOGFILE, "Log directory is not configured.\n");
+ exit_code = INVALID_TT_LOG_DIR;
+ goto cleanup;
+ }
+
+ if (prepare_job_logs(log_dir, jobid, permissions) != 0) {
+ fprintf(LOGFILE, "Couldn't prepare job logs directory %s for %s.\n",
+ log_dir, jobid);
+ exit_code = PREPARE_JOB_LOGS_FAILED;
+ }
+
+ cleanup:
+ // free configurations
+ cleanup();
+ if (log_dir != NULL) {
+ free(log_dir);
+ }
+ return exit_code;
}
/**
@@ -891,7 +977,7 @@ int initialize_task(const char *jobid, c
goto cleanup;
}
- if (prepare_task_logs(log_dir, taskid) != 0) {
+ if (prepare_task_logs(log_dir, jobid, taskid) != 0) {
fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
log_dir, taskid);
exit_code = PREPARE_TASK_LOGS_FAILED;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar 4 04:03:23 2011
@@ -70,7 +70,8 @@ enum errorcodes {
INITIALIZE_DISTCACHEFILE_FAILED, //19
INITIALIZE_USER_FAILED, //20
UNABLE_TO_BUILD_PATH, //21
- INVALID_TASKCONTROLLER_PERMISSIONS //22
+ INVALID_TASKCONTROLLER_PERMISSIONS, //22
+ PREPARE_JOB_LOGS_FAILED, //23
};
#define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -83,7 +84,9 @@ enum errorcodes {
#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
-#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+/* Per-job user-log directory: <log_dir>/userlogs/<job_id> */
+#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+/* Per-attempt user-log directory: <log_dir>/userlogs/<job_id>/<attempt_id> */
+#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/tests/test-task-controller.c Fri Mar 4 04:03:23 2011
@@ -171,13 +171,26 @@ void test_get_task_launcher_file() {
assert(ret == 0);
}
+/* Checks that get_job_log_dir builds <log_dir>/userlogs/<job_id>. */
+void test_get_job_log_dir() {
+  char *logdir = (char *) get_job_log_dir("/tmp/testing",
+                                          "job_200906101234_0001");
+  // Fail the test cleanly rather than handing NULL to printf/strcmp below.
+  assert(logdir != NULL);
+  printf("logdir obtained is %s\n", logdir);
+  int ret = 0;
+  if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) {
+    ret = -1;
+  }
+  free(logdir);
+  assert(ret == 0);
+}
+
void test_get_task_log_dir() {
char *logdir = (char *) get_task_log_dir("/tmp/testing",
- "attempt_200906112028_0001_m_000000_0");
+ "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
printf("logdir obtained is %s\n", logdir);
int ret = 0;
if (strcmp(logdir,
- "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+ "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+ != 0) {
ret = -1;
}
free(logdir);
@@ -203,6 +216,9 @@ int main(int argc, char **argv) {
printf("\nTesting get_task_launcher_file()\n");
test_get_task_launcher_file();
+ printf("\nTesting get_job_log_dir()\n");
+ test_get_job_log_dir();
+
printf("\nTesting get_task_log_dir()\n");
test_get_task_log_dir();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 04:03:23 2011
@@ -631,7 +631,7 @@
<name>mapred.userlog.retain.hours</name>
<value>24</value>
<description>The maximum time, in hours, for which the user-logs are to be
- retained.
+ retained after the job completes.
</description>
</property>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar 4 04:03:23 2011
@@ -193,7 +193,6 @@ class Child {
numTasksToExecute = job.getNumTasksToExecutePerJvm();
assert(numTasksToExecute != 0);
- TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
task.setConf(job);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 04:03:23 2011
@@ -32,6 +32,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.JvmFinishedEvent;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -419,9 +421,7 @@ class JvmManager {
}
public void run() {
runChild(env);
-
- // Post-JVM-exit logs processing. Truncate the logs.
- truncateJVMLogs();
+ jvmFinished();
}
public void runChild(JvmEnv env) {
@@ -481,11 +481,12 @@ class JvmManager {
}
}
- // Post-JVM-exit logs processing. Truncate the logs.
- private void truncateJVMLogs() {
+ // Post-JVM-exit logs processing. inform user log manager
+ private void jvmFinished() {
Task firstTask = initalContext.task;
- tracker.getTaskLogsMonitor().addProcessForLogTruncation(
- firstTask.getTaskID(), tasksGiven);
+ JvmFinishedEvent jfe = new JvmFinishedEvent(new JVMInfo(firstTask
+ .getTaskID(), tasksGiven));
+ tracker.getUserLogManager().addLogEvent(jfe);
}
public void taskRan() {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar 4 04:03:23 2011
@@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -42,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
@@ -71,7 +71,7 @@ public class TaskLog {
}
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
- return new File(getBaseDir(taskid.toString()), filter.toString());
+ return new File(getAttemptDir(taskid.toString()), filter.toString());
}
/**
@@ -94,7 +94,7 @@ public class TaskLog {
+ ie);
return null;
}
- return new File(getBaseDir(l.location), filter.toString());
+ return new File(getAttemptDir(l.location), filter.toString());
}
/**
@@ -108,7 +108,7 @@ public class TaskLog {
*/
static String getRealTaskLogFilePath(String location, LogName filter)
throws IOException {
- return FileUtil.makeShellPath(new File(getBaseDir(location),
+ return FileUtil.makeShellPath(new File(getAttemptDir(location),
filter.toString()));
}
@@ -144,7 +144,7 @@ public class TaskLog {
for (LogName filter : new LogName[] { LogName.DEBUGOUT, LogName.PROFILE }) {
LogFileDetail l = new LogFileDetail();
l.location = loc;
- l.length = new File(getBaseDir(l.location), filter.toString()).length();
+ l.length = new File(getAttemptDir(l.location), filter.toString()).length();
l.start = 0;
allLogsFileDetails.put(filter, l);
}
@@ -166,7 +166,7 @@ public class TaskLog {
}
private static File getTmpIndexFile(String taskid) {
- return new File(getBaseDir(taskid), "log.tmp");
+ return new File(getAttemptDir(taskid), "log.tmp");
}
public static File getIndexFile(String taskid) {
return getIndexFile(taskid, false);
@@ -174,9 +174,9 @@ public class TaskLog {
public static File getIndexFile(String taskid, boolean isCleanup) {
if (isCleanup) {
- return new File(getBaseDir(taskid), "log.index.cleanup");
+ return new File(getAttemptDir(taskid), "log.index.cleanup");
} else {
- return new File(getBaseDir(taskid), "log.index");
+ return new File(getAttemptDir(taskid), "log.index");
}
}
@@ -184,8 +184,9 @@ public class TaskLog {
return System.getProperty("hadoop.log.dir");
}
- static File getBaseDir(String taskid) {
- return new File(LOG_DIR, taskid);
+ static File getAttemptDir(String taskid) {
+ return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
+ taskid);
}
static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
@@ -317,39 +318,6 @@ public class TaskLog {
}
}
- private static class TaskLogsPurgeFilter implements FileFilter {
- long purgeTimeStamp;
-
- TaskLogsPurgeFilter(long purgeTimeStamp) {
- this.purgeTimeStamp = purgeTimeStamp;
- }
-
- public boolean accept(File file) {
- LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
- return file.lastModified() < purgeTimeStamp;
- }
- }
-
- /**
- * Purge old user logs.
- *
- * @throws IOException
- */
- public static synchronized void cleanup(int logsRetainHours
- ) throws IOException {
- // Purge logs of tasks on this tasktracker if their
- // mtime has exceeded "mapred.task.log.retain" hours
- long purgeTimeStamp = System.currentTimeMillis() -
- (logsRetainHours*60L*60*1000);
- File[] oldTaskLogs = LOG_DIR.listFiles
- (new TaskLogsPurgeFilter(purgeTimeStamp));
- if (oldTaskLogs != null) {
- for (int i=0; i < oldTaskLogs.length; ++i) {
- FileUtil.fullyDelete(oldTaskLogs[i]);
- }
- }
- }
-
static class Reader extends InputStream {
private long bytesRemaining;
private FileInputStream file;
@@ -390,7 +358,7 @@ public class TaskLog {
start += fileDetail.start;
end += fileDetail.start;
bytesRemaining = end - start;
- file = new FileInputStream(new File(getBaseDir(fileDetail.location),
+ file = new FileInputStream(new File(getAttemptDir(fileDetail.location),
kind.toString()));
// skip upto start
long pos = 0;
@@ -696,4 +664,14 @@ public class TaskLog {
return LOG_DIR;
}
+  /**
+   * Returns the user-log directory of a job: a child of the user log dir
+   * named after the job id. The directory is not created or checked here.
+   *
+   * @param jobid the job whose log directory is wanted
+   * @return the job's user-log directory
+   */
+  public static File getJobDir(JobID jobid) {
+    File userLogDir = getUserLogDir();
+    String jobDirName = jobid.toString();
+    return new File(userLogDir, jobDirName);
+  }
+
} // TaskLog
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java Fri Mar 4 04:03:23 2011
@@ -141,7 +141,7 @@ public class TaskLogServlet extends Http
*/
static Configuration getConfFromJobACLsFile(String attemptIdStr) {
Configuration conf = new Configuration(false);
- conf.addResource(new Path(TaskLog.getBaseDir(attemptIdStr).toString(),
+ conf.addResource(new Path(TaskLog.getAttemptDir(attemptIdStr).toString(),
TaskRunner.jobACLsFile));
return conf;
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+
+/**
+ * The class for truncating the user logs.
+ * Should be used only by {@link UserLogManager}.
+ *
+ */
+public class TaskLogsTruncater {
+ static final Log LOG = LogFactory.getLog(TaskLogsTruncater.class);
+
+ static final String MAP_USERLOG_RETAIN_SIZE =
+ "mapreduce.cluster.map.userlog.retain-size";
+ static final String REDUCE_USERLOG_RETAIN_SIZE =
+ "mapreduce.cluster.reduce.userlog.retain-size";
+ static final int DEFAULT_RETAIN_SIZE = -1;
+
+ long mapRetainSize, reduceRetainSize;
+
+  /**
+   * Creates a truncater whose per-attempt retain sizes come from
+   * {@link #MAP_USERLOG_RETAIN_SIZE} and {@link #REDUCE_USERLOG_RETAIN_SIZE},
+   * defaulting to {@link #DEFAULT_RETAIN_SIZE} when unset.
+   *
+   * @param conf configuration supplying the retain sizes
+   */
+  public TaskLogsTruncater(Configuration conf) {
+    this.mapRetainSize =
+        conf.getLong(MAP_USERLOG_RETAIN_SIZE, DEFAULT_RETAIN_SIZE);
+    this.reduceRetainSize =
+        conf.getLong(REDUCE_USERLOG_RETAIN_SIZE, DEFAULT_RETAIN_SIZE);
+    LOG.info("Initializing logs' truncater with mapRetainSize="
+        + this.mapRetainSize + " and reduceRetainSize="
+        + this.reduceRetainSize);
+  }
+
+ private static final int DEFAULT_BUFFER_SIZE = 4 * 1024;
+
+ static final int MINIMUM_RETAIN_SIZE_FOR_TRUNCATION = 0;
+
+  /**
+   * Process the finished JVM's logs by truncating them to the retain size.
+   * All attempts that ran in the JVM write to the first attempt's log files,
+   * so each log file is re-read attempt by attempt and the retained bytes
+   * are written to a temporary file that then replaces the original. If any
+   * step fails for a log file, the temporary file is discarded and that
+   * log's index entries are reverted to their original values. Index files
+   * are rewritten at the end if any index-tracked log was processed.
+   *
+   * @param lInfo the finished JVM and the attempts that ran in it
+   */
+  public void truncateLogs(JVMInfo lInfo) {
+    TaskAttemptID firstAttempt =
+        TaskAttemptID.downgrade(lInfo.getFirstAttemptID());
+
+    // Read the log-file details for all the attempts that ran in this JVM
+    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
+    try {
+      taskLogFileDetails = getAllLogsFileDetails(lInfo.getAllAttempts());
+    } catch (IOException e) {
+      LOG.warn(
+          "Exception in truncateLogs while getting allLogsFileDetails()."
+              + " Ignoring the truncation of logs of this process.", e);
+      return;
+    }
+
+    // set this boolean to true if any of the log files is truncated
+    boolean indexModified = false;
+
+    Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails =
+        new HashMap<Task, Map<LogName, LogFileDetail>>();
+    // Make a copy of original indices into updated indices
+    for (LogName logName : LogName.values()) {
+      copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+          updatedTaskLogFileDetails, logName);
+    }
+
+    File attemptLogDir = TaskLog.getAttemptDir(firstAttempt.toString());
+
+    // Now truncate file by file
+    for (LogName logName : LogName.values()) {
+
+      File logFile = TaskLog.getTaskLogFile(firstAttempt, logName);
+
+      // Optimization: if no task is over limit, just skip truncation-code
+      if (logFile.exists()
+          && !isTruncationNeeded(lInfo, taskLogFileDetails, logName)) {
+        LOG.debug("Truncation is not needed for "
+            + logFile.getAbsolutePath());
+        continue;
+      }
+
+      // Truncation is needed for this log-file. Go ahead now.
+      File tmpFile = new File(attemptLogDir, "truncate.tmp");
+      FileWriter tmpFileWriter;
+      try {
+        tmpFileWriter = new FileWriter(tmpFile);
+      } catch (IOException ioe) {
+        LOG.warn("Cannot open " + tmpFile.getAbsolutePath()
+            + " for writing truncated log-file "
+            + logFile.getAbsolutePath()
+            + ". Continuing with other log files. ", ioe);
+        continue;
+      }
+
+      FileReader logFileReader;
+      try {
+        logFileReader = new FileReader(logFile);
+      } catch (FileNotFoundException fe) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cannot open " + logFile.getAbsolutePath()
+              + " for reading. Continuing with other log files");
+        }
+        // Release the writer's descriptor before discarding the tmp file.
+        closeAndLogOnError(tmpFileWriter, tmpFile);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+        continue;
+      }
+
+      long newCurrentOffset = 0;
+      boolean truncationFailed = false;
+      try {
+        // Process each attempt from the ordered list passed.
+        for (Task task : lInfo.getAllAttempts()) {
+
+          // Truncate the log files of this task-attempt so that only the
+          // last retainSize many bytes of this log file is retained and the
+          // log file is reduced in size saving disk space.
+          long retainSize =
+              (task.isMapTask() ? mapRetainSize : reduceRetainSize);
+          LogFileDetail newLogFileDetail;
+          try {
+            newLogFileDetail =
+                truncateALogFileOfAnAttempt(task.getTaskID(),
+                    taskLogFileDetails.get(task).get(logName), retainSize,
+                    tmpFileWriter, logFileReader, logName);
+          } catch (IOException ioe) {
+            LOG.warn("Cannot truncate the log file "
+                + logFile.getAbsolutePath()
+                + ". Caught exception while handling " + task.getTaskID(),
+                ioe);
+            truncationFailed = true;
+            break;
+          }
+
+          // Track information for updating the index file properly.
+          // Index files don't track DEBUGOUT and PROFILE logs, so skip'em.
+          if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+            if (!updatedTaskLogFileDetails.containsKey(task)) {
+              updatedTaskLogFileDetails.put(task,
+                  new HashMap<LogName, LogFileDetail>());
+            }
+            // newLogFileDetail already has the location and length set,
+            // just set the start offset now.
+            newLogFileDetail.start = newCurrentOffset;
+            updatedTaskLogFileDetails.get(task).put(logName, newLogFileDetail);
+            newCurrentOffset += newLogFileDetail.length;
+            indexModified = true; // set the flag
+          }
+        }
+      } finally {
+        // The original log is no longer read after this point; always
+        // release it (it was previously leaked on every path).
+        closeAndLogOnError(logFileReader, logFile);
+      }
+
+      if (truncationFailed) {
+        closeAndLogOnError(tmpFileWriter, tmpFile);
+        // revert back updatedTaskLogFileDetails
+        copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+            updatedTaskLogFileDetails, logName);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+        continue;
+      }
+
+      try {
+        tmpFileWriter.close();
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't close the tmp file " + tmpFile.getAbsolutePath()
+            + ". Deleting it.", ioe);
+        copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+            updatedTaskLogFileDetails, logName);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+        continue;
+      }
+
+      if (!tmpFile.renameTo(logFile)) {
+        // If the tmpFile cannot be renamed revert back
+        // updatedTaskLogFileDetails to maintain the consistency of the
+        // original log file
+        copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+            updatedTaskLogFileDetails, logName);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+      }
+    }
+
+    if (indexModified) {
+      // Update the index files
+      updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
+    }
+  }
+
+  /**
+   * Closes a stream whose file is being abandoned or is no longer needed,
+   * logging instead of propagating a failure to close.
+   *
+   * @param stream the reader or writer to close
+   * @param file the file backing {@code stream}, used in the log message
+   */
+  private static void closeAndLogOnError(java.io.Closeable stream,
+      File file) {
+    try {
+      stream.close();
+    } catch (IOException ioe) {
+      LOG.warn("Failed to close " + file.getAbsolutePath(), ioe);
+    }
+  }
+
+  /**
+   * Resets the {@code logName} entry of every attempt in
+   * {@code updatedTaskLogFileDetails} to the original detail taken from
+   * {@code taskLogFileDetails}. Logs not tracked by index files are ignored.
+   *
+   * @param lInfo the JVM whose attempts are processed
+   * @param taskLogFileDetails original per-attempt log details
+   * @param updatedTaskLogFileDetails per-attempt details to reset (mutated)
+   * @param logName the log whose entries are reset
+   */
+  private void copyOriginalIndexFileInfo(JVMInfo lInfo,
+      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
+      LogName logName) {
+    if (!TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+      return;
+    }
+    for (Task attempt : lInfo.getAllAttempts()) {
+      if (!updatedTaskLogFileDetails.containsKey(attempt)) {
+        updatedTaskLogFileDetails.put(attempt,
+            new HashMap<LogName, LogFileDetail>());
+      }
+      Map<LogName, LogFileDetail> target =
+          updatedTaskLogFileDetails.get(attempt);
+      target.put(logName, taskLogFileDetails.get(attempt).get(logName));
+    }
+  }
+
+  /**
+   * Collects, for each attempt in {@code allAttempts}, the per-log file
+   * details returned by {@link TaskLog#getAllLogsFileDetails}.
+   *
+   * @param allAttempts the attempts whose log details are read
+   * @return a map from each attempt to its per-log file details
+   * @throws IOException if reading any attempt's details fails
+   */
+  private Map<Task, Map<LogName, LogFileDetail>> getAllLogsFileDetails(
+      final List<Task> allAttempts) throws IOException {
+    Map<Task, Map<LogName, LogFileDetail>> detailsByAttempt =
+        new HashMap<Task, Map<LogName, LogFileDetail>>();
+    for (Task attempt : allAttempts) {
+      detailsByAttempt.put(attempt, TaskLog.getAllLogsFileDetails(
+          attempt.getTaskID(), attempt.isTaskCleanupTask()));
+    }
+    return detailsByAttempt;
+  }
+
+ /**
+ * Decide whether {@code logName} must be truncated for the given JVM.
+ * Truncation is needed as soon as one attempt that ran in the JVM has a log
+ * longer than its retain-size (the map or reduce retain-size, depending on
+ * the attempt type), provided that retain-size is above
+ * {@code MINIMUM_RETAIN_SIZE_FOR_TRUNCATION}. If every attempt is within its
+ * limit, no truncation is needed.
+ *
+ * @param lInfo JVM whose attempts are inspected
+ * @param taskLogFileDetails current log-file details, keyed by attempt
+ * @param logName the log to check
+ * @return true if at least one attempt exceeds its limit, false otherwise
+ */
+ private boolean isTruncationNeeded(JVMInfo lInfo,
+ Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+ LogName logName) {
+ for (Task attempt : lInfo.getAllAttempts()) {
+ long retainSize;
+ if (attempt.isMapTask()) {
+ retainSize = mapRetainSize;
+ } else {
+ retainSize = reduceRetainSize;
+ }
+ LogFileDetail detail = taskLogFileDetails.get(attempt).get(logName);
+ boolean limitEnabled = retainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION;
+ if (limitEnabled && detail.length > retainSize) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Copy one attempt's section of the original log into the new (truncated)
+ * log file. If the section is longer than {@code taskRetainSize} (and the
+ * retain-size is above {@code MINIMUM_RETAIN_SIZE_FOR_TRUNCATION}), only its
+ * last {@code taskRetainSize} characters are kept; otherwise the whole section
+ * is copied unchanged.
+ * <p>
+ * NOTE(review): the lengths in LogFileDetail presumably are byte counts (they
+ * come from the index files), while FileReader/FileWriter count characters in
+ * the platform charset; the two only agree for single-byte content.
+ *
+ * @param taskID attempt whose log section is being copied (used for logging)
+ * @param oldLogFileDetail original location and length of the section
+ * @param taskRetainSize retain-size for this attempt
+ * @param tmpFileWriter new log file to write to, already opened in append
+ *          mode
+ * @param logFileReader original log file; presumably positioned at the start
+ *          of this attempt's section, since skipping is relative
+ * @param logName the log being truncated (used for logging)
+ * @return details of the section in the new file: same location, the number
+ *         of characters actually written as length; the caller sets the
+ *         start offset
+ * @throws IOException if the head of the section cannot be skipped, or if
+ *           reading or writing fails
+ */
+ private LogFileDetail truncateALogFileOfAnAttempt(
+ final TaskAttemptID taskID, final LogFileDetail oldLogFileDetail,
+ final long taskRetainSize, final FileWriter tmpFileWriter,
+ final FileReader logFileReader,
+ final LogName logName) throws IOException {
+ LogFileDetail newLogFileDetail = new LogFileDetail();
+
+ // New location of log file is same as the old
+ newLogFileDetail.location = oldLogFileDetail.location;
+ if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
+ && oldLogFileDetail.length > taskRetainSize) {
+ LOG.info("Truncating " + logName + " logs for " + taskID + " from "
+ + oldLogFileDetail.length + "bytes to " + taskRetainSize
+ + "bytes.");
+ newLogFileDetail.length = taskRetainSize;
+ } else {
+ LOG.debug("No truncation needed for " + logName + " logs for " + taskID
+ + " length is " + oldLogFileDetail.length + " retain size "
+ + taskRetainSize + "bytes.");
+ newLogFileDetail.length = oldLogFileDetail.length;
+ }
+
+ // Skip the head of the old section; only its tail is retained.
+ long charsToSkip = oldLogFileDetail.length - newLogFileDetail.length;
+ long charsSkipped = logFileReader.skip(charsToSkip);
+ if (charsSkipped != charsToSkip) {
+ throw new IOException("Erroneously skipped " + charsSkipped
+ + " instead of the expected " + charsToSkip
+ + " while truncating " + logName + " logs for " + taskID );
+ }
+
+ // Copy the retained tail. The buffer is allocated once, and only the
+ // characters actually returned by read() are written: Reader.read may
+ // return fewer characters than requested.
+ char[] buffer = new char[(int) Math.max(0L,
+ Math.min(DEFAULT_BUFFER_SIZE, newLogFileDetail.length))];
+ long alreadyRead = 0;
+ while (alreadyRead < newLogFileDetail.length) {
+ int toRead = (int) Math.min(buffer.length,
+ newLogFileDetail.length - alreadyRead);
+ int charsRead = logFileReader.read(buffer, 0, toRead);
+ if (charsRead < 0) {
+ break;
+ }
+ tmpFileWriter.write(buffer, 0, charsRead);
+ alreadyRead += charsRead;
+ }
+
+ if (alreadyRead < newLogFileDetail.length) {
+ // The log ended before the expected length. Record what was actually
+ // written so the offsets of the following attempts stay consistent.
+ LOG.warn("Reached the end of " + logName + " logs for " + taskID
+ + " after " + alreadyRead + " of the expected "
+ + newLogFileDetail.length + " characters.");
+ newLogFileDetail.length = alreadyRead;
+ }
+
+ return newLogFileDetail;
+ }
+
+ /**
+ * Rewrite the index file of every attempt in {@code updatedTaskLogFileDetails}
+ * so that it reflects the truncated logs. For every log tracked by index
+ * files, the index receives a {previous, current} length pair: {start,
+ * start + length} when the attempt has details for that log, {0, 0}
+ * otherwise.
+ * <p>
+ * A failure to write one attempt's index is logged, and the remaining
+ * attempts are still processed.
+ *
+ * @param firstAttempt first attempt of the JVM, passed on to
+ *          {@link TaskLog#writeToIndexFile}
+ * @param updatedTaskLogFileDetails new log-file details, keyed by attempt
+ */
+ private void updateIndicesAfterLogTruncation(TaskAttemptID firstAttempt,
+ Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
+ for (Entry<Task, Map<LogName, LogFileDetail>> entry :
+ updatedTaskLogFileDetails.entrySet()) {
+ Task attempt = entry.getKey();
+ Map<LogName, LogFileDetail> details = entry.getValue();
+ Map<LogName, Long[]> logLengths = new HashMap<LogName, Long[]>();
+ for (LogName name : TaskLog.LOGS_TRACKED_BY_INDEX_FILES) {
+ LogFileDetail detail = details.get(name);
+ long previousLength = 0L;
+ long currentLength = 0L;
+ if (detail != null) {
+ previousLength = detail.start;
+ currentLength = detail.start + detail.length;
+ }
+ logLengths.put(name, new Long[] { Long.valueOf(previousLength),
+ Long.valueOf(currentLength) });
+ }
+ try {
+ TaskLog.writeToIndexFile(firstAttempt, attempt.getTaskID(),
+ attempt.isTaskCleanupTask(), logLengths);
+ } catch (IOException ioe) {
+ LOG.warn("Exception encountered while updating index file of task "
+ + attempt.getTaskID()
+ + ". Ignoring and continuing with other tasks.", ioe);
+ }
+ }
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 04:03:23 2011
@@ -54,7 +54,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.*;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -128,11 +129,6 @@ public class TaskTracker
static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
"mapred.tasktracker.pmem.reserved";
- static final String MAP_USERLOG_RETAIN_SIZE =
- "mapreduce.cluster.map.userlog.retain-size";
- static final String REDUCE_USERLOG_RETAIN_SIZE =
- "mapreduce.cluster.reduce.userlog.retain-size";
-
static final long WAIT_FOR_DONE = 3 * 1000;
private int httpPort;
@@ -276,7 +272,7 @@ public class TaskTracker
private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
- private TaskLogsMonitor taskLogsMonitor;
+ private UserLogManager userLogManager;
static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
"mapred.tasktracker.memory_calculator_plugin";
@@ -456,12 +452,12 @@ public class TaskTracker
}
}
- TaskLogsMonitor getTaskLogsMonitor() {
- return this.taskLogsMonitor;
+ UserLogManager getUserLogManager() {
+ return this.userLogManager;
}
- void setTaskLogsMonitor(TaskLogsMonitor t) {
- this.taskLogsMonitor = t;
+ void setUserLogManager(UserLogManager u) {
+ this.userLogManager = u;
}
public static String getUserDir(String user) {
@@ -717,9 +713,7 @@ public class TaskTracker
initializeMemoryManagement();
- setTaskLogsMonitor(new TaskLogsMonitor(getMapUserLogRetainSize(),
- getReduceUserLogRetainSize()));
- getTaskLogsMonitor().start();
+ getUserLogManager().clearOldUserLogs(fConf);
setIndexCache(new IndexCache(this.fConf));
@@ -953,10 +947,8 @@ public class TaskTracker
new LocalDirAllocator("mapred.local.dir");
// intialize the job directory
- @SuppressWarnings("unchecked")
- private void localizeJob(TaskInProgress tip)
+ RunningJob localizeJob(TaskInProgress tip)
throws IOException, InterruptedException {
- Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
RunningJob rjob = addTaskToJob(jobId, tip);
@@ -967,7 +959,8 @@ public class TaskTracker
synchronized (rjob) {
if (!rjob.localized) {
JobConf localJobConf = localizeJobFiles(t, rjob);
-
+ // initialize job log directory
+ initializeJobLogDir(jobId);
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir. Note that initializeJob
// should be the last call after every other directory/file to be
@@ -985,7 +978,7 @@ public class TaskTracker
rjob.localized = true;
}
}
- launchTaskForJob(tip, new JobConf(rjob.jobConf));
+ return rjob;
}
/**
@@ -1062,6 +1055,15 @@ public class TaskTracker
return localJobConf;
}
+ /**
+ * Create the user-log directory of a job that is being localized.
+ * <p>
+ * A {@link JobStartedEvent} is posted to the user-log manager first. The
+ * job's log directory may already be scheduled for deletion (e.g. after a
+ * TaskTracker reinit or restart), and presumably this event takes it off
+ * that schedule.
+ *
+ * @param jobId the job whose log directory is initialized
+ */
+ void initializeJobLogDir(JobID jobId) {
+ getUserLogManager().addLogEvent(new JobStartedEvent(jobId));
+ localizer.initializeJobLogDir(jobId);
+ }
+
/**
* Download the job configuration file from the FS.
*
@@ -1183,10 +1185,6 @@ public class TaskTracker
this.mapLauncher.interrupt();
this.reduceLauncher.interrupt();
- // All tasks are killed. So, they are removed from TaskLog monitoring also.
- // Interrupt the monitor.
- getTaskLogsMonitor().interrupt();
-
jvmManager.stop();
// shutdown RPC connections
@@ -1261,7 +1259,8 @@ public class TaskTracker
server.start();
this.httpPort = server.getPort();
checkJettyPort(httpPort);
-
+ // create user log manager
+ setUserLogManager(new UserLogManager(conf));
// Initialize the jobACLSManager
jobACLsManager = new TaskTrackerJobACLsManager(this);
initialize();
@@ -1620,22 +1619,6 @@ public class TaskTracker
return heartbeatResponse;
}
- long getMapUserLogRetainSize() {
- return fConf.getLong(MAP_USERLOG_RETAIN_SIZE, -1);
- }
-
- void setMapUserLogRetainSize(long retainSize) {
- fConf.setLong(MAP_USERLOG_RETAIN_SIZE, retainSize);
- }
-
- long getReduceUserLogRetainSize() {
- return fConf.getLong(REDUCE_USERLOG_RETAIN_SIZE, -1);
- }
-
- void setReduceUserLogRetainSize(long retainSize) {
- fConf.setLong(REDUCE_USERLOG_RETAIN_SIZE, retainSize);
- }
-
/**
* Return the total virtual memory available on this TaskTracker.
* @return total size of virtual memory.
@@ -1778,7 +1761,7 @@ public class TaskTracker
* @param action The action with the job
* @throws IOException
*/
- private synchronized void purgeJob(KillJobAction action) throws IOException {
+ synchronized void purgeJob(KillJobAction action) throws IOException {
JobID jobId = action.getJobID();
LOG.info("Received 'KillJobAction' for job: " + jobId);
RunningJob rjob = null;
@@ -1803,6 +1786,13 @@ public class TaskTracker
if (!rjob.keepJobFiles) {
removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
}
+ // add job to user log manager
+ long now = System.currentTimeMillis();
+ JobCompletedEvent jca = new JobCompletedEvent(rjob
+ .getJobID(), now, UserLogCleaner.getUserlogRetainHours(rjob
+ .getJobConf()));
+ getUserLogManager().addLogEvent(jca);
+
// Remove this job
rjob.tasks.clear();
}
@@ -2157,7 +2147,8 @@ public class TaskTracker
*/
void startNewTask(TaskInProgress tip) {
try {
- localizeJob(tip);
+ RunningJob rjob = localizeJob(tip);
+ launchTaskForJob(tip, new JobConf(rjob.jobConf));
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -2217,6 +2208,7 @@ public class TaskTracker
*/
public void run() {
try {
+ getUserLogManager().start();
startCleanupThreads();
boolean denied = false;
while (running && !shuttingDown && !denied) {
@@ -2711,8 +2703,9 @@ public class TaskTracker
// Debug-command is run. Do the post-debug-script-exit debug-logs
// processing. Truncate the logs.
- getTaskLogsMonitor().addProcessForLogTruncation(
- task.getTaskID(), Arrays.asList(task));
+ JvmFinishedEvent jvmFinished = new JvmFinishedEvent(
+ new JVMInfo(task.getTaskID(), Arrays.asList(task)));
+ getUserLogManager().addLogEvent(jvmFinished);
}
}
taskStatus.setProgress(0.0f);
@@ -3259,6 +3252,10 @@ public class TaskTracker
FetchStatus getFetchStatus() {
return f;
}
+
+ /**
+ * Get the job configuration held by this job.
+ *
+ * @return the {@code jobConf} field of this job
+ */
+ JobConf getJobConf() {
+ return jobConf;
+ }
}
/**
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+
+/**
+ * Cleans up the user-log directories of completed jobs. This is used only in
+ * UserLogManager, to manage cleanup of user logs.
+ * <p>
+ * Jobs are registered with {@link #markJobLogsForDeletion} together with the
+ * time-stamp after which their logs may be removed. The thread wakes up
+ * periodically (interval read from
+ * {@code mapreduce.tasktracker.userlogcleanup.sleeptime} in milliseconds,
+ * default one hour) and posts a {@link DeleteJobEvent} to the
+ * {@link UserLogManager} for every job whose time-stamp has passed.
+ * {@link #deleteJobLogs} queues the directory on a {@link CleanupQueue}.
+ * <p>
+ * Interrupting the thread stops it.
+ */
+public class UserLogCleaner extends Thread {
+ private static final Log LOG = LogFactory.getLog(UserLogCleaner.class);
+ static final String USERLOGCLEANUP_SLEEPTIME =
+ "mapreduce.tasktracker.userlogcleanup.sleeptime";
+ static final int DEFAULT_USER_LOG_RETAIN_HOURS = 24; // 1 day
+ static final long DEFAULT_THREAD_SLEEP_TIME = 1000 * 60 * 60; // 1 hour
+
+ private final UserLogManager userLogManager;
+ // job -> time-stamp (millis) after which the job's logs may be deleted
+ private final Map<JobID, Long> completedJobs = Collections
+ .synchronizedMap(new HashMap<JobID, Long>());
+ private final long threadSleepTime;
+ private CleanupQueue cleanupQueue;
+
+ private Clock clock;
+ private final FileSystem localFs;
+
+ /**
+ * @param userLogManager manager to which {@link DeleteJobEvent}s and
+ *          {@link JobCompletedEvent}s are posted
+ * @param conf configuration to read the sleep interval from
+ * @throws IOException if the local file system cannot be obtained
+ */
+ public UserLogCleaner(UserLogManager userLogManager, Configuration conf)
+ throws IOException {
+ this.userLogManager = userLogManager;
+ threadSleepTime = conf.getLong(USERLOGCLEANUP_SLEEPTIME,
+ DEFAULT_THREAD_SLEEP_TIME);
+ cleanupQueue = new CleanupQueue();
+ localFs = FileSystem.getLocal(conf);
+ setClock(new Clock());
+ setDaemon(true);
+ }
+
+ void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ Clock getClock() {
+ return this.clock;
+ }
+
+ CleanupQueue getCleanupQueue() {
+ return cleanupQueue;
+ }
+
+ void setCleanupQueue(CleanupQueue cleanupQueue) {
+ this.cleanupQueue = cleanupQueue;
+ }
+
+ @Override
+ public void run() {
+ // This thread wakes up after every threadSleepTime interval
+ // and deletes if there are any old logs.
+ while (true) {
+ try {
+ // sleep
+ Thread.sleep(threadSleepTime);
+ processCompletedJobs();
+ } catch (InterruptedException ie) {
+ // An interrupt is a request to stop. Keep the flag set and exit,
+ // rather than looping forever with no way to stop the thread.
+ LOG.info(getClass().getSimpleName() + " interrupted. Exiting.");
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Throwable e) {
+ LOG.warn(getClass().getSimpleName()
+ + " encountered an exception while monitoring :", e);
+ LOG.info("Ingoring the exception and continuing monitoring.");
+ }
+ }
+ }
+
+ /**
+ * Post a {@link DeleteJobEvent} for, and forget, every registered job whose
+ * retain time-stamp is not after the current clock time.
+ *
+ * @throws IOException declared for subclasses/callers; not thrown here
+ */
+ void processCompletedJobs() throws IOException {
+ long now = clock.getTime();
+ // iterate through completedJobs and remove old logs.
+ synchronized (completedJobs) {
+ Iterator<Entry<JobID, Long>> completedJobIter = completedJobs.entrySet()
+ .iterator();
+ while (completedJobIter.hasNext()) {
+ Entry<JobID, Long> entry = completedJobIter.next();
+ // see if the job is old enough
+ if (entry.getValue().longValue() <= now) {
+ // add the job for deletion
+ userLogManager.addLogEvent(new DeleteJobEvent(entry.getKey()));
+ completedJobIter.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * Queue the user-log directory of the given job for deletion.
+ *
+ * @param jobid job whose logs are deleted
+ * @throws IOException declared by the deletion helper
+ */
+ public void deleteJobLogs(JobID jobid) throws IOException {
+ deleteLogPath(TaskLog.getJobDir(jobid).getAbsolutePath());
+ }
+
+ /**
+ * Clears all the logs in userlog directory.
+ *
+ * Adds the job directories for deletion with default retain hours. Deletes
+ * all other directories, if any. This is usually called on reinit/restart of
+ * the TaskTracker
+ *
+ * @param conf configuration to read the user-log retain hours from
+ * @throws IOException if queuing a path for deletion fails
+ */
+ public void clearOldUserLogs(Configuration conf) throws IOException {
+ File userLogDir = TaskLog.getUserLogDir();
+ if (!userLogDir.exists()) {
+ return;
+ }
+ String[] logDirs = userLogDir.list();
+ if (logDirs == null) {
+ // File.list() returns null if the path is not a directory or on an
+ // I/O error; there is nothing that can be cleaned up in that case.
+ LOG.warn("Could not list the user log directory "
+ + userLogDir.getAbsolutePath());
+ return;
+ }
+ // add all the job log dirs for deletion
+ long now = clock.getTime();
+ for (String logDir : logDirs) {
+ JobID jobid = null;
+ try {
+ jobid = JobID.forName(logDir);
+ } catch (IllegalArgumentException ie) {
+ // if the directory is not a jobid, delete it immediately
+ deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+ continue;
+ }
+ // add the job log directory for deletion with default retain hours,
+ // if it is not already added
+ if (!completedJobs.containsKey(jobid)) {
+ JobCompletedEvent jce = new JobCompletedEvent(jobid, now,
+ getUserlogRetainHours(conf));
+ userLogManager.addLogEvent(jce);
+ }
+ }
+ }
+
+ /**
+ * If the configuration is null or user-log retain hours is not configured,
+ * the retain hours are {@value UserLogCleaner#DEFAULT_USER_LOG_RETAIN_HOURS}
+ */
+ static int getUserlogRetainHours(Configuration conf) {
+ return (conf == null ? DEFAULT_USER_LOG_RETAIN_HOURS : conf.getInt(
+ JobContext.USER_LOG_RETAIN_HOURS, DEFAULT_USER_LOG_RETAIN_HOURS));
+ }
+
+ /**
+ * Adds job user-log directory to cleanup thread to delete logs after user-log
+ * retain hours.
+ *
+ * @param jobCompletionTime
+ * job completion time in millis
+ * @param retainHours
+ * the user-log retain hours for the job
+ * @param jobid
+ * JobID for which user logs should be deleted
+ */
+ public void markJobLogsForDeletion(long jobCompletionTime, int retainHours,
+ JobID jobid) {
+ // 1000L forces long arithmetic so large retain-hours do not overflow int
+ long retainTimeStamp = jobCompletionTime + (retainHours * 1000L * 60L * 60L);
+ LOG.info("Adding " + jobid + " for user-log deletion with retainTimeStamp:"
+ + retainTimeStamp);
+ completedJobs.put(jobid, Long.valueOf(retainTimeStamp));
+ }
+
+ /**
+ * Remove job from user log deletion.
+ *
+ * @param jobid job that should no longer be deleted
+ */
+ public void unmarkJobFromLogDeletion(JobID jobid) {
+ if (completedJobs.remove(jobid) != null) {
+ LOG.info("Removing " + jobid + " from user-log deletion");
+ }
+ }
+
+ /**
+ * Deletes the log path.
+ *
+ * This path will be removed through {@link CleanupQueue}
+ *
+ * @param logPath absolute path to delete
+ * @throws IOException declared for the deletion machinery
+ */
+ private void deleteLogPath(String logPath) throws IOException {
+ LOG.info("Deleting user log path " + logPath);
+ PathDeletionContext context = new PathDeletionContext(localFs, logPath);
+ cleanupQueue.addToQueue(context);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar 4 04:03:23 2011
@@ -64,6 +64,8 @@ public class JobContext {
public static final String JOB_CANCEL_DELEGATION_TOKEN =
"mapreduce.job.complete.cancel.delegation.tokens";
+ public static final String USER_LOG_RETAIN_HOURS =
+ "mapred.userlog.retain.hours";
protected UserGroupInformation ugi;
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker;
+
+import java.util.List;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Immutable description of a task JVM: the ID of the first attempt that ran
+ * in it and the list of all the attempts that ran in it.
+ */
+public class JVMInfo {
+ private final TaskAttemptID firstAttemptID;
+ private final List<Task> allAttempts;
+
+ /**
+ * @param firstAttemptID ID of the first attempt that ran in the JVM
+ * @param allAttempts all the attempts that ran in the JVM; the list is
+ *          stored by reference, not copied
+ */
+ public JVMInfo(TaskAttemptID firstAttemptID, List<Task> allAttempts) {
+ this.firstAttemptID = firstAttemptID;
+ this.allAttempts = allAttempts;
+ }
+
+ /**
+ * @return ID of the first attempt that ran in the JVM
+ */
+ public TaskAttemptID getFirstAttemptID() {
+ return firstAttemptID;
+ }
+
+ /**
+ * @return the list of attempts passed to the constructor
+ */
+ public List<Task> getAllAttempts() {
+ return allAttempts;
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar 4 04:03:23 2011
@@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.JobID;
/**
*
@@ -358,4 +359,22 @@ public class Localizer {
+ attemptId.toString());
}
}
+
+ /**
+ * Make sure the job's user-log directory ({@link TaskLog#getJobDir}) exists,
+ * and apply {@code PermissionsHandler.sevenZeroZero} to it (presumably mode
+ * 700, owner-only access).
+ * <p>
+ * If the directory is missing and cannot be created, a warning is logged and
+ * the method returns without touching permissions.
+ *
+ * @param jobId the job whose log directory is initialized
+ */
+ public void initializeJobLogDir(JobID jobId) {
+ File jobUserLogDir = TaskLog.getJobDir(jobId);
+ boolean dirPresent = jobUserLogDir.exists() || jobUserLogDir.mkdirs();
+ if (!dirPresent) {
+ LOG.warn("Could not create job user log directory: " + jobUserLogDir);
+ return;
+ }
+ Localizer.PermissionsHandler.setPermissions(jobUserLogDir,
+ Localizer.PermissionsHandler.sevenZeroZero);
+ }
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/DeleteJobEvent.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is an {@link UserLogEvent} sent when job logs should be deleted.
+ * Instances are immutable.
+ */
+public class DeleteJobEvent extends UserLogEvent {
+ private final JobID jobid;
+
+ /**
+ * Create the event to delete job log directory.
+ *
+ * @param jobid
+ * The {@link JobID} whose logs should be deleted.
+ */
+ public DeleteJobEvent(JobID jobid) {
+ super(EventType.DELETE_JOB);
+ this.jobid = jobid;
+ }
+
+ /**
+ * Get the jobid.
+ *
+ * @return object of {@link JobID}
+ */
+ public JobID getJobID() {
+ return jobid;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobCompletedEvent.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is an {@link UserLogEvent} sent when the job completes. It carries the
+ * completion time and the number of hours the job's logs should be kept.
+ * Instances are immutable.
+ */
+public class JobCompletedEvent extends UserLogEvent {
+ private final JobID jobid;
+ private final long jobCompletionTime;
+ private final int retainHours;
+
+ /**
+ * Create the event for job completion.
+ *
+ * @param jobid
+ * The completed {@link JobID} .
+ * @param jobCompletionTime
+ * The job completion time.
+ * @param retainHours
+ * The number of hours for which the job logs should be retained
+ */
+ public JobCompletedEvent(JobID jobid, long jobCompletionTime,
+ int retainHours) {
+ super(EventType.JOB_COMPLETED);
+ this.jobid = jobid;
+ this.jobCompletionTime = jobCompletionTime;
+ this.retainHours = retainHours;
+ }
+
+ /**
+ * Get the job id.
+ *
+ * @return object of {@link JobID}
+ */
+ public JobID getJobID() {
+ return jobid;
+ }
+
+ /**
+ * Get the job completion time-stamp in milli-seconds.
+ *
+ * @return job completion time.
+ */
+ public long getJobCompletionTime() {
+ return jobCompletionTime;
+ }
+
+ /**
+ * Get the number of hours for which job logs should be retained.
+ *
+ * @return retainHours
+ */
+ public int getRetainHours() {
+ return retainHours;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JobStartedEvent.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This is an {@link UserLogEvent} sent when the job starts.
+ * Instances are immutable.
+ */
+public class JobStartedEvent extends UserLogEvent {
+ private final JobID jobid;
+
+ /**
+ * Create the event to inform the job has started.
+ *
+ * @param jobid
+ * The {@link JobID} which started
+ */
+ public JobStartedEvent(JobID jobid) {
+ super(EventType.JOB_STARTED);
+ this.jobid = jobid;
+ }
+
+ /**
+ * Get the job id.
+ *
+ * @return object of {@link JobID}
+ */
+ public JobID getJobID() {
+ return jobid;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/JvmFinishedEvent.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+
+/**
+ * A {@link UserLogEvent} sent to the {@link UserLogManager} when a JVM finishes.
+ */
+public class JvmFinishedEvent extends UserLogEvent {
+  private final JVMInfo jvmInfo;
+
+  /**
+   * Create the event to inform that the JVM has finished.
+   *
+   * @param jvmInfo
+   *          The {@link JVMInfo} of the finished JVM
+   */
+  public JvmFinishedEvent(JVMInfo jvmInfo) {
+    super(EventType.JVM_FINISHED);
+    this.jvmInfo = jvmInfo;
+  }
+
+  /**
+   * Get the information about the finished JVM.
+   *
+   * @return the {@link JVMInfo} this event was created with
+   */
+  public JVMInfo getJvmInfo() {
+    return jvmInfo;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogEvent.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * A directive from the various components of {@link TaskTracker} to the
+ * {@link UserLogManager} to inform about an event.
+ */
+public abstract class UserLogEvent {
+
+ public enum EventType {
+ JVM_FINISHED,
+ JOB_STARTED,
+ JOB_COMPLETED,
+ DELETE_JOB,
+ };
+
+ private EventType eventType;
+
+ protected UserLogEvent(EventType eventType) {
+ this.eventType = eventType;
+ }
+
+ /**
+ * Return the {@link EventType}.
+ * @return the {@link EventType}.
+ */
+ public EventType getEventType() {
+ return eventType;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker.userlogs;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskLogsTruncater;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.UserLogCleaner;
+
+/**
+ * This manages user logs on the {@link TaskTracker}.
+ */
+public class UserLogManager {
+ private static final Log LOG = LogFactory.getLog(UserLogManager.class);
+ private BlockingQueue<UserLogEvent> userLogEvents =
+ new LinkedBlockingQueue<UserLogEvent>();
+ private TaskLogsTruncater taskLogsTruncater;
+ private UserLogCleaner userLogCleaner;
+
+ private Thread monitorLogEvents = new Thread() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ monitor();
+ } catch (Exception e) {
+ LOG.warn("Exception while monitoring user log events", e);
+ }
+ }
+ }
+ };
+
+ /**
+ * Create the user log manager to manage user logs on {@link TaskTracker}.
+ *
+ * It should be explicitly started using {@link #start()} to start functioning
+ *
+ * @param conf
+ * The {@link Configuration}
+ *
+ * @throws IOException
+ */
+ public UserLogManager(Configuration conf) throws IOException {
+ taskLogsTruncater = new TaskLogsTruncater(conf);
+ userLogCleaner = new UserLogCleaner(this, conf);
+ monitorLogEvents.setDaemon(true);
+ }
+
+ /**
+ * Starts managing the logs
+ */
+ public void start() {
+ userLogCleaner.start();
+ monitorLogEvents.start();
+ }
+
+ protected void monitor() throws Exception {
+ UserLogEvent event = userLogEvents.take();
+ processEvent(event);
+ }
+
+ protected void processEvent(UserLogEvent event) throws IOException {
+ if (event instanceof JvmFinishedEvent) {
+ doJvmFinishedAction((JvmFinishedEvent) event);
+ } else if (event instanceof JobCompletedEvent) {
+ doJobCompletedAction((JobCompletedEvent) event);
+ } else if (event instanceof JobStartedEvent) {
+ doJobStartedAction((JobStartedEvent) event);
+ } else if (event instanceof DeleteJobEvent) {
+ doDeleteJobAction((DeleteJobEvent) event);
+ } else {
+ LOG.warn("Unknown event " + event.getEventType() + " passed.");
+ }
+ }
+
+ /**
+ * Called during TaskTracker restart/re-init.
+ *
+ * @param conf
+ * TT's conf
+ * @throws IOException
+ */
+ public void clearOldUserLogs(Configuration conf)
+ throws IOException {
+ userLogCleaner.clearOldUserLogs(conf);
+ }
+
+ private void doJvmFinishedAction(JvmFinishedEvent event) {
+ taskLogsTruncater.truncateLogs(event.getJvmInfo());
+ }
+
+ private void doJobStartedAction(JobStartedEvent event) {
+ userLogCleaner.unmarkJobFromLogDeletion(event.getJobID());
+ }
+
+ private void doJobCompletedAction(JobCompletedEvent event) {
+ userLogCleaner.markJobLogsForDeletion(event.getJobCompletionTime(), event
+ .getRetainHours(), event.getJobID());
+ }
+
+ private void doDeleteJobAction(DeleteJobEvent event) throws IOException {
+ userLogCleaner.deleteJobLogs(event.getJobID());
+ }
+
+ /**
+ * Add the {@link UserLogEvent} for processing.
+ *
+ * @param event
+ */
+ public void addLogEvent(UserLogEvent event) {
+ userLogEvents.add(event);
+ }
+
+ /**
+ * Get {@link UserLogCleaner}.
+ *
+ * This method is called only from unit tests.
+ *
+ * @return {@link UserLogCleaner}
+ */
+ public UserLogCleaner getUserLogCleaner() {
+ return userLogCleaner;
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 04:03:23 2011
@@ -77,9 +77,6 @@ public class TestLocalizationWithLinuxTa
String user = ugi.split(",")[0];
jobConf.setUser(user);
File jobConfFile = uploadJobConf(jobConf);
- // Create the task again to change the job-user
- task =
- new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
task.setConf(jobConf);
task.setUser(user);
taskTrackerUserName = UserGroupInformation.getLoginUser()
@@ -208,6 +205,11 @@ public class TestLocalizationWithLinuxTa
checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
+
+ // check job user-log directory permissions
+ File jobLogDir = TaskLog.getJobDir(jobId);
+ checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
@Override
|