Subject: svn commit: r792299 - in /hadoop/hdfs/trunk: ./ src/test/hdfs-with-mr/org/apache/hadoop/fs/ src/test/hdfs-with-mr/org/apache/hadoop/test/
Date: Wed, 08 Jul 2009 20:33:04 -0000
To: hdfs-commits@hadoop.apache.org
From: shv@apache.org

Author: shv
Date: Wed Jul 8 20:33:04 2009
New Revision: 792299

URL: http://svn.apache.org/viewvc?rev=792299&view=rev
Log:
HDFS-459. Introduce Job History Log Analyzer. Contributed by Konstantin Shvachko.

Added:
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java   (with props)
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java   (with props)
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Jul 8 20:33:04 2009
@@ -9,6 +9,8 @@
     HDFS-447. Add LDAP lookup to hdfsproxy. (Zhiyong Zhang via cdouglas)
 
+    HDFS-459. Introduce Job History Log Analyzer. (shv)
+
   IMPROVEMENTS
 
     HDFS-381.
Remove blocks from DataNode maps when corresponding file Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java?rev=792299&r1=792298&r2=792299&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java Wed Jul 8 20:33:04 2009 @@ -23,10 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.*; /** * Reducer that accumulates values based on their type. @@ -44,6 +41,7 @@ * * */ +@SuppressWarnings("deprecation") public class AccumulatingReducer extends MapReduceBase implements Reducer { static final String VALUE_TYPE_LONG = "l:"; @@ -74,10 +72,10 @@ // concatenate strings if (field.startsWith(VALUE_TYPE_STRING)) { - String sSum = ""; + StringBuffer sSum = new StringBuffer(); while (values.hasNext()) - sSum += values.next().toString() + ";"; - output.collect(key, new Text(sSum)); + sSum.append(values.next().toString()).append(";"); + output.collect(key, new Text(sSum.toString())); reporter.setStatus("finished " + field + " ::host = " + hostName); return; } Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java?rev=792299&r1=792298&r2=792299&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java Wed Jul 8 20:33:04 2009 @@ -162,16 +162,15 @@ *
  • i/o rate squared
  • * */ - private abstract static class IOStatMapper extends IOMapperBase { + private abstract static class IOStatMapper extends IOMapperBase { IOStatMapper() { - super(fsConfig); } void collectStats(OutputCollector output, String name, long execTime, - Object objSize) throws IOException { - long totalSize = ((Long)objSize).longValue(); + Long objSize) throws IOException { + long totalSize = objSize.longValue(); float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); LOG.info("Number of bytes processed = " + totalSize); LOG.info("Exec time = " + execTime); @@ -201,7 +200,7 @@ buffer[i] = (byte)('0' + i % 50); } - public Object doIO(Reporter reporter, + public Long doIO(Reporter reporter, String name, long totalSize ) throws IOException { @@ -302,7 +301,7 @@ super(); } - public Object doIO(Reporter reporter, + public Long doIO(Reporter reporter, String name, long totalSize ) throws IOException { Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java?rev=792299&r1=792298&r2=792299&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java Wed Jul 8 20:33:04 2009 @@ -142,10 +142,9 @@ /** * DistributedFSCheck mapper class. */ - public static class DistributedFSCheckMapper extends IOMapperBase { + public static class DistributedFSCheckMapper extends IOMapperBase { public DistributedFSCheckMapper() { - super(fsConfig); } public Object doIO(Reporter reporter, Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java?rev=792299&r1=792298&r2=792299&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java Wed Jul 8 20:33:04 2009 @@ -19,14 +19,10 @@ import java.io.IOException; import java.net.InetAddress; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.*; /** * Base mapper class for IO operations. @@ -37,7 +33,8 @@ * statistics data to be collected by subsequent reducers. 
* */ -public abstract class IOMapperBase extends Configured +@SuppressWarnings("deprecation") +public abstract class IOMapperBase extends Configured implements Mapper { protected byte[] buffer; @@ -45,8 +42,11 @@ protected FileSystem fs; protected String hostName; - public IOMapperBase(Configuration conf) { - super(conf); + public IOMapperBase() { + } + + public void configure(JobConf conf) { + setConf(conf); try { fs = FileSystem.get(conf); } catch (Exception e) { @@ -61,10 +61,6 @@ } } - public void configure(JobConf job) { - setConf(job); - } - public void close() throws IOException { } @@ -78,7 +74,7 @@ * {@link #collectStats(OutputCollector,String,long,Object)} * @throws IOException */ - abstract Object doIO(Reporter reporter, + abstract T doIO(Reporter reporter, String name, long value) throws IOException; @@ -94,7 +90,7 @@ abstract void collectStats(OutputCollector output, String name, long execTime, - Object doIOReturnValue) throws IOException; + T doIOReturnValue) throws IOException; /** * Map file name and offset into statistical data. @@ -119,7 +115,7 @@ reporter.setStatus("starting " + name + " ::host = " + hostName); long tStart = System.currentTimeMillis(); - Object statValue = doIO(reporter, name, longValue); + T statValue = doIO(reporter, name, longValue); long tEnd = System.currentTimeMillis(); long execTime = tEnd - tStart; collectStats(output, name, execTime, statValue); Added: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java?rev=792299&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java (added) +++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java Wed Jul 8 20:33:04 2009 @@ -0,0 +1,1129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +/** + * Job History Log Analyzer. + * + *

    Description.

+ * This is a tool for parsing and analyzing history logs of map-reduce jobs. + * History logs contain information about execution of jobs, tasks, and + * attempts. This tool focuses on submission, launch, start, and finish times, + * as well as the success or failure of jobs, tasks, and attempts. + *

+ * The analyzer calculates per hour slot utilization for the cluster + * as follows. + * For each task attempt it divides the time segment from the start of the + * attempt t_S to the finish t_F into whole hours + * [t_0, ..., t_n], where t_0 <= t_S + * is the maximal whole hour preceding t_S, and + * t_n >= t_F is the minimal whole hour after t_F. + * Thus, [t_0, ..., t_n] covers the segment + * [t_S, t_F], during which the attempt was executed. + * Each interval [t_i, t_(i+1)] fully contained in + * [t_S, t_F] corresponds to exactly one slot on + * a map-reduce cluster (usually MAP-slot or REDUCE-slot). + * If interval [t_i, t_(i+1)] only intersects with + * [t_S, t_F] then we say that the task + * attempt used just a fraction of the slot during this hour. + * The fraction equals the size of the intersection. + * Let slotTime(A, h) denote the number of slots calculated that way for a + * specific attempt A during hour h. + * The tool then sums all slots for all attempts for every hour. + * The result is the slot hour utilization of the cluster: + * slotTime(h) = SUM_A slotTime(A,h). + *
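For illustration (the times are hypothetical): an attempt that starts at 10:20 and finishes at 12:45 contributes 2/3 of a slot to the 10:00 hour, a full slot to the 11:00 hour, and 3/4 of a slot to the 12:00 hour, so its slotTime(A, h) values for those three hours are 0.67, 1.0, and 0.75.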

    + * Log analyzer calculates slot hours for MAP and REDUCE + * attempts separately. + *

    + * Log analyzer distinguishes between successful and failed + * attempts. Task attempt is considered successful if its own status is SUCCESS + * and the statuses of the task and the job it is a part of are also SUCCESS. + * Otherwise the task attempt is considered failed. + *

    + * Map-reduce clusters are usually configured to have a fixed number of MAP + * and REDUCE slots per node. Thus the maximal possible number of slots on + * the cluster is total_slots = total_nodes * slots_per_node. + * Effective slot hour cannot exceed total_slots for successful + * attempts. + *
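As a hypothetical example: a 100-node cluster configured with 2 MAP and 2 REDUCE slots per node has total_slots = 200 of each type, so the successful slot-hour utilization reported for any single hour cannot exceed 200 for MAP or for REDUCE.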

    + * Pending time characterizes the wait time of attempts. + * It is calculated similarly to the slot hour except that the wait interval + * starts when the job is submitted and ends when an attempt starts execution. + * In addition to that pending time also includes intervals between attempts + * of the same task if it was re-executed. + *

+ * History log analyzer calculates two pending time variations. The first is based + * on the job submission time as described above; the second starts the wait interval + * when the job is launched rather than submitted. + * + *
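The hourly split described above is what JHLAMapper.collectPerIntervalStats() in this patch computes for both slot time and pending time. A minimal standalone sketch of the same calculation, using a hypothetical class name and hypothetical start/finish times:

    public class SlotHourSketch {
      static final long HOUR = 60 * 60 * 1000L;

      public static void main(String[] args) {
        // Hypothetical attempt running from 10:20 to 12:45 of some day (times in ms).
        long start  = 10 * HOUR + 20 * 60 * 1000L;
        long finish = 12 * HOUR + 45 * 60 * 1000L;
        long interval = (start / HOUR) * HOUR;   // t_0: whole hour preceding start
        long cur = start;
        while (cur < finish) {
          long next = interval + HOUR;           // t_(i+1)
          // fraction of one slot used during this hour
          double fraction = (Math.min(finish, next) - cur) / (double) HOUR;
          System.out.println("hour starting at " + interval + " ms: " + fraction);
          cur = interval = next;                 // move on to the next hour
        }
        // Prints 0.666..., 1.0, 0.75 -- matching the worked example above.
      }
    }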

    Input.

    + * The following input parameters can be specified in the argument string + * to the job log analyzer: + *
      + *
• -historyDir inputDir specifies the location of the directory + * where the analyzer will look for job history log files.
    • + *
    • -resFile resultFile the name of the result file.
    • + *
    • -usersIncluded | -usersExcluded userList slot utilization and + * pending time can be calculated for all or for all but the specified users. + *
      + * userList is a comma or semicolon separated list of users.
    • + *
    • -gzip is used if history log files are compressed. + * Only {@link GzipCodec} is currently supported.
    • + *
• -jobDelimiter pattern one can concatenate original log files into + * larger file(s) with the specified delimiter; the delimiter marks where the log of + * one job ends and the next one begins.
      + * pattern is a java regular expression + * {@link java.util.regex.Pattern}, which should match only the log delimiters. + *
      + * E.g. pattern ".!!FILE=.*!!" matches delimiters, which contain + * the original history log file names in the following form:
      + * "$!!FILE=my.job.tracker.com_myJobId_user_wordcount.log!!"
    • + *
    • -clean cleans up default directories used by the analyzer.
    • + *
    • -test test one file locally and exit; + * does not require map-reduce.
    • + *
    • -help print usage.
    • + *
    + * + *
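Putting these options together, an invocation might look like the following; the launcher form and the argument values are illustrative assumptions (the tool can also be started through its main class directly):

    bin/hadoop org.apache.hadoop.fs.JHLogAnalyzer \
        -historyDir history \
        -resFile jhla_result.txt \
        -usersExcluded hadoop,hadoop2 \
        -gzip \
        -jobDelimiter ".!!FILE=.*!!"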

    Output.

    + * The output file is formatted as a tab separated table consisting of four + * columns: SERIES, PERIOD, TYPE, SLOT_HOUR. + *
      + *
    • SERIES one of the four statistical series;
    • + *
    • PERIOD the start of the time interval in the following format: + * "yyyy-mm-dd hh:mm:ss";
    • + *
    • TYPE the slot type, e.g. MAP or REDUCE;
    • + *
    • SLOT_HOUR the value of the slot usage during this + * time interval.
    • + *
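For illustration, a small fragment of such a result file might look as follows (the file itself is tab separated and aligned here only for readability; the values are hypothetical, and the series names are the stat names with the "l:" type prefix stripped):

    SERIES                   PERIOD               TYPE    SLOT_HOUR
    allSlotTime              2009-03-25 06:00:00  MAP     1.5
    allSlotTime              2009-03-25 06:00:00  REDUCE  0.5
    submitPendingSlotTime    2009-03-25 06:00:00  MAP     0.25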
    + */ +@SuppressWarnings("deprecation") +public class JHLogAnalyzer { + private static final Log LOG = LogFactory.getLog(JHLogAnalyzer.class); + // Constants + private static final String JHLA_ROOT_DIR = + System.getProperty("test.build.data", "stats/JHLA"); + private static final Path INPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_input"); + private static final String BASE_INPUT_FILE_NAME = "jhla_in_"; + private static final Path OUTPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_output"); + private static final Path RESULT_FILE = + new Path(JHLA_ROOT_DIR, "jhla_result.txt"); + private static final Path DEFAULT_HISTORY_DIR = new Path("history"); + + private static final int DEFAULT_TIME_INTERVAL_MSEC = 1000*60*60; // 1 hour + + static{ + Configuration.addDefaultResource("hdfs-default.xml"); + Configuration.addDefaultResource("hdfs-site.xml"); + } + + static enum StatSeries { + STAT_ALL_SLOT_TIME + (AccumulatingReducer.VALUE_TYPE_LONG + "allSlotTime"), + STAT_FAILED_SLOT_TIME + (AccumulatingReducer.VALUE_TYPE_LONG + "failedSlotTime"), + STAT_SUBMIT_PENDING_SLOT_TIME + (AccumulatingReducer.VALUE_TYPE_LONG + "submitPendingSlotTime"), + STAT_LAUNCHED_PENDING_SLOT_TIME + (AccumulatingReducer.VALUE_TYPE_LONG + "launchedPendingSlotTime"); + + private String statName = null; + private StatSeries(String name) {this.statName = name;} + public String toString() {return statName;} + } + + private static class FileCreateDaemon extends Thread { + private static final int NUM_CREATE_THREADS = 10; + private static volatile int numFinishedThreads; + private static volatile int numRunningThreads; + private static FileStatus[] jhLogFiles; + + FileSystem fs; + int start; + int end; + + FileCreateDaemon(FileSystem fs, int start, int end) { + this.fs = fs; + this.start = start; + this.end = end; + } + + public void run() { + try { + for(int i=start; i < end; i++) { + String name = getFileName(i); + Path controlFile = new Path(INPUT_DIR, "in_file_" + name); + SequenceFile.Writer writer = null; + try { + writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile, + Text.class, LongWritable.class, + CompressionType.NONE); + String logFile = jhLogFiles[i].getPath().toString(); + writer.append(new Text(logFile), new LongWritable(0)); + } catch(Exception e) { + throw new IOException(e); + } finally { + if (writer != null) + writer.close(); + writer = null; + } + } + } catch(IOException ex) { + LOG.error("FileCreateDaemon failed.", ex); + } + numFinishedThreads++; + } + + private static void createControlFile(FileSystem fs, Path jhLogDir + ) throws IOException { + fs.delete(INPUT_DIR, true); + jhLogFiles = fs.listStatus(jhLogDir); + + numFinishedThreads = 0; + try { + int start = 0; + int step = jhLogFiles.length / NUM_CREATE_THREADS + + ((jhLogFiles.length % NUM_CREATE_THREADS) > 0 ? 
1 : 0); + FileCreateDaemon[] daemons = new FileCreateDaemon[NUM_CREATE_THREADS]; + numRunningThreads = 0; + for(int tIdx=0; tIdx < NUM_CREATE_THREADS && start < jhLogFiles.length; tIdx++) { + int end = Math.min(start + step, jhLogFiles.length); + daemons[tIdx] = new FileCreateDaemon(fs, start, end); + start += step; + numRunningThreads++; + } + for(int tIdx=0; tIdx < numRunningThreads; tIdx++) { + daemons[tIdx].start(); + } + } finally { + int prevValue = 0; + while(numFinishedThreads < numRunningThreads) { + if(prevValue < numFinishedThreads) { + LOG.info("Finished " + numFinishedThreads + " threads out of " + numRunningThreads); + prevValue = numFinishedThreads; + } + try {Thread.sleep(500);} catch (InterruptedException e) {} + } + } + } + } + + private static void createControlFile(FileSystem fs, Path jhLogDir + ) throws IOException { + LOG.info("creating control file: JH log dir = " + jhLogDir); + FileCreateDaemon.createControlFile(fs, jhLogDir); + LOG.info("created control file: JH log dir = " + jhLogDir); + } + + private static String getFileName(int fIdx) { + return BASE_INPUT_FILE_NAME + Integer.toString(fIdx); + } + + /** + * If keyVal is of the form KEY="VALUE", then this will return [KEY, VALUE] + */ + private static String [] getKeyValue(String t) throws IOException { + String[] keyVal = t.split("=\"*|\""); + return keyVal; + } + + /** + * JobHistory log record. + */ + private static class JobHistoryLog { + String JOBID; + String JOB_STATUS; + long SUBMIT_TIME; + long LAUNCH_TIME; + long FINISH_TIME; + long TOTAL_MAPS; + long TOTAL_REDUCES; + long FINISHED_MAPS; + long FINISHED_REDUCES; + String USER; + Map tasks; + + boolean isSuccessful() { + return (JOB_STATUS != null) && JOB_STATUS.equals("SUCCESS"); + } + + void parseLine(String line) throws IOException { + StringTokenizer tokens = new StringTokenizer(line); + if(!tokens.hasMoreTokens()) + return; + String what = tokens.nextToken(); + // Line should start with one of the following: + // Job, Task, MapAttempt, ReduceAttempt + if(what.equals("Job")) + updateJob(tokens); + else if(what.equals("Task")) + updateTask(tokens); + else if(what.indexOf("Attempt") >= 0) + updateTaskAttempt(tokens); + } + + private void updateJob(StringTokenizer tokens) throws IOException { + while(tokens.hasMoreTokens()) { + String t = tokens.nextToken(); + String[] keyVal = getKeyValue(t); + if(keyVal.length < 2) continue; + + if(keyVal[0].equals("JOBID")) { + if(JOBID == null) + JOBID = new String(keyVal[1]); + else if(!JOBID.equals(keyVal[1])) { + LOG.error("Incorrect JOBID: " + + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) + + " expect " + JOBID); + return; + } + } + else if(keyVal[0].equals("JOB_STATUS")) + JOB_STATUS = new String(keyVal[1]); + else if(keyVal[0].equals("SUBMIT_TIME")) + SUBMIT_TIME = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("LAUNCH_TIME")) + LAUNCH_TIME = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("FINISH_TIME")) + FINISH_TIME = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("TOTAL_MAPS")) + TOTAL_MAPS = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("TOTAL_REDUCES")) + TOTAL_REDUCES = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("FINISHED_MAPS")) + FINISHED_MAPS = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("FINISHED_REDUCES")) + FINISHED_REDUCES = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("USER")) + USER = new String(keyVal[1]); + } + } + + private void updateTask(StringTokenizer tokens) throws IOException { + // unpack + TaskHistoryLog task = 
new TaskHistoryLog().parse(tokens); + if(task.TASKID == null) { + LOG.error("TASKID = NULL for job " + JOBID); + return; + } + // update or insert + if(tasks == null) + tasks = new HashMap((int)(TOTAL_MAPS + TOTAL_REDUCES)); + TaskHistoryLog existing = tasks.get(task.TASKID); + if(existing == null) + tasks.put(task.TASKID, task); + else + existing.updateWith(task); + } + + private void updateTaskAttempt(StringTokenizer tokens) throws IOException { + // unpack + TaskAttemptHistoryLog attempt = new TaskAttemptHistoryLog(); + String taskID = attempt.parse(tokens); + if(taskID == null) return; + if(tasks == null) + tasks = new HashMap((int)(TOTAL_MAPS + TOTAL_REDUCES)); + TaskHistoryLog existing = tasks.get(taskID); + if(existing == null) { + existing = new TaskHistoryLog(taskID); + tasks.put(taskID, existing); + } + existing.updateWith(attempt); + } + } + + /** + * TaskHistory log record. + */ + private static class TaskHistoryLog { + String TASKID; + String TASK_TYPE; // MAP, REDUCE, SETUP, CLEANUP + String TASK_STATUS; + long START_TIME; + long FINISH_TIME; + Map attempts; + + TaskHistoryLog() {} + + TaskHistoryLog(String taskID) { + TASKID = taskID; + } + + boolean isSuccessful() { + return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS"); + } + + TaskHistoryLog parse(StringTokenizer tokens) throws IOException { + while(tokens.hasMoreTokens()) { + String t = tokens.nextToken(); + String[] keyVal = getKeyValue(t); + if(keyVal.length < 2) continue; + + if(keyVal[0].equals("TASKID")) { + if(TASKID == null) + TASKID = new String(keyVal[1]); + else if(!TASKID.equals(keyVal[1])) { + LOG.error("Incorrect TASKID: " + + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) + + " expect " + TASKID); + continue; + } + } + else if(keyVal[0].equals("TASK_TYPE")) + TASK_TYPE = new String(keyVal[1]); + else if(keyVal[0].equals("TASK_STATUS")) + TASK_STATUS = new String(keyVal[1]); + else if(keyVal[0].equals("START_TIME")) + START_TIME = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("FINISH_TIME")) + FINISH_TIME = Long.parseLong(keyVal[1]); + } + return this; + } + + /** + * Update with non-null fields of the same task log record. + */ + void updateWith(TaskHistoryLog from) throws IOException { + if(TASKID == null) + TASKID = from.TASKID; + else if(!TASKID.equals(from.TASKID)) { + throw new IOException("Incorrect TASKID: " + from.TASKID + + " expect " + TASKID); + } + if(TASK_TYPE == null) + TASK_TYPE = from.TASK_TYPE; + else if(! TASK_TYPE.equals(from.TASK_TYPE)) { + LOG.error( + "Incorrect TASK_TYPE: " + from.TASK_TYPE + " expect " + TASK_TYPE + + " for task " + TASKID); + return; + } + if(from.TASK_STATUS != null) + TASK_STATUS = from.TASK_STATUS; + if(from.START_TIME > 0) + START_TIME = from.START_TIME; + if(from.FINISH_TIME > 0) + FINISH_TIME = from.FINISH_TIME; + } + + /** + * Update with non-null fields of the task attempt log record. + */ + void updateWith(TaskAttemptHistoryLog attempt) throws IOException { + if(attempt.TASK_ATTEMPT_ID == null) { + LOG.error("Unexpected TASK_ATTEMPT_ID = null for task " + TASKID); + return; + } + if(attempts == null) + attempts = new HashMap(); + TaskAttemptHistoryLog existing = attempts.get(attempt.TASK_ATTEMPT_ID); + if(existing == null) + attempts.put(attempt.TASK_ATTEMPT_ID, attempt); + else + existing.updateWith(attempt); + // update task start time + if(attempt.START_TIME > 0 && + (this.START_TIME == 0 || this.START_TIME > attempt.START_TIME)) + START_TIME = attempt.START_TIME; + } + } + + /** + * TaskAttemptHistory log record. 
+ */ + private static class TaskAttemptHistoryLog { + String TASK_ATTEMPT_ID; + String TASK_STATUS; // this task attempt status + long START_TIME; + long FINISH_TIME; + long HDFS_BYTES_READ; + long HDFS_BYTES_WRITTEN; + long FILE_BYTES_READ; + long FILE_BYTES_WRITTEN; + + /** + * Task attempt is considered successful iff all three statuses + * of the attempt, the task, and the job equal "SUCCESS". + */ + boolean isSuccessful() { + return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS"); + } + + String parse(StringTokenizer tokens) throws IOException { + String taskID = null; + while(tokens.hasMoreTokens()) { + String t = tokens.nextToken(); + String[] keyVal = getKeyValue(t); + if(keyVal.length < 2) continue; + + if(keyVal[0].equals("TASKID")) { + if(taskID == null) + taskID = new String(keyVal[1]); + else if(!taskID.equals(keyVal[1])) { + LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID); + continue; + } + } + else if(keyVal[0].equals("TASK_ATTEMPT_ID")) { + if(TASK_ATTEMPT_ID == null) + TASK_ATTEMPT_ID = new String(keyVal[1]); + else if(!TASK_ATTEMPT_ID.equals(keyVal[1])) { + LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID); + continue; + } + } + else if(keyVal[0].equals("TASK_STATUS")) + TASK_STATUS = new String(keyVal[1]); + else if(keyVal[0].equals("START_TIME")) + START_TIME = Long.parseLong(keyVal[1]); + else if(keyVal[0].equals("FINISH_TIME")) + FINISH_TIME = Long.parseLong(keyVal[1]); + } + return taskID; + } + + /** + * Update with non-null fields of the same task attempt log record. + */ + void updateWith(TaskAttemptHistoryLog from) throws IOException { + if(TASK_ATTEMPT_ID == null) + TASK_ATTEMPT_ID = from.TASK_ATTEMPT_ID; + else if(! TASK_ATTEMPT_ID.equals(from.TASK_ATTEMPT_ID)) { + throw new IOException( + "Incorrect TASK_ATTEMPT_ID: " + from.TASK_ATTEMPT_ID + + " expect " + TASK_ATTEMPT_ID); + } + if(from.TASK_STATUS != null) + TASK_STATUS = from.TASK_STATUS; + if(from.START_TIME > 0) + START_TIME = from.START_TIME; + if(from.FINISH_TIME > 0) + FINISH_TIME = from.FINISH_TIME; + if(from.HDFS_BYTES_READ > 0) + HDFS_BYTES_READ = from.HDFS_BYTES_READ; + if(from.HDFS_BYTES_WRITTEN > 0) + HDFS_BYTES_WRITTEN = from.HDFS_BYTES_WRITTEN; + if(from.FILE_BYTES_READ > 0) + FILE_BYTES_READ = from.FILE_BYTES_READ; + if(from.FILE_BYTES_WRITTEN > 0) + FILE_BYTES_WRITTEN = from.FILE_BYTES_WRITTEN; + } + } + + /** + * Key = statName*date-time*taskType + * Value = number of msec for the our + */ + private static class IntervalKey { + static final String KEY_FIELD_DELIMITER = "*"; + String statName; + String dateTime; + String taskType; + + IntervalKey(String stat, long timeMSec, String taskType) { + statName = stat; + SimpleDateFormat dateF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTime = dateF.format(new Date(timeMSec)); + this.taskType = taskType; + } + + IntervalKey(String key) { + StringTokenizer keyTokens = new StringTokenizer(key, KEY_FIELD_DELIMITER); + if(!keyTokens.hasMoreTokens()) return; + statName = keyTokens.nextToken(); + if(!keyTokens.hasMoreTokens()) return; + dateTime = keyTokens.nextToken(); + if(!keyTokens.hasMoreTokens()) return; + taskType = keyTokens.nextToken(); + } + + void setStatName(String stat) { + statName = stat; + } + + String getStringKey() { + return statName + KEY_FIELD_DELIMITER + + dateTime + KEY_FIELD_DELIMITER + + taskType; + } + + Text getTextKey() { + return new Text(getStringKey()); + } + + public String toString() { + return getStringKey(); + } + } + + /** + * Mapper class. 
+ */ + private static class JHLAMapper extends IOMapperBase { + /** + * A line pattern, which delimits history logs of different jobs, + * if multiple job logs are written in the same file. + * Null value means only one job log per file is expected. + * The pattern should be a regular expression as in + * {@link String#matches(String)}. + */ + String jobDelimiterPattern; + int maxJobDelimiterLineLength; + /** Count only these users jobs */ + Collection usersIncluded; + /** Exclude jobs of the following users */ + Collection usersExcluded; + /** Type of compression for compressed files: gzip */ + Class compressionClass; + + JHLAMapper() throws IOException { + } + + JHLAMapper(Configuration conf) throws IOException { + configure(new JobConf(conf)); + } + + public void configure(JobConf conf) { + super.configure(conf ); + usersIncluded = getUserList(conf.get("jhla.users.included", null)); + usersExcluded = getUserList(conf.get("jhla.users.excluded", null)); + String zipClassName = conf.get("jhla.compression.class", null); + try { + compressionClass = (zipClassName == null) ? null : + Class.forName(zipClassName).asSubclass(CompressionCodec.class); + } catch(Exception e) { + throw new RuntimeException("Compression codec not found: ", e); + } + jobDelimiterPattern = conf.get("jhla.job.delimiter.pattern", null); + maxJobDelimiterLineLength = conf.getInt("jhla.job.delimiter.length", 512); + } + + @Override + public void map(Text key, + LongWritable value, + OutputCollector output, + Reporter reporter) throws IOException { + String name = key.toString(); + long longValue = value.get(); + + reporter.setStatus("starting " + name + " ::host = " + hostName); + + long tStart = System.currentTimeMillis(); + parseLogFile(fs, new Path(name), longValue, output, reporter); + long tEnd = System.currentTimeMillis(); + long execTime = tEnd - tStart; + + reporter.setStatus("finished " + name + " ::host = " + hostName + + " in " + execTime/1000 + " sec."); + } + + public Object doIO(Reporter reporter, + String path, // full path of history log file + long offset // starting offset within the file + ) throws IOException { + return null; + } + + void collectStats(OutputCollector output, + String name, + long execTime, + Object jobObjects) throws IOException { + } + + private boolean isEndOfJobLog(String line) { + if(jobDelimiterPattern == null) + return false; + return line.matches(jobDelimiterPattern); + } + + /** + * Collect information about one job. 
+ * + * @param fs - file system + * @param filePath - full path of a history log file + * @param offset - starting offset in the history log file + * @throws IOException + */ + public void parseLogFile(FileSystem fs, + Path filePath, + long offset, + OutputCollector output, + Reporter reporter + ) throws IOException { + InputStream in = null; + try { + // open file & seek + FSDataInputStream stm = fs.open(filePath); + stm.seek(offset); + in = stm; + LOG.info("Opened " + filePath); + reporter.setStatus("Opened " + filePath); + // get a compression filter if specified + if(compressionClass != null) { + CompressionCodec codec = (CompressionCodec) + ReflectionUtils.newInstance(compressionClass, new Configuration()); + in = codec.createInputStream(stm); + LOG.info("Codec created " + filePath); + reporter.setStatus("Codec created " + filePath); + } + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + LOG.info("Reader created " + filePath); + // skip to the next job log start + long processed = 0L; + if(jobDelimiterPattern != null) { + for(String line = reader.readLine(); + line != null; line = reader.readLine()) { + if((stm.getPos() - processed) > 100000) { + processed = stm.getPos(); + reporter.setStatus("Processing " + filePath + " at " + processed); + } + if(isEndOfJobLog(line)) + break; + } + } + // parse lines and update job history + JobHistoryLog jh = new JobHistoryLog(); + int jobLineCount = 0; + for(String line = readLine(reader); + line != null; line = readLine(reader)) { + jobLineCount++; + if((stm.getPos() - processed) > 20000) { + processed = stm.getPos(); + long numTasks = (jh.tasks == null ? 0 : jh.tasks.size()); + String txt = "Processing " + filePath + " at " + processed + + " # tasks = " + numTasks; + reporter.setStatus(txt); + LOG.info(txt); + } + if(isEndOfJobLog(line)) { + if(jh.JOBID != null) { + LOG.info("Finished parsing job: " + jh.JOBID + + " line count = " + jobLineCount); + collectJobStats(jh, output, reporter); + LOG.info("Collected stats for job: " + jh.JOBID); + } + jh = new JobHistoryLog(); + jobLineCount = 0; + } else + jh.parseLine(line); + } + if(jh.JOBID == null) { + LOG.error("JOBID = NULL in " + filePath + " at " + processed); + return; + } + collectJobStats(jh, output, reporter); + } catch(Exception ie) { + // parsing errors can happen if the file has been truncated + LOG.error("JHLAMapper.parseLogFile", ie); + reporter.setStatus("JHLAMapper.parseLogFile failed " + + StringUtils.stringifyException(ie)); + throw new IOException("Job failed.", ie); + } finally { + if(in != null) in.close(); + } + } + + /** + * Read lines until one ends with a " ." or "\" " + */ + private StringBuffer resBuffer = new StringBuffer(); + private String readLine(BufferedReader reader) throws IOException { + resBuffer.setLength(0); + reader.mark(maxJobDelimiterLineLength); + for(String line = reader.readLine(); + line != null; line = reader.readLine()) { + if(isEndOfJobLog(line)) { + if(resBuffer.length() == 0) + resBuffer.append(line); + else + reader.reset(); + break; + } + if(resBuffer.length() == 0) + resBuffer.append(line); + else if(resBuffer.length() < 32000) + resBuffer.append(line); + if(line.endsWith(" .") || line.endsWith("\" ")) { + break; + } + reader.mark(maxJobDelimiterLineLength); + } + String result = resBuffer.length() == 0 ? null : resBuffer.toString(); + resBuffer.setLength(0); + return result; + } + + private void collectPerIntervalStats(OutputCollector output, + long start, long finish, String taskType, + StatSeries ... 
stats) throws IOException { + long curInterval = (start / DEFAULT_TIME_INTERVAL_MSEC) + * DEFAULT_TIME_INTERVAL_MSEC; + long curTime = start; + long accumTime = 0; + while(curTime < finish) { + // how much of the task time belonged to current interval + long nextInterval = curInterval + DEFAULT_TIME_INTERVAL_MSEC; + long intervalTime = ((finish < nextInterval) ? + finish : nextInterval) - curTime; + IntervalKey key = new IntervalKey("", curInterval, taskType); + Text val = new Text(String.valueOf(intervalTime)); + for(StatSeries statName : stats) { + key.setStatName(statName.toString()); + output.collect(key.getTextKey(), val); + } + + curTime = curInterval = nextInterval; + accumTime += intervalTime; + } + // For the pending stat speculative attempts may intersect. + // Only one of them is considered pending. + assert accumTime == finish - start || finish < start; + } + + private void collectJobStats(JobHistoryLog jh, + OutputCollector output, + Reporter reporter + ) throws IOException { + if(jh == null) + return; + if(jh.tasks == null) + return; + if(jh.SUBMIT_TIME <= 0) + throw new IOException("Job " + jh.JOBID + + " SUBMIT_TIME = " + jh.SUBMIT_TIME); + if(usersIncluded != null && !usersIncluded.contains(jh.USER)) + return; + if(usersExcluded != null && usersExcluded.contains(jh.USER)) + return; + + int numAttempts = 0; + long totalTime = 0; + boolean jobSuccess = jh.isSuccessful(); + long jobWaitTime = jh.LAUNCH_TIME - jh.SUBMIT_TIME; + // attemptSubmitTime is the job's SUBMIT_TIME, + // or the previous attempt FINISH_TIME for all subsequent attempts + for(TaskHistoryLog th : jh.tasks.values()) { + if(th.attempts == null) + continue; + // Task is successful iff both the task and the job are a "SUCCESS" + long attemptSubmitTime = jh.LAUNCH_TIME; + boolean taskSuccess = jobSuccess && th.isSuccessful(); + for(TaskAttemptHistoryLog tah : th.attempts.values()) { + // Task attempt is considered successful iff all three statuses + // of the attempt, the task, and the job equal "SUCCESS" + boolean success = taskSuccess && tah.isSuccessful(); + if(tah.START_TIME == 0) { + LOG.error("Start time 0 for task attempt " + tah.TASK_ATTEMPT_ID); + continue; + } + if(tah.FINISH_TIME < tah.START_TIME) { + LOG.error("Finish time " + tah.FINISH_TIME + " is less than " + + "Start time " + tah.START_TIME + " for task attempt " + + tah.TASK_ATTEMPT_ID); + tah.FINISH_TIME = tah.START_TIME; + } + + if(!"MAP".equals(th.TASK_TYPE) && !"REDUCE".equals(th.TASK_TYPE) && + !"CLEANUP".equals(th.TASK_TYPE) && !"SETUP".equals(th.TASK_TYPE)) { + LOG.error("Unexpected TASK_TYPE = " + th.TASK_TYPE + + " for attempt " + tah.TASK_ATTEMPT_ID); + } + + collectPerIntervalStats(output, + attemptSubmitTime, tah.START_TIME, th.TASK_TYPE, + StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME); + collectPerIntervalStats(output, + attemptSubmitTime - jobWaitTime, tah.START_TIME, th.TASK_TYPE, + StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME); + if(success) + collectPerIntervalStats(output, + tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE, + StatSeries.STAT_ALL_SLOT_TIME); + else + collectPerIntervalStats(output, + tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE, + StatSeries.STAT_ALL_SLOT_TIME, + StatSeries.STAT_FAILED_SLOT_TIME); + totalTime += (tah.FINISH_TIME - tah.START_TIME); + numAttempts++; + if(numAttempts % 500 == 0) { + reporter.setStatus("Processing " + jh.JOBID + " at " + numAttempts); + } + attemptSubmitTime = tah.FINISH_TIME; + } + } + LOG.info("Total Maps = " + jh.TOTAL_MAPS + + " Reduces = " + jh.TOTAL_REDUCES); + LOG.info("Finished 
Maps = " + jh.FINISHED_MAPS + + " Reduces = " + jh.FINISHED_REDUCES); + LOG.info("numAttempts = " + numAttempts); + LOG.info("totalTime = " + totalTime); + LOG.info("averageAttemptTime = " + + (numAttempts==0 ? 0 : totalTime/numAttempts)); + LOG.info("jobTotalTime = " + (jh.FINISH_TIME <= jh.SUBMIT_TIME? 0 : + jh.FINISH_TIME - jh.SUBMIT_TIME)); + } + } + + public static class JHLAPartitioner implements Partitioner { + static final int NUM_REDUCERS = 9; + + public void configure(JobConf conf) {} + + public int getPartition(Text key, Text value, int numPartitions) { + IntervalKey intKey = new IntervalKey(key.toString()); + if(intKey.statName.equals(StatSeries.STAT_ALL_SLOT_TIME.toString())) { + if(intKey.taskType.equals("MAP")) + return 0; + else if(intKey.taskType.equals("REDUCE")) + return 1; + } else if(intKey.statName.equals( + StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME.toString())) { + if(intKey.taskType.equals("MAP")) + return 2; + else if(intKey.taskType.equals("REDUCE")) + return 3; + } else if(intKey.statName.equals( + StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME.toString())) { + if(intKey.taskType.equals("MAP")) + return 4; + else if(intKey.taskType.equals("REDUCE")) + return 5; + } else if(intKey.statName.equals( + StatSeries.STAT_FAILED_SLOT_TIME.toString())) { + if(intKey.taskType.equals("MAP")) + return 6; + else if(intKey.taskType.equals("REDUCE")) + return 7; + } + return 8; + } + } + + private static void runJHLA( + Class> mapperClass, + Path outputDir, + Configuration fsConfig) throws IOException { + JobConf job = new JobConf(fsConfig, JHLogAnalyzer.class); + + job.setPartitionerClass(JHLAPartitioner.class); + + FileInputFormat.setInputPaths(job, INPUT_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(mapperClass); + job.setReducerClass(AccumulatingReducer.class); + + FileOutputFormat.setOutputPath(job, outputDir); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(JHLAPartitioner.NUM_REDUCERS); + JobClient.runJob(job); + } + + private static class LoggingCollector implements OutputCollector { + public void collect(Text key, Text value) throws IOException { + LOG.info(key + " == " + value); + } + } + + /** + * Run job history log analyser. 
+ */ + public static void main(String[] args) { + Path resFileName = RESULT_FILE; + Configuration conf = new Configuration(); + + try { + conf.setInt("test.io.file.buffer.size", 0); + Path historyDir = DEFAULT_HISTORY_DIR; + String testFile = null; + boolean cleanup = false; + + boolean initControlFiles = true; + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].equalsIgnoreCase("-historyDir")) { + historyDir = new Path(args[++i]); + } else if (args[i].equalsIgnoreCase("-resFile")) { + resFileName = new Path(args[++i]); + } else if (args[i].equalsIgnoreCase("-usersIncluded")) { + conf.set("jhla.users.included", args[++i]); + } else if (args[i].equalsIgnoreCase("-usersExcluded")) { + conf.set("jhla.users.excluded", args[++i]); + } else if (args[i].equalsIgnoreCase("-gzip")) { + conf.set("jhla.compression.class", GzipCodec.class.getCanonicalName()); + } else if (args[i].equalsIgnoreCase("-jobDelimiter")) { + conf.set("jhla.job.delimiter.pattern", args[++i]); + } else if (args[i].equalsIgnoreCase("-jobDelimiterLength")) { + conf.setInt("jhla.job.delimiter.length", Integer.parseInt(args[++i])); + } else if(args[i].equalsIgnoreCase("-noInit")) { + initControlFiles = false; + } else if(args[i].equalsIgnoreCase("-test")) { + testFile = args[++i]; + } else if(args[i].equalsIgnoreCase("-clean")) { + cleanup = true; + } else if(args[i].equalsIgnoreCase("-jobQueue")) { + conf.set("mapred.job.queue.name", args[++i]); + } else if(args[i].startsWith("-Xmx")) { + conf.set("mapred.child.java.opts", args[i]); + } else { + printUsage(); + } + } + + if(cleanup) { + cleanup(conf); + return; + } + if(testFile != null) { + LOG.info("Start JHLA test ============ "); + LocalFileSystem lfs = FileSystem.getLocal(conf); + conf.set("fs.default.name", "file:///"); + JHLAMapper map = new JHLAMapper(conf); + map.parseLogFile(lfs, new Path(testFile), 0L, + new LoggingCollector(), Reporter.NULL); + return; + } + + FileSystem fs = FileSystem.get(conf); + if(initControlFiles) + createControlFile(fs, historyDir); + long tStart = System.currentTimeMillis(); + runJHLA(JHLAMapper.class, OUTPUT_DIR, conf); + long execTime = System.currentTimeMillis() - tStart; + + analyzeResult(fs, 0, execTime, resFileName); + } catch(IOException e) { + System.err.print(StringUtils.stringifyException(e)); + System.exit(-1); + } + } + + + private static void printUsage() { + String className = JHLogAnalyzer.class.getSimpleName(); + System.err.println("Usage: " + className + + "\n\t[-historyDir inputDir] | [-resFile resultFile] |" + + "\n\t[-usersIncluded | -usersExcluded userList] |" + + "\n\t[-gzip] | [-jobDelimiter pattern] |" + + "\n\t[-help | -clean | -test testFile]"); + System.exit(-1); + } + + private static Collection getUserList(String users) { + if(users == null) + return null; + StringTokenizer tokens = new StringTokenizer(users, ",;"); + Collection userList = new ArrayList(tokens.countTokens()); + while(tokens.hasMoreTokens()) + userList.add(tokens.nextToken()); + return userList; + } + + /** + * Result is combined from all reduce output files and is written to + * RESULT_FILE in the format + * column 1: + */ + private static void analyzeResult( FileSystem fs, + int testType, + long execTime, + Path resFileName + ) throws IOException { + LOG.info("Analizing results ..."); + DataOutputStream out = null; + BufferedWriter writer = null; + try { + out = new DataOutputStream(fs.create(resFileName)); + writer = new BufferedWriter(new OutputStreamWriter(out)); + writer.write("SERIES\tPERIOD\tTYPE\tSLOT_HOUR\n"); + 
FileStatus[] reduceFiles = fs.listStatus(OUTPUT_DIR); + assert reduceFiles.length == JHLAPartitioner.NUM_REDUCERS; + for(int i = 0; i < JHLAPartitioner.NUM_REDUCERS; i++) { + DataInputStream in = null; + BufferedReader lines = null; + try { + in = fs.open(reduceFiles[i].getPath()); + lines = new BufferedReader(new InputStreamReader(in)); + + String line; + while((line = lines.readLine()) != null) { + StringTokenizer tokens = new StringTokenizer(line, "\t*"); + String attr = tokens.nextToken(); + String dateTime = tokens.nextToken(); + String taskType = tokens.nextToken(); + double val = Long.parseLong(tokens.nextToken()) / + (double)DEFAULT_TIME_INTERVAL_MSEC; + writer.write(attr.substring(2)); // skip the stat type "l:" + writer.write("\t"); + writer.write(dateTime); + writer.write("\t"); + writer.write(taskType); + writer.write("\t"); + writer.write(String.valueOf((float)val)); + writer.newLine(); + } + } finally { + if(lines != null) lines.close(); + if(in != null) in.close(); + } + } + } finally { + if(writer != null) writer.close(); + if(out != null) out.close(); + } + LOG.info("Analizing results ... done."); + } + + private static void cleanup(Configuration conf) throws IOException { + LOG.info("Cleaning up test files"); + FileSystem fs = FileSystem.get(conf); + fs.delete(new Path(JHLA_ROOT_DIR), true); + } +} Propchange: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java?rev=792299&r1=792298&r2=792299&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java Wed Jul 8 20:33:04 2009 @@ -78,7 +78,6 @@ private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; - private static Configuration fsConfig = new Configuration(); private static final long MEGA = 0x100000; private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO"); private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); @@ -86,13 +85,18 @@ private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); + static{ + Configuration.addDefaultResource("hdfs-default.xml"); + Configuration.addDefaultResource("hdfs-site.xml"); + } + /** * Run the test with default parameters. 
* * @throws Exception */ public void testIOs() throws Exception { - testIOs(10, 10); + testIOs(10, 10, new Configuration()); } /** @@ -102,21 +106,21 @@ * @param nrFiles number of files * @throws IOException */ - public static void testIOs(int fileSize, int nrFiles) + public static void testIOs(int fileSize, int nrFiles, Configuration fsConfig) throws IOException { FileSystem fs = FileSystem.get(fsConfig); - createControlFile(fs, fileSize, nrFiles); - writeTest(fs); - readTest(fs); + createControlFile(fs, fileSize, nrFiles, fsConfig); + writeTest(fs, fsConfig); + readTest(fs, fsConfig); cleanup(fs); } - private static void createControlFile( - FileSystem fs, + private static void createControlFile(FileSystem fs, int fileSize, // in MB - int nrFiles + int nrFiles, + Configuration fsConfig ) throws IOException { LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); @@ -158,16 +162,15 @@ *
  • i/o rate squared
  • * */ - private abstract static class IOStatMapper extends IOMapperBase { + private abstract static class IOStatMapper extends IOMapperBase { IOStatMapper() { - super(fsConfig); } void collectStats(OutputCollector output, String name, long execTime, - Object objSize) throws IOException { - long totalSize = ((Long)objSize).longValue(); + Long objSize) throws IOException { + long totalSize = objSize.longValue(); float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); LOG.info("Number of bytes processed = " + totalSize); LOG.info("Exec time = " + execTime); @@ -189,15 +192,14 @@ /** * Write mapper class. */ - public static class WriteMapper extends IOStatMapper { + public static class WriteMapper extends IOStatMapper { public WriteMapper() { - super(); for(int i=0; i < bufferSize; i++) buffer[i] = (byte)('0' + i % 50); } - public Object doIO(Reporter reporter, + public Long doIO(Reporter reporter, String name, long totalSize ) throws IOException { @@ -219,22 +221,24 @@ } finally { out.close(); } - return new Long(totalSize); + return Long.valueOf(totalSize); } } - private static void writeTest(FileSystem fs) - throws IOException { + private static void writeTest(FileSystem fs, Configuration fsConfig) + throws IOException { fs.delete(DATA_DIR, true); fs.delete(WRITE_DIR, true); - runIOTest(WriteMapper.class, WRITE_DIR); + runIOTest(WriteMapper.class, WRITE_DIR, fsConfig); } - private static void runIOTest( Class mapperClass, - Path outputDir - ) throws IOException { + @SuppressWarnings("deprecation") + private static void runIOTest( + Class> mapperClass, + Path outputDir, + Configuration fsConfig) throws IOException { JobConf job = new JobConf(fsConfig, TestDFSIO.class); FileInputFormat.setInputPaths(job, CONTROL_DIR); @@ -253,13 +257,12 @@ /** * Read mapper class. 
*/ - public static class ReadMapper extends IOStatMapper { + public static class ReadMapper extends IOStatMapper { public ReadMapper() { - super(); } - public Object doIO(Reporter reporter, + public Long doIO(Reporter reporter, String name, long totalSize ) throws IOException { @@ -278,22 +281,22 @@ } finally { in.close(); } - return new Long(totalSize); + return Long.valueOf(totalSize); } } - private static void readTest(FileSystem fs) throws IOException { + private static void readTest(FileSystem fs, Configuration fsConfig) + throws IOException { fs.delete(READ_DIR, true); - runIOTest(ReadMapper.class, READ_DIR); + runIOTest(ReadMapper.class, READ_DIR, fsConfig); } - private static void sequentialTest( - FileSystem fs, + private static void sequentialTest(FileSystem fs, int testType, int fileSize, int nrFiles ) throws Exception { - IOStatMapper ioer = null; + IOStatMapper ioer = null; if (testType == TEST_TYPE_READ) ioer = new ReadMapper(); else if (testType == TEST_TYPE_WRITE) @@ -348,6 +351,7 @@ LOG.info("bufferSize = " + bufferSize); try { + Configuration fsConfig = new Configuration(); fsConfig.setInt("test.io.file.buffer.size", bufferSize); FileSystem fs = FileSystem.get(fsConfig); @@ -363,12 +367,12 @@ cleanup(fs); return; } - createControlFile(fs, fileSize, nrFiles); + createControlFile(fs, fileSize, nrFiles, fsConfig); long tStart = System.currentTimeMillis(); if (testType == TEST_TYPE_WRITE) - writeTest(fs); + writeTest(fs, fsConfig); if (testType == TEST_TYPE_READ) - readTest(fs); + readTest(fs, fsConfig); long execTime = System.currentTimeMillis() - tStart; analyzeResult(fs, testType, execTime, resFileName); @@ -388,30 +392,34 @@ reduceFile = new Path(WRITE_DIR, "part-00000"); else reduceFile = new Path(READ_DIR, "part-00000"); - DataInputStream in; - in = new DataInputStream(fs.open(reduceFile)); - - BufferedReader lines; - lines = new BufferedReader(new InputStreamReader(in)); long tasks = 0; long size = 0; long time = 0; float rate = 0; float sqrate = 0; - String line; - while((line = lines.readLine()) != null) { - StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); - String attr = tokens.nextToken(); - if (attr.endsWith(":tasks")) - tasks = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":size")) - size = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":time")) - time = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":rate")) - rate = Float.parseFloat(tokens.nextToken()); - else if (attr.endsWith(":sqrate")) - sqrate = Float.parseFloat(tokens.nextToken()); + DataInputStream in = null; + BufferedReader lines = null; + try { + in = new DataInputStream(fs.open(reduceFile)); + lines = new BufferedReader(new InputStreamReader(in)); + String line; + while((line = lines.readLine()) != null) { + StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); + String attr = tokens.nextToken(); + if (attr.endsWith(":tasks")) + tasks = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":size")) + size = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":time")) + time = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":rate")) + rate = Float.parseFloat(tokens.nextToken()); + else if (attr.endsWith(":sqrate")) + sqrate = Float.parseFloat(tokens.nextToken()); + } + } finally { + if(in != null) in.close(); + if(lines != null) lines.close(); } double med = rate / 1000 / tasks; @@ -429,12 +437,15 @@ " Test exec time sec: " + (float)execTime / 1000, "" }; - PrintStream res = new PrintStream( - 
new FileOutputStream( - new File(resFileName), true)); - for(int i = 0; i < resultLines.length; i++) { - LOG.info(resultLines[i]); - res.println(resultLines[i]); + PrintStream res = null; + try { + res = new PrintStream(new FileOutputStream(new File(resFileName), true)); + for(int i = 0; i < resultLines.length; i++) { + LOG.info(resultLines[i]); + res.println(resultLines[i]); + } + } finally { + if(res != null) res.close(); } } Added: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java?rev=792299&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java (added) +++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java Wed Jul 8 20:33:04 2009 @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.File; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; + +/** + * Test Job History Log Analyzer. 
+ *
+ * @see JHLogAnalyzer
+ */
+public class TestJHLA extends TestCase {
+  private static final Log LOG = LogFactory.getLog(JHLogAnalyzer.class);
+  private String historyLog = System.getProperty("test.build.data",
+      "build/test/data") + "/history/test.log";
+
+  @Before
+  public void setUp() throws Exception {
+    File logFile = new File(historyLog);
+    if(!logFile.getParentFile().exists())
+      if(!logFile.getParentFile().mkdirs())
+        LOG.error("Cannot create dirs for history log file: " + historyLog);
+    if(!logFile.createNewFile())
+      LOG.error("Cannot create history log file: " + historyLog);
+    BufferedWriter writer = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(historyLog)));
+    writer.write("$!!FILE=file1.log!!"); writer.newLine();
+    writer.write("Meta VERSION=\"1\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0004\" JOBNAME=\"streamjob21364.jar\" USER=\"hadoop\" SUBMIT_TIME=\"1237962008012\" JOBCONF=\"hdfs:///job_200903250600_0004/job.xml\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0004\" JOB_PRIORITY=\"NORMAL\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0004\" LAUNCH_TIME=\"1237962008712\" TOTAL_MAPS=\"2\" TOTAL_REDUCES=\"0\" JOB_STATUS=\"PREP\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000003\" TASK_TYPE=\"SETUP\" START_TIME=\"1237962008736\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0004_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000003_0\" START_TIME=\"1237962010929\" TRACKER_NAME=\"tracker_50445\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0004_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000003_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962012459\" HOSTNAME=\"host.com\" STATE_STRING=\"setup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000003\" TASK_TYPE=\"SETUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962023824\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0004\" JOB_STATUS=\"RUNNING\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1237962024049\" SPLITS=\"host1.com,host2.com,host3.com\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000001\" TASK_TYPE=\"MAP\" START_TIME=\"1237962024065\" SPLITS=\"host1.com,host2.com,host3.com\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000000_0\" START_TIME=\"1237962026157\" TRACKER_NAME=\"tracker_50524\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000000_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962041307\" HOSTNAME=\"host.com\" STATE_STRING=\"Records R/W=2681/1\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(56630)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28327)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2681)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28327)][(MAP_OUTPUT_RECORDS)(Map output records)(2681)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000000\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962054138\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(56630)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28327)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2681)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28327)][(MAP_OUTPUT_RECORDS)(Map output records)(2681)]}\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000001_0\" START_TIME=\"1237962026077\" TRACKER_NAME=\"tracker_50162\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000001_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962041030\" HOSTNAME=\"host.com\" STATE_STRING=\"Records R/W=2634/1\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(28316)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28303)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2634)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28303)][(MAP_OUTPUT_RECORDS)(Map output records)(2634)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000001\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962054187\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(28316)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28303)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2634)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28303)][(MAP_OUTPUT_RECORDS)(Map output records)(2634)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000002\" TASK_TYPE=\"CLEANUP\" START_TIME=\"1237962054187\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0004_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000002_0\" START_TIME=\"1237962055578\" TRACKER_NAME=\"tracker_50162\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0004_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000002_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962056782\" HOSTNAME=\"host.com\" STATE_STRING=\"cleanup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0004_m_000002\" TASK_TYPE=\"CLEANUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962069193\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0004\" FINISH_TIME=\"1237962069193\" JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"2\" FINISHED_REDUCES=\"0\" FAILED_MAPS=\"0\" FAILED_REDUCES=\"0\" COUNTERS=\"{(org.apache.hadoop.mapred.JobInProgress$Counter)(Job Counters )[(TOTAL_LAUNCHED_MAPS)(Launched map tasks)(2)]}{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(84946)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(56630)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5315)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(56630)][(MAP_OUTPUT_RECORDS)(Map output records)(5315)]}\" ."); writer.newLine();
+    writer.write("$!!FILE=file2.log!!"); writer.newLine();
+    writer.write("Meta VERSION=\"1\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0023\" JOBNAME=\"TestJob\" USER=\"hadoop2\" SUBMIT_TIME=\"1237964779799\" JOBCONF=\"hdfs:///job_200903250600_0023/job.xml\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0023\" JOB_PRIORITY=\"NORMAL\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0023\" LAUNCH_TIME=\"1237964780928\" TOTAL_MAPS=\"2\" TOTAL_REDUCES=\"0\" JOB_STATUS=\"PREP\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_r_000001\" TASK_TYPE=\"SETUP\" START_TIME=\"1237964780940\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("ReduceAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0023_r_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000001_0\" START_TIME=\"1237964720322\" TRACKER_NAME=\"tracker_3065\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("ReduceAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0023_r_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000001_0\" TASK_STATUS=\"SUCCESS\" SHUFFLE_FINISHED=\"1237964722118\" SORT_FINISHED=\"1237964722118\" FINISH_TIME=\"1237964722118\" HOSTNAME=\"host.com\" STATE_STRING=\"setup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_r_000001\" TASK_TYPE=\"SETUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964796054\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0023\" JOB_STATUS=\"RUNNING\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1237964796176\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_m_000001\" TASK_TYPE=\"MAP\" START_TIME=\"1237964796176\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000000_0\" START_TIME=\"1237964809765\" TRACKER_NAME=\"tracker_50459\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000000_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964911772\" HOSTNAME=\"host.com\" STATE_STRING=\"\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_m_000000\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964916534\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000001_0\" START_TIME=\"1237964798169\" TRACKER_NAME=\"tracker_1524\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000001_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964962960\" HOSTNAME=\"host.com\" STATE_STRING=\"\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_m_000001\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964976870\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_r_000000\" TASK_TYPE=\"CLEANUP\" START_TIME=\"1237964976871\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("ReduceAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0023_r_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000000_0\" START_TIME=\"1237964977208\" TRACKER_NAME=\"tracker_1524\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("ReduceAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0023_r_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000000_0\" TASK_STATUS=\"SUCCESS\" SHUFFLE_FINISHED=\"1237964979031\" SORT_FINISHED=\"1237964979031\" FINISH_TIME=\"1237964979032\" HOSTNAME=\"host.com\" STATE_STRING=\"cleanup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0023_r_000000\" TASK_TYPE=\"CLEANUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964991879\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0023\" FINISH_TIME=\"1237964991879\" JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"2\" FINISHED_REDUCES=\"0\" FAILED_MAPS=\"0\" FAILED_REDUCES=\"0\" COUNTERS=\"{(org.apache.hadoop.mapred.JobInProgress$Counter)(Job Counters )[(TOTAL_LAUNCHED_MAPS)(Launched map tasks)(2)]}{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(1000000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(10000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(10000000)][(MAP_OUTPUT_RECORDS)(Map output records)(10000000)]}\" ."); writer.newLine();
+    writer.write("$!!FILE=file3.log!!"); writer.newLine();
+    writer.write("Meta VERSION=\"1\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0034\" JOBNAME=\"TestJob\" USER=\"hadoop3\" SUBMIT_TIME=\"1237966370007\" JOBCONF=\"hdfs:///job_200903250600_0034/job.xml\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0034\" JOB_PRIORITY=\"NORMAL\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0034\" LAUNCH_TIME=\"1237966371076\" TOTAL_MAPS=\"2\" TOTAL_REDUCES=\"0\" JOB_STATUS=\"PREP\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0034_m_000003\" TASK_TYPE=\"SETUP\" START_TIME=\"1237966371093\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0034_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000003_0\" START_TIME=\"1237966371524\" TRACKER_NAME=\"tracker_50118\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0034_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000003_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237966373174\" HOSTNAME=\"host.com\" STATE_STRING=\"setup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0034_m_000003\" TASK_TYPE=\"SETUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237966386098\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0034\" JOB_STATUS=\"RUNNING\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0034_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1237966386111\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0034_m_000001\" TASK_TYPE=\"MAP\" START_TIME=\"1237966386124\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0034_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000001_0\" TASK_STATUS=\"FAILED\" FINISH_TIME=\"1237967174546\" HOSTNAME=\"host.com\" ERROR=\"java.io.IOException: Task process exit with nonzero status of 15."); writer.newLine();
+    writer.write(" at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:424)"); writer.newLine();
+    writer.write(",java.io.IOException: Task process exit with nonzero status of 15."); writer.newLine();
+    writer.write(" at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:424)"); writer.newLine();
+    writer.write("\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0034_m_000002\" TASK_TYPE=\"CLEANUP\" START_TIME=\"1237967170815\" SPLITS=\"\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0034_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000002_0\" START_TIME=\"1237967168653\" TRACKER_NAME=\"tracker_3105\" HTTP_PORT=\"50060\" ."); writer.newLine();
+    writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0034_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000002_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237967171301\" HOSTNAME=\"host.com\" STATE_STRING=\"cleanup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Task TASKID=\"task_200903250600_0034_m_000002\" TASK_TYPE=\"CLEANUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237967185818\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+    writer.write("Job JOBID=\"job_200903250600_0034\" FINISH_TIME=\"1237967185818\" JOB_STATUS=\"KILLED\" FINISHED_MAPS=\"0\" FINISHED_REDUCES=\"0\" ."); writer.newLine();
+    writer.close();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    File logFile = new File(historyLog);
+    if(!logFile.delete())
+      LOG.error("Cannot delete history log file: " + historyLog);
+    if(!logFile.getParentFile().delete())
+      LOG.error("Cannot delete history log dir: " + historyLog);
+  }
+
+  /**
+   * Run log analyzer in test mode for file test.log.
+   */
+  public void testJHLA() {
+    String[] args = {"-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!"};
+    JHLogAnalyzer.main(args);
+    args = new String[]{"-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!",
+                        "-usersIncluded", "hadoop,hadoop2"};
+    JHLogAnalyzer.main(args);
+    args = new String[]{"-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!",
+                        "-usersExcluded", "hadoop,hadoop3"};
+    JHLogAnalyzer.main(args);
+  }
+}

Propchange: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java Wed Jul 8 20:33:04 2009
@@ -22,11 +22,12 @@
 import org.apache.hadoop.fs.DistributedFSCheck;
 import org.apache.hadoop.fs.TestDFSIO;
 import org.apache.hadoop.fs.TestFileSystem;
+import org.apache.hadoop.fs.JHLogAnalyzer;
 import org.apache.hadoop.hdfs.NNBench;
 import org.apache.hadoop.io.FileBench;
 import org.apache.hadoop.util.ProgramDriver;
 
-/*
+/**
  * Driver for HDFS tests, which require map-reduce to run.
  */
 public class HdfsWithMRTestDriver {
@@ -55,6 +56,8 @@
           "Benchmark SequenceFile(Input|Output)Format " +
           "(block,record compressed and uncompressed), " +
           "Text(Input|Output)Format (compressed and uncompressed)");
+      pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class,
+          "Job History Log analyzer.");
     } catch(Throwable e) {
       e.printStackTrace();
     }
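
The PrintStream hunk near the top of this message switches the result reporting to an explicit try/finally so the output stream is closed even when a write fails. On Java 7 or later the same guarantee can be written with try-with-resources; the sketch below only illustrates that alternative under assumed method and parameter names, it is not part of the patch:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

class ResultWriterSketch {
  // Appends the given result lines to resFileName; the stream is closed
  // automatically when the try block exits, normally or with an exception.
  static void appendResults(String resFileName, String[] resultLines)
      throws IOException {
    try (PrintStream res =
             new PrintStream(new FileOutputStream(new File(resFileName), true))) {
      for (String line : resultLines) {
        res.println(line);  // the patched code also mirrors each line to LOG.info()
      }
    }
  }
}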
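
The fixture written by TestJHLA.setUp() also documents the input format: every history record begins with a type keyword (Meta, Job, Task, MapAttempt, ReduceAttempt) followed by KEY="VALUE" pairs, and lines matching the -jobDelimiter pattern (".!!FILE=.*!!" here) separate the concatenated per-file sections. The snippet below is illustrative only, not JHLogAnalyzer's actual parser; it just shows how one such record can be split into fields:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class HistoryRecordSketch {
  // Matches KEY="VALUE" tokens such as JOBID="job_200903250600_0004".
  private static final Pattern PAIR = Pattern.compile("(\\w+)=\"([^\"]*)\"");

  static Map<String, String> parse(String record) {
    Map<String, String> fields = new LinkedHashMap<String, String>();
    Matcher m = PAIR.matcher(record);
    while (m.find())
      fields.put(m.group(1), m.group(2));
    return fields;
  }

  public static void main(String[] args) {
    String record = "Job JOBID=\"job_200903250600_0004\" LAUNCH_TIME=\"1237962008712\""
        + " TOTAL_MAPS=\"2\" TOTAL_REDUCES=\"0\" JOB_STATUS=\"PREP\" .";
    String type = record.substring(0, record.indexOf(' '));  // "Job", "Task", ...
    System.out.println(type + " -> " + parse(record));
  }
}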
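
Finally, testJHLA() shows the analyzer's command-line surface as exercised by this commit, and the HdfsWithMRTestDriver hunk registers the tool under its simple class name: "-test" selects the test mode used by the unit test, "-jobDelimiter" supplies the separator pattern, and "-usersIncluded"/"-usersExcluded" presumably filter jobs by submitting user. A minimal standalone sketch of the same calls outside JUnit (the class name and log path are assumptions; the log must be in the format written by setUp()):

import org.apache.hadoop.fs.JHLogAnalyzer;

class JHLARunnerSketch {
  public static void main(String[] ignored) {
    // Assumed location; point this at any log in the format shown in setUp().
    String historyLog = "build/test/data/history/test.log";

    // Analyze every job in the log.
    JHLogAnalyzer.main(new String[] {
        "-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!"});

    // Presumably restricts the analysis to jobs submitted by these users.
    JHLogAnalyzer.main(new String[] {
        "-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!",
        "-usersIncluded", "hadoop,hadoop2"});

    // Presumably skips jobs submitted by these users.
    JHLogAnalyzer.main(new String[] {
        "-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!",
        "-usersExcluded", "hadoop,hadoop3"});
  }
}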