From: vinodkv@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r943124 - in /hadoop/mapreduce/branches/branch-0.21: ./ conf/ src/c++/task-controller/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/pipes/ src/test/mapred/o...
Date: Tue, 11 May 2010 14:32:51 -0000

Author: vinodkv
Date: Tue May 11 14:32:50 2010
New Revision: 943124

URL: http://svn.apache.org/viewvc?rev=943124&view=rev
Log:
MAPREDUCE-1607. Merge revision 943039 from trunk.
Added:
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java
      - copied unchanged from r943039, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java
Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties
    hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Tue May 11 14:32:50 2010
@@ -52,6 +52,9 @@ Release 0.21.0 - Unreleased
     MAPREDUCE-1644. Remove Sqoop contrib module. (Aaron Kimball via cdouglas)
 
+    MAPREDUCE-1607. Task controller may not set permissions for a
+    task cleanup attempt's log directory (Amareshwari Sriramadasu via vinodkv)
+
   NEW FEATURES
 
     MAPREDUCE-706. Support for FIFO pools in the fair scheduler.
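[Editorial illustration, not part of the commit.] The heart of this change is that a task cleanup attempt now gets its own log directory, named by appending ".cleanup" to the attempt id, so the task controller can set permissions on it separately. A minimal sketch of that naming scheme, mirroring the new TaskLog.getAttemptDir(TaskAttemptID, boolean) in the diff below; the class, the jobDir value, and the attempt id are illustrative stand-ins:

    import java.io.File;

    // Hedged sketch of the cleanup-suffix directory naming this patch
    // introduces. Only the ".cleanup" suffix rule comes from the diff.
    public class AttemptDirSketch {
      static File attemptDir(File jobDir, String attemptId, boolean isCleanup) {
        // Cleanup attempts log into "<attempt-id>.cleanup" instead of
        // sharing the regular attempt directory.
        String cleanupSuffix = isCleanup ? ".cleanup" : "";
        return new File(jobDir, attemptId + cleanupSuffix);
      }

      public static void main(String[] args) {
        File jobDir = new File("/var/log/hadoop/userlogs/job_201005111432_0001");
        String attempt = "attempt_201005111432_0001_m_000000_0";
        System.out.println(attemptDir(jobDir, attempt, false));
        System.out.println(attemptDir(jobDir, attempt, true));
      }
    }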
Modified: hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties (original)
+++ hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties Tue May 11 14:32:50 2010
@@ -55,6 +55,7 @@ log4j.appender.console.layout.Conversion
 
 #Default values
 hadoop.tasklog.taskid=null
+hadoop.tasklog.iscleanup=false
 hadoop.tasklog.noKeepSplits=4
 hadoop.tasklog.totalLogFileSize=100
 hadoop.tasklog.purgeLogSplits=true
@@ -62,6 +63,7 @@ hadoop.tasklog.logsRetainHours=12
 
 log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
 log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
 log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
 
 log4j.appender.TLA.layout=org.apache.log4j.PatternLayout

Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/task-controller.c?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c Tue May 11 14:32:50 2010
@@ -583,10 +583,6 @@ int prepare_task_logs(const char *log_di
   if (stat(task_log_dir, &filestat) != 0) {
     if (errno == ENOENT) {
       // See TaskRunner.java to see that an absent log-dir doesn't fail the task.
-      // Task log dir for cleanup tasks will not have the name
-      // task-attempt-id.cleanup. Instead a log.index.cleanup is created in
-      // task-attempt log dir. We check if the directory exists and return if
-      // it doesn't. So the following will work for cleanup attempts too.
 #ifdef DEBUG
       fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
           task_log_dir);

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Tue May 11 14:32:50 2010
@@ -135,6 +135,7 @@ public class TestStreamingTaskLog {
     long logSize = USERLOG_LIMIT_KB * 1024;
     assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
                && env.contains("-Dhadoop.tasklog.taskid=attempt_")
-               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize));
+               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize)
+               && env.contains("-Dhadoop.tasklog.iscleanup=false"));
   }
 }

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java Tue May 11 14:32:50 2010
@@ -69,8 +69,9 @@ class Child {
     int port = Integer.parseInt(args[1]);
     final InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    final String logLocation = args[3];
     final int SLEEP_LONGER_COUNT = 5;
-    int jvmIdInt = Integer.parseInt(args[3]);
+    int jvmIdInt = Integer.parseInt(args[4]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),
         firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
 
@@ -109,7 +110,7 @@ class Child {
       public void run() {
         try {
           if (taskid != null) {
-            TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+            TaskLog.syncLogs(logLocation, taskid, isCleanup);
           }
         } catch (Throwable throwable) {
         }
@@ -123,7 +124,7 @@ class Child {
           try {
             Thread.sleep(5000);
             if (taskid != null) {
-              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+              TaskLog.syncLogs(logLocation, taskid, isCleanup);
            }
          } catch (InterruptedException ie) {
          } catch (IOException iee) {
@@ -175,7 +176,7 @@ class Child {
 
         //create the index file so that the log files
         //are viewable immediately
-        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+        TaskLog.syncLogs(logLocation, taskid, isCleanup);
 
         final JobConf job = new JobConf(task.getJobFile());
         // set the jobTokenFile into task
@@ -215,7 +216,7 @@ class Child {
               FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
               taskFinal.run(job, umbilical);             // run the task
             } finally {
-              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+              TaskLog.syncLogs(logLocation, taskid, isCleanup);
             }
 
             return null;
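[Editorial illustration, not part of the commit.] The Child.java hunks above shift the child JVM's positional arguments: the attempt's log location is now passed as args[3], pushing the JVM id to args[4]. A hedged sketch of the argv contract implied by the diff; the class and printed output are illustrative, only the argument ordering comes from the patch:

    // Sketch of the child JVM argument order after this change:
    // host, port, attempt id, log location, jvm id.
    public class ChildArgsSketch {
      public static void main(String[] args) {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        String attemptId = args[2];
        String logLocation = args[3];              // new in this change
        int jvmIdInt = Integer.parseInt(args[4]);  // was args[3] before
        System.out.printf("connect %s:%d, attempt=%s, logs=%s, jvm=%d%n",
            host, port, attemptId, logLocation, jvmIdInt);
      }
    }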
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java Tue May 11 14:32:50 2010
@@ -72,19 +72,21 @@ public class TaskLog {
     }
   }
 
-  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(getAttemptDir(taskid.toString()), filter.toString());
-  }
-
-  public static File getRealTaskLogFileLocation(TaskAttemptID taskid,
+  public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
       LogName filter) {
+    return new File(getAttemptDir(taskid, isCleanup), filter.toString());
+  }
+
+  static File getRealTaskLogFileLocation(TaskAttemptID taskid,
+      boolean isCleanup, LogName filter) {
     LogFileDetail l;
     try {
-      l = getTaskLogFileDetail(taskid, filter);
+      l = getLogFileDetail(taskid, filter, isCleanup);
     } catch (IOException ie) {
       LOG.error("getTaskLogFileDetail threw an exception " + ie);
       return null;
     }
-    return new File(getAttemptDir(l.location), filter.toString());
+    return new File(l.location, filter.toString());
   }
   private static class LogFileDetail {
     final static String LOCATION = "LOG_DIR:";
@@ -93,16 +95,11 @@ public class TaskLog {
     long length;
   }
 
-  private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
-      LogName filter) throws IOException {
-    return getLogFileDetail(taskid, filter, false);
-  }
-
   private static LogFileDetail getLogFileDetail(TaskAttemptID taskid,
                                                 LogName filter,
                                                 boolean isCleanup)
   throws IOException {
-    File indexFile = getIndexFile(taskid.toString(), isCleanup);
+    File indexFile = getIndexFile(taskid, isCleanup);
     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
     //the format of the index file is
     //LOG_DIR:
@@ -120,7 +117,7 @@ public class TaskLog {
     //to be associated with each task attempt since jvm reuse is disabled
     //when profiling/debugging is enabled
     if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
-      l.length = new File(getAttemptDir(l.location), filter.toString()).length();
+      l.length = new File(l.location, filter.toString()).length();
       l.start = 0;
       fis.close();
       return l;
@@ -141,39 +138,32 @@ public class TaskLog {
     return l;
   }
 
-  private static File getTmpIndexFile(String taskid) {
-    return new File(getAttemptDir(taskid), "log.tmp");
-  }
-
-  public static File getIndexFile(String taskid) {
-    return getIndexFile(taskid, false);
+  private static File getTmpIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+    return new File(getAttemptDir(taskid, isCleanup), "log.tmp");
   }
-
-  public static File getIndexFile(String taskid, boolean isCleanup) {
-    if (isCleanup) {
-      return new File(getAttemptDir(taskid), "log.index.cleanup");
-    } else {
-      return new File(getAttemptDir(taskid), "log.index");
-    }
+
+  static File getIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+    return new File(getAttemptDir(taskid, isCleanup), "log.index");
   }
 
   static String getBaseLogDir() {
     return System.getProperty("hadoop.log.dir");
   }
 
-  static File getAttemptDir(String taskid) {
-    return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
-        taskid);
+  static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
+    String cleanupSuffix = isCleanup ? ".cleanup" : "";
+    return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
   }
 
   private static long prevOutLength;
   private static long prevErrLength;
   private static long prevLogLength;
 
-  private static void writeToIndexFile(TaskAttemptID firstTaskid,
+  private static void writeToIndexFile(String logLocation,
                                        boolean isCleanup)
   throws IOException {
     // To ensure atomicity of updates to index file, write to temporary index
     // file first and then rename.
-    File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
+    File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
 
     BufferedOutputStream bos =
       new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
@@ -183,20 +173,23 @@ public class TaskLog {
     //STDOUT:
     //STDERR:
     //SYSLOG:
-    dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
-        LogName.STDOUT.toString()+":");
-    dos.writeBytes(Long.toString(prevOutLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
-        .length() - prevOutLength)+"\n"+LogName.STDERR+":");
-    dos.writeBytes(Long.toString(prevErrLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
-        .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
-    dos.writeBytes(Long.toString(prevLogLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
-        .length() - prevLogLength)+"\n");
+    dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+        + LogName.STDOUT.toString() + ":");
+    dos.writeBytes(Long.toString(prevOutLength) + " ");
+    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
+        .toString()).length() - prevOutLength)
+        + "\n" + LogName.STDERR + ":");
+    dos.writeBytes(Long.toString(prevErrLength) + " ");
+    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
+        .toString()).length() - prevErrLength)
+        + "\n" + LogName.SYSLOG.toString() + ":");
+    dos.writeBytes(Long.toString(prevLogLength) + " ");
+    dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
+        .toString()).length() - prevLogLength)
+        + "\n");
     dos.close();
 
-    File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+    File indexFile = getIndexFile(currentTaskid, isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
 
@@ -205,21 +198,15 @@ public class TaskLog {
     }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
-  private static void resetPrevLengths(TaskAttemptID firstTaskid) {
-    prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
-    prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
-    prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
+  private static void resetPrevLengths(String logLocation) {
+    prevOutLength = new File(logLocation, LogName.STDOUT.toString()).length();
+    prevErrLength = new File(logLocation, LogName.STDERR.toString()).length();
+    prevLogLength = new File(logLocation, LogName.SYSLOG.toString()).length();
   }
 
   private volatile static TaskAttemptID currentTaskid = null;
 
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid,
-                                           TaskAttemptID taskid)
-  throws IOException {
-    syncLogs(firstTaskid, taskid, false);
-  }
-
   @SuppressWarnings("unchecked")
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid,
+  public synchronized static void syncLogs(String logLocation,
                                            TaskAttemptID taskid,
                                            boolean isCleanup)
   throws IOException {
@@ -238,9 +225,9 @@ public class TaskLog {
     }
     if (currentTaskid != taskid) {
       currentTaskid = taskid;
-      resetPrevLengths(firstTaskid);
+      resetPrevLengths(logLocation);
     }
-    writeToIndexFile(firstTaskid, isCleanup);
+    writeToIndexFile(logLocation, isCleanup);
   }
 
   /**
@@ -274,15 +261,10 @@ public class TaskLog {
     }
   }
 
-  static class Reader extends InputStream {
+  public static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
 
-    public Reader(TaskAttemptID taskid, LogName kind,
-                  long start, long end) throws IOException {
-      this(taskid, kind, start, end, false);
-    }
-
     /**
      * Read a log file from start to end positions. The offsets may be negative,
     * in which case they are relative to the end of the file. For example,
@@ -312,8 +294,7 @@ public class TaskLog {
       start += fileDetail.start;
       end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(new File(getAttemptDir(fileDetail.location),
-          kind.toString()));
+      file = new FileInputStream(new File(fileDetail.location, kind.toString()));
       // skip upto start
       long pos = 0;
       while (pos < start) {

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Tue May 11 14:32:50 2010
@@ -34,6 +34,7 @@ public class TaskLogAppender extends Fil
   //so that log4j can configure it from the configuration(log4j.properties).
   private int maxEvents;
   private Queue<LoggingEvent> tail = null;
+  private boolean isCleanup;
 
   @Override
   public void activateOptions() {
@@ -41,8 +42,8 @@ public class TaskLogAppender extends Fil
     if (maxEvents > 0) {
       tail = new LinkedList<LoggingEvent>();
     }
-    setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
-        TaskLog.LogName.SYSLOG).toString());
+    setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+        isCleanup, TaskLog.LogName.SYSLOG).toString());
     setAppend(true);
     super.activateOptions();
   }
@@ -98,4 +99,22 @@ public class TaskLogAppender extends Fil
     maxEvents = (int) logSize / EVENT_SIZE;
   }
 
+  /**
+   * Set whether the task is a cleanup attempt or not.
+   *
+   * @param isCleanup
+   *          true if the task is cleanup attempt, false otherwise.
+   */
+  public void setIsCleanup(boolean isCleanup) {
+    this.isCleanup = isCleanup;
+  }
+
+  /**
+   * Get whether task is cleanup attempt or not.
+   *
+   * @return true if the task is cleanup attempt, false otherwise.
+   */
+  public boolean getIsCleanup() {
+    return isCleanup;
+  }
 }
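[Editorial illustration, not part of the commit.] The log.index file written by writeToIndexFile above has a simple line format: a "LOG_DIR:<dir>" header followed by one "<LOGNAME>:<start> <length>" entry per stream. A hedged sketch of a reader for that format, inferred from the writer in the TaskLog.java diff; the class and method layout are illustrative:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    // Parses the log.index layout that writeToIndexFile emits:
    //   LOG_DIR:<dir>
    //   STDOUT:<start> <length>   (and likewise STDERR, SYSLOG)
    public class LogIndexSketch {
      public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new FileReader(args[0]));
        String dir = in.readLine().substring("LOG_DIR:".length());
        System.out.println("logs live in " + dir);
        String line;
        while ((line = in.readLine()) != null) {
          int colon = line.indexOf(':');
          String[] startAndLen = line.substring(colon + 1).split(" ");
          System.out.printf("%s: start=%s length=%s%n",
              line.substring(0, colon), startAndLen[0], startAndLen[1]);
        }
        in.close();
      }
    }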
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Tue May 11 14:32:50 2010
@@ -43,8 +43,9 @@ import org.apache.hadoop.util.StringUtil
 public class TaskLogServlet extends HttpServlet {
   private static final long serialVersionUID = -6615764817774487321L;
 
-  private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
-    File f = TaskLog.getTaskLogFile(taskId, type);
+  private boolean haveTaskLog(TaskAttemptID taskId, boolean isCleanup,
+      TaskLog.LogName type) {
+    File f = TaskLog.getTaskLogFile(taskId, isCleanup, type);
     return f.canRead();
   }
 
@@ -145,9 +146,10 @@ public class TaskLogServlet extends Http
    * viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
    * cluster).
    */
-  static Configuration getConfFromJobACLsFile(String attemptIdStr) {
+  static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
+      boolean isCleanup) {
     Path jobAclsFilePath = new Path(
-        TaskLog.getAttemptDir(attemptIdStr).toString(), TaskRunner.jobACLsFile);
+        TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
     Configuration conf = null;
     if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
       conf = new Configuration(false);
@@ -176,38 +178,6 @@ public class TaskLogServlet extends Http
       return;
     }
 
-    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
-    if (!TaskLog.getAttemptDir(attemptIdStr).exists()) {
-      response.sendError(HttpServletResponse.SC_GONE,
-          "Task log directory for task " + attemptId +
-          " does not exist. May be cleaned up by Task Tracker, if older logs.");
-      return;
-    }
-
-    // get user name who is accessing
-    String user = request.getRemoteUser();
-    if (user != null) {
-      ServletContext context = getServletContext();
-      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
-          "task.tracker");
-      // get jobACLConf from ACLs file
-      Configuration jobACLConf = getConfFromJobACLsFile(attemptIdStr);
-      // Ignore authorization if job-acls.xml is not found
-      if (jobACLConf != null) {
-        JobID jobId = attemptId.getJobID();
-
-        try {
-          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
-              taskTracker);
-        } catch (AccessControlException e) {
-          String errMsg = "User " + user + " failed to view tasklogs of job " +
-              jobId + "!\n\n" + e.getMessage();
-          response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
-          return;
-        }
-      }
-    }
-
     String logFilter = request.getParameter("filter");
     if (logFilter != null) {
       try {
@@ -240,6 +210,38 @@ public class TaskLogServlet extends Http
       isCleanup = Boolean.valueOf(sCleanup);
     }
 
+    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
+    if (!TaskLog.getAttemptDir(attemptId, isCleanup).exists()) {
+      response.sendError(HttpServletResponse.SC_GONE,
+          "Task log directory for task " + attemptId +
+          " does not exist. May be cleaned up by Task Tracker, if older logs.");
+      return;
+    }
+
+    // get user name who is accessing
+    String user = request.getRemoteUser();
+    if (user != null) {
+      ServletContext context = getServletContext();
+      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
+          "task.tracker");
+      // get jobACLConf from ACLs file
+      Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+      // Ignore authorization if job-acls.xml is not found
+      if (jobACLConf != null) {
+        JobID jobId = attemptId.getJobID();
+
+        try {
+          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
+              taskTracker);
+        } catch (AccessControlException e) {
+          String errMsg = "User " + user + " failed to view tasklogs of job " +
+              jobId + "!\n\n" + e.getMessage();
+          response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
+          return;
+        }
+      }
+    }
+
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("\n" +
@@ -254,11 +256,11 @@ public class TaskLogServlet extends Http
                    TaskLog.LogName.STDERR, isCleanup);
       printTaskLog(response, out, attemptId, start, end, plainText,
                    TaskLog.LogName.SYSLOG, isCleanup);
-      if (haveTaskLog(attemptId, TaskLog.LogName.DEBUGOUT)) {
+      if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.DEBUGOUT)) {
         printTaskLog(response, out, attemptId, start, end, plainText,
                      TaskLog.LogName.DEBUGOUT, isCleanup);
       }
-      if (haveTaskLog(attemptId, TaskLog.LogName.PROFILE)) {
+      if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.PROFILE)) {
         printTaskLog(response, out, attemptId, start, end, plainText,
                      TaskLog.LogName.PROFILE, isCleanup);
       }

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue May 11 14:32:50 2010
@@ -216,7 +216,7 @@ abstract class TaskRunner extends Thread
       List<String> setup = getVMSetupCmd();
 
       // Set up the redirection of the task's stdout and stderr streams
-      File[] logFiles = prepareLogFiles(taskid);
+      File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
       File stdout = logFiles[0];
       File stderr = logFiles[1];
       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
@@ -286,13 +286,17 @@ abstract class TaskRunner extends Thread
    * Prepare the log files for the task
    *
    * @param taskid
+   * @param isCleanup
    * @return an array of files. The first file is stdout, the second is stderr.
   * @throws IOException
   */
-  File[] prepareLogFiles(TaskAttemptID taskid) throws IOException {
+  File[] prepareLogFiles(TaskAttemptID taskid, boolean isCleanup)
+      throws IOException {
     File[] logFiles = new File[2];
-    logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
-    logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    logFiles[0] = TaskLog.getTaskLogFile(taskid, isCleanup,
+        TaskLog.LogName.STDOUT);
+    logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup,
+        TaskLog.LogName.STDERR);
     File logDir = logFiles[0].getParentFile();
     boolean b = logDir.mkdirs();
     if (!b) {
@@ -456,17 +460,13 @@ abstract class TaskRunner extends Thread
     vargs.add(classPath);
 
     // Setup the log4j prop
-    vargs.add("-Dhadoop.log.dir=" +
-        new File(System.getProperty("hadoop.log.dir")
-        ).getAbsolutePath());
-    vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
-    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
-    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+    setupLog4jProperties(vargs, taskid, logSize);
 
     if (conf.getProfileEnabled()) {
       if (conf.getProfileTaskRange(t.isMapTask()
                                    ).isIncluded(t.getPartition())) {
-        File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+        File prof = TaskLog.getTaskLogFile(taskid, t.isTaskCleanupTask(),
+            TaskLog.LogName.PROFILE);
         vargs.add(String.format(conf.getProfileParams(), prof.toString()));
       }
     }
@@ -478,9 +478,21 @@ abstract class TaskRunner extends Thread
     vargs.add(address.getAddress().getHostAddress()); 
     vargs.add(Integer.toString(address.getPort())); 
     vargs.add(taskid.toString());                      // pass task identifier
+    // pass task log location
+    vargs.add(TaskLog.getAttemptDir(taskid, t.isTaskCleanupTask()).toString());
 
     return vargs;
   }
 
+  private void setupLog4jProperties(Vector<String> vargs, TaskAttemptID taskid,
+      long logSize) {
+    vargs.add("-Dhadoop.log.dir=" +
+        new File(System.getProperty("hadoop.log.dir")).getAbsolutePath());
+    vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
+    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+    vargs.add("-Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask());
+    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+  }
+
   /**
    * @param taskid
    * @param workDir
@@ -563,6 +575,7 @@ abstract class TaskRunner extends Thread
       hadoopClientOpts = hadoopClientOpts + " ";
     }
     hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                       + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask()
                       + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
     env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
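[Editorial illustration, not part of the commit.] setupLog4jProperties above threads the cleanup flag to the child JVM as a -D system property, which is how TaskLogAppender later learns it. A hedged sketch of reading those properties back on the child side; the property names come from the diff, the surrounding class is illustrative:

    // Recovers the flags that setupLog4jProperties passes via -D options.
    public class TaskLogPropsSketch {
      public static void main(String[] args) {
        String taskId = System.getProperty("hadoop.tasklog.taskid", "null");
        // Boolean.getBoolean reads the named system property directly
        boolean isCleanup = Boolean.getBoolean("hadoop.tasklog.iscleanup");
        long totalLogFileSize =
            Long.getLong("hadoop.tasklog.totalLogFileSize", 100L);
        System.out.printf("task=%s cleanup=%b limit=%d%n",
            taskId, isCleanup, totalLogFileSize);
      }
    }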
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue May 11 14:32:50 2010
@@ -2744,17 +2744,17 @@ public class TaskTracker
         String jobConf = task.getJobFile();
         try {
           // get task's stdout file
-          taskStdout = FileUtil.makeShellPath(
-              TaskLog.getRealTaskLogFileLocation
-                  (task.getTaskID(), TaskLog.LogName.STDOUT));
-          // get task's stderr file
-          taskStderr = FileUtil.makeShellPath(
-              TaskLog.getRealTaskLogFileLocation
-                  (task.getTaskID(), TaskLog.LogName.STDERR));
-          // get task's syslog file
-          taskSyslog = FileUtil.makeShellPath(
-              TaskLog.getRealTaskLogFileLocation
-                  (task.getTaskID(), TaskLog.LogName.SYSLOG));
+          taskStdout = FileUtil
+              .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+                  task.isTaskCleanupTask(), TaskLog.LogName.STDOUT));
+          // get task's stderr file
+          taskStderr = FileUtil
+              .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+                  task.isTaskCleanupTask(), TaskLog.LogName.STDERR));
+          // get task's syslog file
+          taskSyslog = FileUtil
+              .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+                  task.isTaskCleanupTask(), TaskLog.LogName.SYSLOG));
         } catch(IOException e){
           LOG.warn("Exception finding task's stdout/err/syslog files");
         }
@@ -2773,8 +2773,8 @@ public class TaskTracker
               StringUtils.stringifyException(e));
         }
         // Build the command
-        File stdout = TaskLog.getRealTaskLogFileLocation(
-            task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+        File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task
+            .isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT);
         // add pipes program as argument if it exists.
         String program ="";
         String executable = Submitter.getExecutable(localJobConf);

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java Tue May 11 14:32:50 2010
@@ -97,10 +97,12 @@ class Application {

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Tue May 11 14:32:50 2010
@@ -48,6 +49,8 @@ public class TestTaskFail extends TestCa
                     OutputCollector output,
                     Reporter reporter) throws IOException {
       System.err.println(taskLog);
+      assertFalse(Boolean.getBoolean(System
+          .getProperty("hadoop.tasklog.iscleanup")));
       if (taskid.endsWith("_0")) {
         throw new IOException();
       } else if (taskid.endsWith("_1")) {
@@ -61,6 +64,15 @@ public class TestTaskFail extends TestCa
   static class CommitterWithLogs extends FileOutputCommitter {
     public void abortTask(TaskAttemptContext context) throws IOException {
       System.err.println(cleanupLog);
+      String attemptId = System.getProperty("hadoop.tasklog.taskid");
+      assertNotNull(attemptId);
+      if (attemptId.endsWith("_0")) {
+        assertFalse(Boolean.getBoolean(System
+            .getProperty("hadoop.tasklog.iscleanup")));
+      } else {
+        assertTrue(Boolean.getBoolean(System
+            .getProperty("hadoop.tasklog.iscleanup")));
+      }
       super.abortTask(context);
     }
   }
@@ -113,65 +125,44 @@ public class TestTaskFail extends TestCa
   }
 
   private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
-                               TaskStatus ts, boolean isCleanup)
+                               TaskStatus ts, boolean isCleanup, JobTracker jt)
   throws IOException {
     assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
     assertTrue(ts != null);
     assertEquals(TaskStatus.State.FAILED, ts.getRunState());
     // validate tasklogs for task attempt
-    String log = readTaskLog(
+    String log = MapReduceTestUtil.readTaskLog(
         TaskLog.LogName.STDERR, attemptId, false);
     assertTrue(log.contains(taskLog));
+    // access the logs from web url
+    TaskTrackerStatus ttStatus = jt.getTaskTracker(
+        tip.machineWhereTaskRan(attemptId)).getStatus();
+    String tasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
+        String.valueOf(ttStatus.getHttpPort()), attemptId.toString())
+        + "&filter=STDERR";
+    assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+        .getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
     if (!isCleanup) {
       // validate task logs: tasklog should contain both task logs
       // and cleanup logs
       assertTrue(log.contains(cleanupLog));
     } else {
       // validate tasklogs for cleanup attempt
-      log = readTaskLog(
+      log = MapReduceTestUtil.readTaskLog(
           TaskLog.LogName.STDERR, attemptId, true);
       assertTrue(log.contains(cleanupLog));
+      // access the cleanup attempt's logs from web url
+      ttStatus = jt.getTaskTracker(tip.machineWhereCleanupRan(attemptId))
+          .getStatus();
+      String cleanupTasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
+          String.valueOf(ttStatus.getHttpPort()), attemptId.toString())
+          + "&filter=STDERR&cleanup=true";
+      assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+          .getHttpStatusCode(cleanupTasklogUrl, tip.getUser(), "GET"));
     }
   }
 
-  /**
-   * Reads tasklog and returns it as string after trimming it.
-   * @param filter Task log filter; can be STDOUT, STDERR,
-   *               SYSLOG, DEBUGOUT, DEBUGERR
-   * @param taskId The task id for which the log has to collected
-   * @param isCleanup whether the task is a cleanup attempt or not.
-   * @return task log as string
-   * @throws IOException
-   */
-  private String readTaskLog(TaskLog.LogName filter,
-                             TaskAttemptID taskId,
-                             boolean isCleanup)
-  throws IOException {
-    // string buffer to store task log
-    StringBuffer result = new StringBuffer();
-    int res;
-
-    // reads the whole tasklog into inputstream
-    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
-    // construct string log from inputstream.
-    byte[] b = new byte[65536];
-    while (true) {
-      res = taskLogReader.read(b);
-      if (res > 0) {
-        result.append(new String(b));
-      } else {
-        break;
-      }
-    }
-    taskLogReader.close();
-
-    // trim the string and return it
-    String str = result.toString();
-    str = str.trim();
-    return str;
-  }
-
-  private void validateJob(RunningJob job, MiniMRCluster mr)
+  private void validateJob(RunningJob job, JobTracker jt)
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
 
@@ -181,23 +172,21 @@ public class TestTaskFail extends TestCa
     // fails with an exception
     TaskAttemptID attemptId = new TaskAttemptID(new TaskID(jobId,
        TaskType.MAP, 0), 0);
-    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
-        getTip(attemptId.getTaskID());
-    TaskStatus ts =
-        mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, false);
+    TaskInProgress tip = jt.getTip(attemptId.getTaskID());
+    TaskStatus ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, false, jt);
 
     attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
     // this should be cleanup attempt since the second attempt fails
     // with System.exit
-    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true);
+    ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true, jt);
 
     attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
     // this should be cleanup attempt since the third attempt fails
     // with Error
-    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true);
+    ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true, jt);
   }
 
   public void testWithDFS() throws IOException {
@@ -211,6 +200,7 @@ public class TestTaskFail extends TestCa
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
       final Path inDir = new Path("./input");
       final Path outDir = new Path("./output");
       String input = "The quick brown fox\nhas many silly\nred fox sox\n";
@@ -222,18 +212,18 @@ public class TestTaskFail extends TestCa
       jobConf.setOutputCommitter(CommitterWithLogs.class);
       RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
       // launch job with fail tasks and fail-cleanups
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }
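[Editorial illustration, not part of the commit.] The test above fetches a cleanup attempt's logs over HTTP by appending "&filter=STDERR&cleanup=true" to the tasklog servlet URL. A hedged standalone sketch of the same access pattern; the host, port, and attempt id are made-up values, while TaskLogServlet.getTaskLogUrl and the query suffix come from the diff:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    import org.apache.hadoop.mapred.TaskLogServlet;

    // Fetches a cleanup attempt's stderr through the tasklog servlet.
    public class FetchCleanupLogSketch {
      public static void main(String[] args) throws IOException {
        String urlStr = TaskLogServlet.getTaskLogUrl("localhost", "50060",
            "attempt_201005111432_0001_m_000000_1")
            + "&filter=STDERR&cleanup=true";
        HttpURLConnection conn =
            (HttpURLConnection) new URL(urlStr).openConnection();
        if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
          BufferedReader in = new BufferedReader(
              new InputStreamReader(conn.getInputStream()));
          for (String line; (line = in.readLine()) != null;) {
            System.out.println(line);
          }
          in.close();
        }
        conn.disconnect();
      }
    }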
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue May 11 14:32:50 2010
@@ -576,7 +576,8 @@ public class TestTaskTrackerLocalization
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
+    attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
+        task.isTaskCleanupTask());
 
     // Make sure the task-conf file is created
     Path localTaskFile =
@@ -626,7 +627,7 @@ public class TestTaskTrackerLocalization
         .getPath(), "tmp").exists());
 
     // Make sure that the logs are setup properly
-    File logDir = TaskLog.getAttemptDir(taskId.toString());
+    File logDir = TaskLog.getAttemptDir(taskId, task.isTaskCleanupTask());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
     checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -655,8 +656,8 @@ public class TestTaskTrackerLocalization
         taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
 
     // Validate the contents of jobACLsFile(both user name and job-view-acls)
-    Configuration jobACLsConf =
-        TaskLogServlet.getConfFromJobACLsFile(task.getTaskID().toString());
+    Configuration jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(task
+        .getTaskID(), task.isTaskCleanupTask());
     assertTrue(jobACLsConf.get(MRJobConfig.USER_NAME).equals(
         localizedJobConf.getUser()));
     assertTrue(jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB).
@@ -695,7 +696,7 @@ public class TestTaskTrackerLocalization
    *       $taskid/work
    * Also see createFileAndSetPermissions for details
    */
-  void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+  void validateRemoveTaskFiles(boolean needCleanup, boolean jvmReuse,
       TaskInProgress tip) throws IOException {
     // create files and set permissions 555. Verify if task controller sets
     // the permissions for TT to delete the taskDir or workDir
@@ -733,7 +734,6 @@ public class TestTaskTrackerLocalization
       // now try to delete the work dir and verify that there are no stale paths
       JvmManager.deleteWorkDir(tracker, task);
     }
-    tracker.removeJobFiles(task.getUser(), jobId);
 
     assertTrue("Some task files are not deleted!! Number of stale paths is "
         + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
@@ -743,42 +743,51 @@ public class TestTaskTrackerLocalization
    * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
   */
-  public void testTaskCleanup()
+  public void testTaskFilesRemoval()
      throws Exception {
    if (!canRun()) {
      return;
    }
-    testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+    testTaskFilesRemoval(false, false);// no needCleanup; no jvmReuse
  }
 
  /**
   * Validates if task cleanup is done properly for a task that is not succeeded
   * @throws IOException
   */
-  public void testFailedTaskCleanup()
+  public void testFailedTaskFilesRemoval()
      throws Exception {
    if (!canRun()) {
      return;
    }
-    testTaskCleanup(true, false);// needCleanup; no jvmReuse
+    testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
+
+    // initialize a cleanupAttempt for the task.
+    task.setTaskCleanupTask();
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+
+    // verify the cleanup of cleanup attempt.
+    testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
  }
 
  /**
   * Validates if task cleanup is done properly for a succeeded task
   * @throws IOException
   */
-  public void testTaskCleanupWithJvmUse()
+  public void testTaskFilesRemovalWithJvmUse()
      throws Exception {
    if (!canRun()) {
      return;
    }
-    testTaskCleanup(false, true);// no needCleanup; jvmReuse
+    testTaskFilesRemoval(false, true);// no needCleanup; jvmReuse
  }
 
  /**
   * Validates if task cleanup is done properly
   */
-  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+  private void testTaskFilesRemoval(boolean needCleanup, boolean jvmReuse)
      throws Exception {
    // Localize job and localize task.
    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
@@ -792,19 +801,7 @@ public class TestTaskTrackerLocalization
 
    // create files and set permissions 555. Verify if task controller sets
    // the permissions for TT to delete the task dir or work dir properly
-    validateRemoveFiles(needCleanup, jvmReuse, tip);
-
-    // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
-    // there.
-    for (String localDir : localDirs) {
-      Path userDir =
-        new Path(localDir, TaskTracker.getUserDir(task.getUser()));
-      assertTrue("User directory " + userDir + " is not present!!",
-          tracker.getLocalFileSystem().exists(userDir));
-    }
-
-    // Test userlogs cleanup.
-    verifyUserLogsCleanup();
+    validateRemoveTaskFiles(needCleanup, jvmReuse, tip);
  }
 
  /**
@@ -812,7 +809,7 @@ public class TestTaskTrackerLocalization
   *
   * @throws IOException
   */
-  private void verifyUserLogsCleanup()
+  private void verifyUserLogsRemoval()
      throws IOException {
    // verify user logs cleanup
    File jobUserLogDir = TaskLog.getJobDir(jobId);
@@ -832,7 +829,7 @@ public class TestTaskTrackerLocalization
   *   - create files with no write permissions to TT under job-work-dir
   *   - create files with no write permissions to TT under task-work-dir
   */
-  public void testJobCleanup() throws IOException, InterruptedException {
+  public void testJobFilesRemoval() throws IOException, InterruptedException {
    if (!canRun()) {
      return;
    }
@@ -899,6 +896,17 @@ public class TestTaskTrackerLocalization
    }
    assertFalse("Job " + task.getJobID() + " work dir exists after cleanup",
        jWorkDirExists);
+    // Test userlogs cleanup.
+    verifyUserLogsRemoval();
+
+    // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
+    // there.
+    for (String localDir : localDirs) {
+      Path userDir =
+        new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+      assertTrue("User directory " + userDir + " is not present!!",
+          tracker.getLocalFileSystem().exists(userDir));
+    }
  }
 
  /**
@@ -983,4 +991,30 @@ public class TestTaskTrackerLocalization
    checkTaskLocalization();
  }
 
+  /**
+   * Localizes a cleanup task and validates permissions.
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void testCleanupTaskLocalization() throws IOException,
+      InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+
+    task.setTaskCleanupTask();
+    // register task
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+
+    // localize the job.
+    RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    checkJobLocalization();
+
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+
+  }
 }

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java Tue May 11 14:32:50 2010
@@ -341,8 +341,9 @@ public class TestWebUIAuthorization exte
 
     // delete job-acls.xml file from the task log dir of attempt and verify
     // if unauthorized users can view task logs of attempt.
-    Path jobACLsFilePath = new Path(
-        TaskLog.getAttemptDir(attempt.toString()).toString(),
+    File attemptLogDir = TaskLog.getAttemptDir(
+        org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
+    Path jobACLsFilePath = new Path(attemptLogDir.toString(),
         TaskRunner.jobACLsFile);
     new File(jobACLsFilePath.toUri().getPath()).delete();
     assertEquals("Incorrect return code for " + unauthorizedUser,
@@ -354,7 +355,7 @@ public class TestWebUIAuthorization exte
 
     // delete the whole task log dir of attempt and verify that we get
     // correct response code (i.e. HTTP_GONE) when task logs are accessed.
-    FileUtil.fullyDelete(TaskLog.getAttemptDir(attempt.toString()));
+    FileUtil.fullyDelete(attemptLogDir);
     assertEquals("Incorrect return code for " + jobSubmitter,
         HttpURLConnection.HTTP_GONE,
         getHttpStatusCode(stdoutURL, jobSubmitter, "GET"));

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Tue May 11 14:32:50 2010
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.text.NumberFormat;
 import java.util.ArrayList;
@@ -43,7 +44,10 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.Reader;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -415,4 +419,44 @@ public class MapReduceTestUtil {
     return result.toString();
   }
 
+  /**
+   * Reads tasklog and returns it as string after trimming it.
+   *
+   * @param filter
+   *          Task log filter; can be STDOUT, STDERR, SYSLOG, DEBUGOUT, PROFILE
+   * @param taskId
+   *          The task id for which the log has to collected
+   * @param isCleanup
+   *          whether the task is a cleanup attempt or not.
+   * @return task log as string
+   * @throws IOException
+   */
+  public static String readTaskLog(TaskLog.LogName filter,
+      org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
+      throws IOException {
+    // string buffer to store task log
+    StringBuffer result = new StringBuffer();
+    int res;
+
+    // reads the whole tasklog into inputstream
+    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
+        isCleanup);
+    // construct string log from inputstream.
+    byte[] b = new byte[65536];
+    while (true) {
+      res = taskLogReader.read(b);
+      if (res > 0) {
+        result.append(new String(b));
+      } else {
+        break;
+      }
+    }
+    taskLogReader.close();
+
+    // trim the string and return it
+    String str = result.toString();
+    str = str.trim();
+    return str;
+  }
+
 }
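[Editorial illustration, not part of the commit.] Since readTaskLog is now public and cleanup-aware, other tests can call it directly. A hedged usage sketch; the attempt id is a made-up example:

    import java.io.IOException;

    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapred.TaskLog;
    import org.apache.hadoop.mapreduce.MapReduceTestUtil;

    // Reads a cleanup attempt's stderr via the helper added above.
    public class ReadTaskLogSketch {
      public static void main(String[] args) throws IOException {
        TaskAttemptID attempt =
            TaskAttemptID.forName("attempt_201005111432_0001_m_000000_1");
        // isCleanup = true selects the <attempt-id>.cleanup log directory
        String log = MapReduceTestUtil.readTaskLog(
            TaskLog.LogName.STDERR, attempt, true);
        System.out.println(log);
      }
    }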