Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: (qmail 12710 invoked from network); 25 Jan 2011 02:32:05 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 25 Jan 2011 02:32:05 -0000 Received: (qmail 79616 invoked by uid 500); 25 Jan 2011 02:32:05 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 79578 invoked by uid 500); 25 Jan 2011 02:32:04 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@incubator.apache.org Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 79569 invoked by uid 99); 25 Jan 2011 02:32:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jan 2011 02:32:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED,T_FRT_PROFILE2,T_FRT_PROFIT2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jan 2011 02:32:02 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3B40C2388906; Tue, 25 Jan 2011 02:31:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1063107 - in /incubator/hama/trunk: CHANGES.txt conf/log4j.properties src/java/org/apache/hama/bsp/TaskLog.java src/java/org/apache/hama/bsp/TaskLogAppender.java Date: Tue, 25 Jan 2011 02:31:42 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110125023142.3B40C2388906@eris.apache.org> Author: edwardyoon Date: Tue Jan 25 02:31:41 2011 New Revision: 1063107 URL: http://svn.apache.org/viewvc?rev=1063107&view=rev Log: Add task log appender and Fix log4j rootLogger Added: 
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/conf/log4j.properties Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1063107&r1=1063106&r2=1063107&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Tue Jan 25 02:31:41 2011 @@ -195,6 +195,7 @@ Trunk (unreleased changes) BUG FIXES + HAMA-350: Add task log appender and Fix log4j rootLogger (edwardyoon) HAMA-345: Add execution time calculator to Pi job (edwardyoon) HAMA-344: Task successfully finished but system re-attempt (edwardyoon) HAMA-343: Fix Maven test fails (Tommaso Teofili via edwardyoon) Modified: incubator/hama/trunk/conf/log4j.properties URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/log4j.properties?rev=1063107&r1=1063106&r2=1063107&view=diff ============================================================================== --- incubator/hama/trunk/conf/log4j.properties (original) +++ incubator/hama/trunk/conf/log4j.properties Tue Jan 25 02:31:41 2011 @@ -1,5 +1,5 @@ # Define some default values that can be overridden by system properties -hama.root.logger=INFO,console,DEBUG +hama.root.logger=INFO,console hama.log.dir=. 
hama.log.file=hama.log @@ -28,6 +28,23 @@ log4j.appender.DRFA.layout.ConversionPat # Debugging Pattern format #log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n +# +# TaskLog Appender +# + +#Default values +hama.tasklog.taskid=null +hama.tasklog.noKeepSplits=4 +hama.tasklog.totalLogFileSize=100 +hama.tasklog.purgeLogSplits=true +hama.tasklog.logsRetainHours=12 + +log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender +log4j.appender.TLA.taskId=${hama.tasklog.taskid} +log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize} + +log4j.appender.TLA.layout=org.apache.log4j.PatternLayout +log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n # # console Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java?rev=1063107&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java Tue Jan 25 02:31:41 2011 @@ -0,0 +1,323 @@ +package org.apache.hama.bsp; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hama.HamaConfiguration; + +public class TaskLog { + private static final Log LOG = LogFactory.getLog(TaskLog.class.getName()); + + private static final File LOG_DIR = new File( + System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile(); + + static { + if (!LOG_DIR.exists()) { + LOG_DIR.mkdirs(); + } + } + + public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) { + return new File(new File(LOG_DIR, taskid.toString()), 
filter.toString()); + } + + /** + * The filter for userlogs. + */ + public static enum LogName { + /** Log on the stdout of the task. */ + STDOUT("stdout"), + + /** Log on the stderr of the task. */ + STDERR("stderr"), + + /** Log on the map-reduce system logs of the task. */ + SYSLOG("syslog"), + + /** The java profiler information. */ + PROFILE("profile.out"), + + /** Log the debug script's stdout */ + DEBUGOUT("debugout"); + + private String prefix; + + private LogName(String prefix) { + this.prefix = prefix; + } + + @Override + public String toString() { + return prefix; + } + } + + private static class TaskLogsPurgeFilter implements FileFilter { + long purgeTimeStamp; + + TaskLogsPurgeFilter(long purgeTimeStamp) { + this.purgeTimeStamp = purgeTimeStamp; + } + + public boolean accept(File file) { + LOG.debug("PurgeFilter - file: " + file + ", mtime: " + + file.lastModified() + ", purge: " + purgeTimeStamp); + return file.lastModified() < purgeTimeStamp; + } + } + + /** + * Purge old user logs. + * + * @throws IOException + */ + public static synchronized void cleanup(int logsRetainHours) + throws IOException { + // Purge logs of tasks on this tasktracker if their + // mtime has exceeded "mapred.task.log.retain" hours + long purgeTimeStamp = System.currentTimeMillis() + - (logsRetainHours * 60L * 60 * 1000); + File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter( + purgeTimeStamp)); + if (oldTaskLogs != null) { + for (int i = 0; i < oldTaskLogs.length; ++i) { + FileUtil.fullyDelete(oldTaskLogs[i]); + } + } + } + + static class Reader extends InputStream { + private long bytesRemaining; + private FileInputStream file; + + /** + * Read a log file from start to end positions. The offsets may be negative, + * in which case they are relative to the end of the file. For example, + * Reader(taskid, kind, 0, -1) is the entire file and Reader(taskid, kind, + * -4197, -1) is the last 4196 bytes. 
+ * + * @param taskid the id of the task to read the log file for + * @param kind the kind of log to read + * @param start the offset to read from (negative is relative to tail) + * @param end the offset to read upto (negative is relative to tail) + * @throws IOException + */ + public Reader(TaskAttemptID taskid, LogName kind, long start, long end) + throws IOException { + // find the right log file + File filename = getTaskLogFile(taskid, kind); + // calculate the start and stop + long size = filename.length(); + if (start < 0) { + start += size + 1; + } + if (end < 0) { + end += size + 1; + } + start = Math.max(0, Math.min(start, size)); + end = Math.max(0, Math.min(end, size)); + bytesRemaining = end - start; + file = new FileInputStream(filename); + // skip upto start + long pos = 0; + while (pos < start) { + long result = file.skip(start - pos); + if (result < 0) { + bytesRemaining = 0; + break; + } + pos += result; + } + } + + @Override + public int read() throws IOException { + int result = -1; + if (bytesRemaining > 0) { + bytesRemaining -= 1; + result = file.read(); + } + return result; + } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + length = (int) Math.min(length, bytesRemaining); + int bytes = file.read(buffer, offset, length); + if (bytes > 0) { + bytesRemaining -= bytes; + } + return bytes; + } + + @Override + public int available() throws IOException { + return (int) Math.min(bytesRemaining, file.available()); + } + + @Override + public void close() throws IOException { + file.close(); + } + } + + private static final String bashCommand = "bash"; + private static final String tailCommand = "tail"; + + /** + * Get the desired maximum length of task's logs. 
+ * + * @param conf the job to look in + * @return the number of bytes to cap the log files at + */ + public static long getTaskLogLength(HamaConfiguration conf) { + return conf.getLong("mapred.userlog.limit.kb", 100) * 1024; + } + + /** + * Wrap a command in a shell to capture stdout and stderr to files. If the + * tailLength is 0, the entire output will be saved. + * + * @param cmd The command and the arguments that should be run + * @param stdoutFilename The filename that stdout should be saved to + * @param stderrFilename The filename that stderr should be saved to + * @param tailLength The length of the tail to be saved. + * @return the modified command that should be run + */ + public static List<String> captureOutAndError(List<String> cmd, + File stdoutFilename, File stderrFilename, long tailLength) + throws IOException { + return captureOutAndError(null, cmd, stdoutFilename, stderrFilename, + tailLength); + } + + /** + * Wrap a command in a shell to capture stdout and stderr to files. Setup + * commands such as setting memory limit can be passed which will be executed + * before exec. If the tailLength is 0, the entire output will be saved. + * + * @param setup The setup commands for the execed process. + * @param cmd The command and the arguments that should be run + * @param stdoutFilename The filename that stdout should be saved to + * @param stderrFilename The filename that stderr should be saved to + * @param tailLength The length of the tail to be saved. 
+ * @return the modified command that should be run + */ + public static List<String> captureOutAndError(List<String> setup, + List<String> cmd, File stdoutFilename, File stderrFilename, + long tailLength) throws IOException { + String stdout = FileUtil.makeShellPath(stdoutFilename); + String stderr = FileUtil.makeShellPath(stderrFilename); + List<String> result = new ArrayList<String>(3); + result.add(bashCommand); + result.add("-c"); + StringBuffer mergedCmd = new StringBuffer(); + if (setup != null && setup.size() > 0) { + mergedCmd.append(addCommand(setup, false)); + mergedCmd.append(";"); + } + if (tailLength > 0) { + mergedCmd.append("("); + } else { + mergedCmd.append("exec "); + } + mergedCmd.append(addCommand(cmd, true)); + mergedCmd.append(" < /dev/null "); + if (tailLength > 0) { + mergedCmd.append(" | "); + mergedCmd.append(tailCommand); + mergedCmd.append(" -c "); + mergedCmd.append(tailLength); + mergedCmd.append(" >> "); + mergedCmd.append(stdout); + mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | "); + mergedCmd.append(tailCommand); + mergedCmd.append(" -c "); + mergedCmd.append(tailLength); + mergedCmd.append(" >> "); + mergedCmd.append(stderr); + mergedCmd.append(" ; exit $PIPESTATUS"); + } else { + mergedCmd.append(" 1>> "); + mergedCmd.append(stdout); + mergedCmd.append(" 2>> "); + mergedCmd.append(stderr); + } + result.add(mergedCmd.toString()); + return result; + } + + /** + * Add quotes to each of the command strings and return as a single string + * + * @param cmd The command to be quoted + * @param isExecutable makes shell path if the first argument is executable + * @return returns The quoted string. + * @throws IOException + */ + public static String addCommand(List<String> cmd, boolean isExecutable) + throws IOException { + StringBuffer command = new StringBuffer(); + for (String s : cmd) { + command.append('\''); + if (isExecutable) { + // the executable name needs to be expressed as a shell path for the + // shell to find it. 
+ command.append(FileUtil.makeShellPath(new File(s))); + isExecutable = false; + } else { + command.append(s); + } + command.append('\''); + command.append(" "); + } + return command.toString(); + } + + /** + * Wrap a command in a shell to capture debug script's stdout and stderr to + * debugout. + * + * @param cmd The command and the arguments that should be run + * @param debugoutFilename The filename that stdout and stderr should be saved + * to. + * @return the modified command that should be run + * @throws IOException + */ + public static List<String> captureDebugOut(List<String> cmd, + File debugoutFilename) throws IOException { + String debugout = FileUtil.makeShellPath(debugoutFilename); + List<String> result = new ArrayList<String>(3); + result.add(bashCommand); + result.add("-c"); + StringBuffer mergedCmd = new StringBuffer(); + mergedCmd.append("exec "); + boolean isExecutable = true; + for (String s : cmd) { + if (isExecutable) { + // the executable name needs to be expressed as a shell path for the + // shell to find it. 
+ mergedCmd.append(FileUtil.makeShellPath(new File(s))); + isExecutable = false; + } else { + mergedCmd.append(s); + } + mergedCmd.append(" "); + } + mergedCmd.append(" < /dev/null "); + mergedCmd.append(" >"); + mergedCmd.append(debugout); + mergedCmd.append(" 2>&1 "); + result.add(mergedCmd.toString()); + return result; + } + +} // TaskLog Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java?rev=1063107&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java Tue Jan 25 02:31:41 2011 @@ -0,0 +1,75 @@ +package org.apache.hama.bsp; + +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.log4j.FileAppender; +import org.apache.log4j.spi.LoggingEvent; + +public class TaskLogAppender extends FileAppender { + private String taskId; // taskId should be managed as String rather than + // TaskID object + // so that log4j can configure it from the configuration(log4j.properties). + private int maxEvents; + private Queue<LoggingEvent> tail = null; + + @Override + public void activateOptions() { + synchronized (this) { + if (maxEvents > 0) { + tail = new LinkedList<LoggingEvent>(); + } + setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId), + TaskLog.LogName.SYSLOG).toString()); + setAppend(true); + super.activateOptions(); + } + } + + @Override + public void append(LoggingEvent event) { + synchronized (this) { + if (tail == null) { + super.append(event); + } else { + if (tail.size() >= maxEvents) { + tail.remove(); + } + tail.add(event); + } + } + } + + @Override + public synchronized void close() { + if (tail != null) { + for (LoggingEvent event : tail) { + super.append(event); + } + } + super.close(); + } + + /** + * Getter/Setter methods for log4j. 
+ */ + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + private static final int EVENT_SIZE = 100; + + public long getTotalLogFileSize() { + return maxEvents * EVENT_SIZE; + } + + public void setTotalLogFileSize(long logSize) { + maxEvents = (int) logSize / EVENT_SIZE; + } + +}