hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r792299 - in /hadoop/hdfs/trunk: ./ src/test/hdfs-with-mr/org/apache/hadoop/fs/ src/test/hdfs-with-mr/org/apache/hadoop/test/
Date Wed, 08 Jul 2009 20:33:04 GMT
Author: shv
Date: Wed Jul  8 20:33:04 2009
New Revision: 792299

URL: http://svn.apache.org/viewvc?rev=792299&view=rev
Log:
HDFS-459. Introduce Job History Log Analyzer. Contributed by Konstantin Shvachko.

Added:
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java   (with props)
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java   (with props)
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java
    hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Jul  8 20:33:04 2009
@@ -9,6 +9,8 @@
 
     HDFS-447. Add LDAP lookup to hdfsproxy. (Zhiyong Zhang via cdouglas)
 
+    HDFS-459. Introduce Job History Log Analyzer. (shv)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file

Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java Wed Jul  8 20:33:04 2009
@@ -23,10 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Reducer that accumulates values based on their type.
@@ -44,6 +41,7 @@
  * </ul>
  * 
  */
+@SuppressWarnings("deprecation")
 public class AccumulatingReducer extends MapReduceBase
     implements Reducer<Text, Text, Text, Text> {
   static final String VALUE_TYPE_LONG = "l:";
@@ -74,10 +72,10 @@
 
     // concatenate strings
     if (field.startsWith(VALUE_TYPE_STRING)) {
-      String sSum = "";
+      StringBuffer sSum = new StringBuffer();
       while (values.hasNext())
-        sSum += values.next().toString() + ";";
-      output.collect(key, new Text(sSum));
+        sSum.append(values.next().toString()).append(";");
+      output.collect(key, new Text(sSum.toString()));
       reporter.setStatus("finished " + field + " ::host = " + hostName);
       return;
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java Wed Jul  8 20:33:04 2009
@@ -162,16 +162,15 @@
    * <li>i/o rate squared</li>
    * </ul>
    */
-  private abstract static class IOStatMapper extends IOMapperBase {
+  private abstract static class IOStatMapper extends IOMapperBase<Long> {
     IOStatMapper() { 
-      super(fsConfig);
     }
     
     void collectStats(OutputCollector<Text, Text> output, 
                       String name,
                       long execTime, 
-                      Object objSize) throws IOException {
-      long totalSize = ((Long)objSize).longValue();
+                      Long objSize) throws IOException {
+      long totalSize = objSize.longValue();
       float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
       LOG.info("Number of bytes processed = " + totalSize);
       LOG.info("Exec time = " + execTime);
@@ -201,7 +200,7 @@
         buffer[i] = (byte)('0' + i % 50);
     }
 
-    public Object doIO(Reporter reporter, 
+    public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize 
                        ) throws IOException {
@@ -302,7 +301,7 @@
       super(); 
     }
 
-    public Object doIO(Reporter reporter, 
+    public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize 
                        ) throws IOException {

Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java Wed Jul  8 20:33:04 2009
@@ -142,10 +142,9 @@
   /**
    * DistributedFSCheck mapper class.
    */
-  public static class DistributedFSCheckMapper extends IOMapperBase {
+  public static class DistributedFSCheckMapper extends IOMapperBase<Object> {
 
     public DistributedFSCheckMapper() { 
-      super(fsConfig); 
     }
 
     public Object doIO(Reporter reporter, 

Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java Wed Jul  8 20:33:04 2009
@@ -19,14 +19,10 @@
 
 import java.io.IOException;
 import java.net.InetAddress;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Base mapper class for IO operations.
@@ -37,7 +33,8 @@
  * statistics data to be collected by subsequent reducers.
  * 
  */
-public abstract class IOMapperBase extends Configured
+@SuppressWarnings("deprecation")
+public abstract class IOMapperBase<T> extends Configured
     implements Mapper<Text, LongWritable, Text, Text> {
   
   protected byte[] buffer;
@@ -45,8 +42,11 @@
   protected FileSystem fs;
   protected String hostName;
 
-  public IOMapperBase(Configuration conf) { 
-    super(conf); 
+  public IOMapperBase() { 
+  }
+
+  public void configure(JobConf conf) {
+    setConf(conf);
     try {
       fs = FileSystem.get(conf);
     } catch (Exception e) {
@@ -61,10 +61,6 @@
     }
   }
 
-  public void configure(JobConf job) {
-    setConf(job);
-  }
-
   public void close() throws IOException {
   }
   
@@ -78,7 +74,7 @@
    *          {@link #collectStats(OutputCollector,String,long,Object)}
    * @throws IOException
    */
-  abstract Object doIO(Reporter reporter, 
+  abstract T doIO(Reporter reporter, 
                        String name, 
                        long value) throws IOException;
 
@@ -94,7 +90,7 @@
   abstract void collectStats(OutputCollector<Text, Text> output, 
                              String name, 
                              long execTime, 
-                             Object doIOReturnValue) throws IOException;
+                             T doIOReturnValue) throws IOException;
   
   /**
    * Map file name and offset into statistical data.
@@ -119,7 +115,7 @@
     reporter.setStatus("starting " + name + " ::host = " + hostName);
     
     long tStart = System.currentTimeMillis();
-    Object statValue = doIO(reporter, name, longValue);
+    T statValue = doIO(reporter, name, longValue);
     long tEnd = System.currentTimeMillis();
     long execTime = tEnd - tStart;
     collectStats(output, name, execTime, statValue);

Added: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java?rev=792299&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java Wed Jul  8 20:33:04 2009
@@ -0,0 +1,1129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Job History Log Analyzer.
+ * 
+ * <h3>Description.</h3>
+ * This is a tool for parsing and analyzing history logs of map-reduce jobs.
+ * History logs contain information about execution of jobs, tasks, and 
+ * attempts. This tool focuses on submission, launch, start, and finish times,
+ * as well as the success or failure of jobs, tasks, and attempts.
+ * <p>
+ * The analyzer calculates <em>per hour slot utilization</em> for the cluster 
+ * as follows.
+ * For each task attempt it divides the time segment from the start of the 
+ * attempt t<sub>S</sub> to the finish t<sub>F</sub> into whole hours 
+ * [t<sub>0</sub>, ..., t<sub>n</sub>], where t<sub>0</sub> &lt;= t<sub>S</sub> 
+ * is the maximal whole hour preceding t<sub>S</sub>, and
+ * t<sub>n</sub> &gt;= t<sub>F</sub> is the minimal whole hour after t<sub>F</sub>. 
+ * Thus, [t<sub>0</sub>, ..., t<sub>n</sub>] covers the segment 
+ * [t<sub>S</sub>, t<sub>F</sub>], during which the attempt was executed.
+ * Each interval [t<sub>i</sub>, t<sub>i+1</sub>] fully contained in 
+ * [t<sub>S</sub>, t<sub>F</sub>] corresponds to exactly one slot on
+ * a map-reduce cluster (usually MAP-slot or REDUCE-slot).
+ * If interval [t<sub>i</sub>, t<sub>i+1</sub>] only intersects with 
+ * [t<sub>S</sub>, t<sub>F</sub>] then we say that the task 
+ * attempt used just a fraction of the slot during this hour.
+ * The fraction equals the size of the intersection.
+ * Let slotTime(A, h) denote the number of slots calculated that way for a 
+ * specific attempt A during hour h.
+ * The tool then sums all slots for all attempts for every hour.
+ * The result is the slot hour utilization of the cluster:
+ * <tt>slotTime(h) = SUM<sub>A</sub> slotTime(A,h)</tt>.
+ * <p>
+ * Log analyzer calculates slot hours for <em>MAP</em> and <em>REDUCE</em> 
+ * attempts separately.
+ * <p>
+ * Log analyzer distinguishes between <em>successful</em> and <em>failed</em>
+ * attempts. Task attempt is considered successful if its own status is SUCCESS
+ * and the statuses of the task and the job it is a part of are also SUCCESS.
+ * Otherwise the task attempt is considered failed.
+ * <p>
+ * Map-reduce clusters are usually configured to have a fixed number of MAP 
+ * and REDUCE slots per node. Thus the maximal possible number of slots on
+ * the cluster is <tt>total_slots = total_nodes * slots_per_node</tt>.
+ * Effective slot hour cannot exceed <tt>total_slots</tt> for successful
+ * attempts.
+ * <p>
+ * <em>Pending time</em> characterizes the wait time of attempts.
+ * It is calculated similarly to the slot hour except that the wait interval
+ * starts when the job is submitted and ends when an attempt starts execution.
+ * In addition to that pending time also includes intervals between attempts
+ * of the same task if it was re-executed.
+ * <p>
+ * History log analyzer calculates two pending time variations. First is based
+ * on job submission time as described above, second, starts the wait interval
+ * when the job is launched rather than submitted.
+ * 
+ * <h3>Input.</h3>
+ * The following input parameters can be specified in the argument string
+ * to the job log analyzer:
+ * <ul>
+ * <li><tt>-historyDir inputDir</tt> specifies the location of the directory
+ * where analyzer will be looking for job history log files.</li>
+ * <li><tt>-resFile resultFile</tt> the name of the result file.</li>
+ * <li><tt>-usersIncluded | -usersExcluded userList</tt> slot utilization and 
+ * pending time can be calculated for only, or for all but, the specified users.
+ * <br>
+ * <tt>userList</tt> is a comma or semicolon separated list of users.</li>
+ * <li><tt>-gzip</tt> is used if history log files are compressed.
+ * Only {@link GzipCodec} is currently supported.</li>
+ * <li><tt>-jobDelimiter pattern</tt> one can concatenate original log files into 
+ * larger file(s) with the specified delimiter to recognize the end of the log
+ * for one job from the next one.<br>
+ * <tt>pattern</tt> is a java regular expression
+ * {@link java.util.regex.Pattern}, which should match only the log delimiters.
+ * <br>
+ * E.g. pattern <tt>".!!FILE=.*!!"</tt> matches delimiters, which contain
+ * the original history log file names in the following form:<br>
+ * <tt>"$!!FILE=my.job.tracker.com_myJobId_user_wordcount.log!!"</tt></li>
+ * <li><tt>-clean</tt> cleans up default directories used by the analyzer.</li>
+ * <li><tt>-test</tt> test one file locally and exit;
+ * does not require map-reduce.</li>
+ * <li><tt>-help</tt> print usage.</li>
+ * </ul>
+ * 
+ * <h3>Output.</h3>
+ * The output file is formatted as a tab separated table consisting of four
+ * columns: <tt>SERIES, PERIOD, TYPE, SLOT_HOUR</tt>.
+ * <ul>
+ * <li><tt>SERIES</tt> one of the four statistical series;</li>
+ * <li><tt>PERIOD</tt> the start of the time interval in the following format:
+ * <tt>"yyyy-mm-dd hh:mm:ss"</tt>;</li>
+ * <li><tt>TYPE</tt> the slot type, e.g. MAP or REDUCE;</li>
+ * <li><tt>SLOT_HOUR</tt> the value of the slot usage during this 
+ * time interval.</li>
+ * </ul>
+ */
+@SuppressWarnings("deprecation")
+public class JHLogAnalyzer {
+  private static final Log LOG = LogFactory.getLog(JHLogAnalyzer.class);
+  // Constants
+  private static final String JHLA_ROOT_DIR = 
+                            System.getProperty("test.build.data", "stats/JHLA");
+  private static final Path INPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_input");
+  private static final String BASE_INPUT_FILE_NAME = "jhla_in_";
+  private static final Path OUTPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_output");
+  private static final Path RESULT_FILE = 
+                            new Path(JHLA_ROOT_DIR, "jhla_result.txt");
+  private static final Path DEFAULT_HISTORY_DIR = new Path("history");
+
+  private static final int DEFAULT_TIME_INTERVAL_MSEC = 1000*60*60; // 1 hour
+
+  static{
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
+
+  static enum StatSeries {
+    STAT_ALL_SLOT_TIME
+          (AccumulatingReducer.VALUE_TYPE_LONG + "allSlotTime"),
+    STAT_FAILED_SLOT_TIME
+          (AccumulatingReducer.VALUE_TYPE_LONG + "failedSlotTime"),
+    STAT_SUBMIT_PENDING_SLOT_TIME
+          (AccumulatingReducer.VALUE_TYPE_LONG + "submitPendingSlotTime"),
+    STAT_LAUNCHED_PENDING_SLOT_TIME
+          (AccumulatingReducer.VALUE_TYPE_LONG + "launchedPendingSlotTime");    
+
+    private String statName = null;
+    private StatSeries(String name) {this.statName = name;}
+    public String toString() {return statName;}
+  }
+
+  private static class FileCreateDaemon extends Thread {
+    private static final int NUM_CREATE_THREADS = 10;
+    private static volatile int numFinishedThreads;
+    private static volatile int numRunningThreads;
+    private static FileStatus[] jhLogFiles;
+
+    FileSystem fs;
+    int start;
+    int end;
+
+    FileCreateDaemon(FileSystem fs, int start, int end) {
+      this.fs = fs;
+      this.start = start;
+      this.end = end;
+    }
+
+    public void run() {
+      try {
+        for(int i=start; i < end; i++) {
+          String name = getFileName(i);
+          Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
+          SequenceFile.Writer writer = null;
+          try {
+            writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
+                                               Text.class, LongWritable.class,
+                                               CompressionType.NONE);
+            String logFile = jhLogFiles[i].getPath().toString();
+            writer.append(new Text(logFile), new LongWritable(0));
+          } catch(Exception e) {
+            throw new IOException(e);
+          } finally {
+            if (writer != null)
+              writer.close();
+            writer = null;
+          }
+        }
+      } catch(IOException ex) {
+        LOG.error("FileCreateDaemon failed.", ex);
+      }
+      numFinishedThreads++;
+    }
+
+    private static void createControlFile(FileSystem fs, Path jhLogDir
+    ) throws IOException {
+      fs.delete(INPUT_DIR, true);
+      jhLogFiles = fs.listStatus(jhLogDir);
+
+      numFinishedThreads = 0;
+      try {
+        int start = 0;
+        int step = jhLogFiles.length / NUM_CREATE_THREADS
+        + ((jhLogFiles.length % NUM_CREATE_THREADS) > 0 ? 1 : 0);
+        FileCreateDaemon[] daemons = new FileCreateDaemon[NUM_CREATE_THREADS];
+        numRunningThreads = 0;
+        for(int tIdx=0; tIdx < NUM_CREATE_THREADS && start < jhLogFiles.length; tIdx++) {
+          int end = Math.min(start + step, jhLogFiles.length);
+          daemons[tIdx] = new FileCreateDaemon(fs, start, end);
+          start += step;
+          numRunningThreads++;
+        }
+        for(int tIdx=0; tIdx < numRunningThreads; tIdx++) {
+          daemons[tIdx].start();
+        }
+      } finally {
+        int prevValue = 0;
+        while(numFinishedThreads < numRunningThreads) {
+          if(prevValue < numFinishedThreads) {
+            LOG.info("Finished " + numFinishedThreads + " threads out of " + numRunningThreads);
+            prevValue = numFinishedThreads;
+          }
+          try {Thread.sleep(500);} catch (InterruptedException e) {}
+        }
+      }
+    }
+  }
+
+  private static void createControlFile(FileSystem fs, Path jhLogDir
+  ) throws IOException {
+    LOG.info("creating control file: JH log dir = " + jhLogDir);
+    FileCreateDaemon.createControlFile(fs, jhLogDir);
+    LOG.info("created control file: JH log dir = " + jhLogDir);
+  }
+
+  private static String getFileName(int fIdx) {
+    return BASE_INPUT_FILE_NAME + Integer.toString(fIdx);
+  }
+
+  /**
+   * If t is of the form KEY="VALUE", then this will return [KEY, VALUE]
+   */
+  private static String [] getKeyValue(String t) throws IOException {
+    String[] keyVal = t.split("=\"*|\"");
+    return keyVal;
+  }
+
+  /**
+   * JobHistory log record.
+   */
+  private static class JobHistoryLog {
+    String JOBID;
+    String JOB_STATUS;
+    long SUBMIT_TIME;
+    long LAUNCH_TIME;
+    long FINISH_TIME;
+    long TOTAL_MAPS;
+    long TOTAL_REDUCES;
+    long FINISHED_MAPS;
+    long FINISHED_REDUCES;
+    String USER;
+    Map<String, TaskHistoryLog> tasks;
+
+    boolean isSuccessful() {
+     return (JOB_STATUS != null) && JOB_STATUS.equals("SUCCESS");
+    }
+
+    void parseLine(String line) throws IOException {
+      StringTokenizer tokens = new StringTokenizer(line);
+      if(!tokens.hasMoreTokens())
+        return;
+      String what = tokens.nextToken();
+      // Line should start with one of the following:
+      // Job, Task, MapAttempt, ReduceAttempt
+      if(what.equals("Job"))
+        updateJob(tokens);
+      else if(what.equals("Task"))
+        updateTask(tokens);
+      else if(what.indexOf("Attempt") >= 0)
+        updateTaskAttempt(tokens);
+    }
+
+    private void updateJob(StringTokenizer tokens) throws IOException {
+      while(tokens.hasMoreTokens()) {
+        String t = tokens.nextToken();
+        String[] keyVal = getKeyValue(t);
+        if(keyVal.length < 2) continue;
+
+        if(keyVal[0].equals("JOBID")) {
+          if(JOBID == null)
+            JOBID = new String(keyVal[1]);
+          else if(!JOBID.equals(keyVal[1])) {
+            LOG.error("Incorrect JOBID: "
+                + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) 
+                + " expect " + JOBID);
+            return;
+          }
+        }
+        else if(keyVal[0].equals("JOB_STATUS"))
+          JOB_STATUS = new String(keyVal[1]);
+        else if(keyVal[0].equals("SUBMIT_TIME"))
+          SUBMIT_TIME = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("LAUNCH_TIME"))
+          LAUNCH_TIME = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("FINISH_TIME"))
+          FINISH_TIME = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("TOTAL_MAPS"))
+          TOTAL_MAPS = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("TOTAL_REDUCES"))
+          TOTAL_REDUCES = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("FINISHED_MAPS"))
+          FINISHED_MAPS = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("FINISHED_REDUCES"))
+          FINISHED_REDUCES = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("USER"))
+          USER = new String(keyVal[1]);
+      }
+    }
+
+    private void updateTask(StringTokenizer tokens) throws IOException {
+      // unpack
+      TaskHistoryLog task = new TaskHistoryLog().parse(tokens);
+      if(task.TASKID == null) {
+        LOG.error("TASKID = NULL for job " + JOBID);
+        return;
+      }
+      // update or insert
+      if(tasks == null)
+        tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES));
+      TaskHistoryLog existing = tasks.get(task.TASKID);
+      if(existing == null)
+        tasks.put(task.TASKID, task);
+      else
+        existing.updateWith(task);
+    }
+
+    private void updateTaskAttempt(StringTokenizer tokens) throws IOException {
+      // unpack
+      TaskAttemptHistoryLog attempt = new TaskAttemptHistoryLog();
+      String taskID = attempt.parse(tokens);
+      if(taskID == null) return;
+      if(tasks == null)
+        tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES));
+      TaskHistoryLog existing = tasks.get(taskID);
+      if(existing == null) {
+        existing = new TaskHistoryLog(taskID);
+        tasks.put(taskID, existing);
+      }
+      existing.updateWith(attempt);
+    }
+  }
+
+  /**
+   * TaskHistory log record.
+   */
+  private static class TaskHistoryLog {
+    String TASKID;
+    String TASK_TYPE;   // MAP, REDUCE, SETUP, CLEANUP
+    String TASK_STATUS;
+    long START_TIME;
+    long FINISH_TIME;
+    Map<String, TaskAttemptHistoryLog> attempts;
+
+    TaskHistoryLog() {}
+
+    TaskHistoryLog(String taskID) {
+      TASKID = taskID;
+    }
+
+    boolean isSuccessful() {
+      return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS");
+    }
+
+    TaskHistoryLog parse(StringTokenizer tokens) throws IOException {
+      while(tokens.hasMoreTokens()) {
+        String t = tokens.nextToken();
+        String[] keyVal = getKeyValue(t);
+        if(keyVal.length < 2) continue;
+
+        if(keyVal[0].equals("TASKID")) {
+          if(TASKID == null)
+            TASKID = new String(keyVal[1]);
+          else if(!TASKID.equals(keyVal[1])) {
+            LOG.error("Incorrect TASKID: "
+                + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) 
+                + " expect " + TASKID);
+            continue;
+          }
+        }
+        else if(keyVal[0].equals("TASK_TYPE"))
+          TASK_TYPE = new String(keyVal[1]);
+        else if(keyVal[0].equals("TASK_STATUS"))
+          TASK_STATUS = new String(keyVal[1]);
+        else if(keyVal[0].equals("START_TIME"))
+          START_TIME = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("FINISH_TIME"))
+          FINISH_TIME = Long.parseLong(keyVal[1]);
+      }
+      return this;
+    }
+
+    /**
+     * Update with non-null fields of the same task log record.
+     */
+    void updateWith(TaskHistoryLog from) throws IOException {
+      if(TASKID == null)
+        TASKID = from.TASKID;
+      else if(!TASKID.equals(from.TASKID)) {
+        throw new IOException("Incorrect TASKID: " + from.TASKID
+                            + " expect " + TASKID);
+      }
+      if(TASK_TYPE == null)
+        TASK_TYPE = from.TASK_TYPE;
+      else if(! TASK_TYPE.equals(from.TASK_TYPE)) {
+        LOG.error(
+            "Incorrect TASK_TYPE: " + from.TASK_TYPE + " expect " + TASK_TYPE
+            + " for task " + TASKID);
+        return;
+      }
+      if(from.TASK_STATUS != null)
+        TASK_STATUS = from.TASK_STATUS;
+      if(from.START_TIME > 0)
+        START_TIME = from.START_TIME;
+      if(from.FINISH_TIME > 0)
+        FINISH_TIME = from.FINISH_TIME;
+    }
+
+    /**
+     * Update with non-null fields of the task attempt log record.
+     */
+    void updateWith(TaskAttemptHistoryLog attempt) throws IOException {
+      if(attempt.TASK_ATTEMPT_ID == null) {
+        LOG.error("Unexpected TASK_ATTEMPT_ID = null for task " + TASKID);
+        return;
+      }
+      if(attempts == null)
+        attempts = new HashMap<String, TaskAttemptHistoryLog>();
+      TaskAttemptHistoryLog existing = attempts.get(attempt.TASK_ATTEMPT_ID);
+      if(existing == null)
+        attempts.put(attempt.TASK_ATTEMPT_ID, attempt);
+      else
+        existing.updateWith(attempt);
+      // update task start time
+      if(attempt.START_TIME > 0 && 
+          (this.START_TIME == 0 || this.START_TIME > attempt.START_TIME))
+        START_TIME = attempt.START_TIME;
+    }
+  }
+
+  /**
+   * TaskAttemptHistory log record.
+   */
+  private static class TaskAttemptHistoryLog {
+    String TASK_ATTEMPT_ID;
+    String TASK_STATUS; // this task attempt status
+    long START_TIME;
+    long FINISH_TIME;
+    long HDFS_BYTES_READ;
+    long HDFS_BYTES_WRITTEN;
+    long FILE_BYTES_READ;
+    long FILE_BYTES_WRITTEN;
+
+    /**
+     * Checks this attempt's own status only. Overall, an attempt is considered
+     * successful iff the statuses of attempt, task, and job all equal "SUCCESS".
+     */
+    boolean isSuccessful() {
+      return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS");
+    }
+
+    String parse(StringTokenizer tokens) throws IOException {
+      String taskID = null;
+      while(tokens.hasMoreTokens()) {
+        String t = tokens.nextToken();
+        String[] keyVal = getKeyValue(t);
+        if(keyVal.length < 2) continue; // token is not a KEY="VALUE" pair
+
+        if(keyVal[0].equals("TASKID")) {
+          if(taskID == null)
+            taskID = new String(keyVal[1]);
+          else if(!taskID.equals(keyVal[1])) {
+            LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID);
+            continue;
+          }
+        }
+        else if(keyVal[0].equals("TASK_ATTEMPT_ID")) {
+          if(TASK_ATTEMPT_ID == null)
+            TASK_ATTEMPT_ID = new String(keyVal[1]);
+          else if(!TASK_ATTEMPT_ID.equals(keyVal[1])) { // report the attempt id, not the task id
+            LOG.error("Incorrect TASK_ATTEMPT_ID: " + keyVal[1] + " expect " + TASK_ATTEMPT_ID);
+            continue;
+          }
+        }
+        else if(keyVal[0].equals("TASK_STATUS"))
+          TASK_STATUS = new String(keyVal[1]);
+        else if(keyVal[0].equals("START_TIME"))
+          START_TIME = Long.parseLong(keyVal[1]);
+        else if(keyVal[0].equals("FINISH_TIME"))
+          FINISH_TIME = Long.parseLong(keyVal[1]);
+      }
+      return taskID; // null when the record carried no TASKID field
+    }
+
+    /**
+     * Update with non-null fields of the same task attempt log record.
+     */
+    void updateWith(TaskAttemptHistoryLog from) throws IOException {
+      if(TASK_ATTEMPT_ID == null)
+        TASK_ATTEMPT_ID = from.TASK_ATTEMPT_ID;
+      else if(! TASK_ATTEMPT_ID.equals(from.TASK_ATTEMPT_ID)) {
+        throw new IOException(
+            "Incorrect TASK_ATTEMPT_ID: " + from.TASK_ATTEMPT_ID 
+            + " expect " + TASK_ATTEMPT_ID);
+      }
+      if(from.TASK_STATUS != null)
+        TASK_STATUS = from.TASK_STATUS;
+      if(from.START_TIME > 0)
+        START_TIME = from.START_TIME;
+      if(from.FINISH_TIME > 0)
+        FINISH_TIME = from.FINISH_TIME;
+      if(from.HDFS_BYTES_READ > 0)
+        HDFS_BYTES_READ = from.HDFS_BYTES_READ;
+      if(from.HDFS_BYTES_WRITTEN > 0)
+        HDFS_BYTES_WRITTEN = from.HDFS_BYTES_WRITTEN;
+      if(from.FILE_BYTES_READ > 0)
+        FILE_BYTES_READ = from.FILE_BYTES_READ;
+      if(from.FILE_BYTES_WRITTEN > 0)
+        FILE_BYTES_WRITTEN = from.FILE_BYTES_WRITTEN;
+    }
+  }
+
+  /**
+   * Key = statName*date-time*taskType
+   * Value = number of msec for the hour
+   */
+  private static class IntervalKey {
+    static final String KEY_FIELD_DELIMITER = "*";
+    String statName;
+    String dateTime;
+    String taskType;
+
+    IntervalKey(String stat, long timeMSec, String taskType) {
+      statName = stat;
+      SimpleDateFormat dateF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      dateTime = dateF.format(new Date(timeMSec));
+      this.taskType = taskType;
+    }
+
+    IntervalKey(String key) {
+      StringTokenizer keyTokens = new StringTokenizer(key, KEY_FIELD_DELIMITER);
+      if(!keyTokens.hasMoreTokens()) return;
+      statName = keyTokens.nextToken();
+      if(!keyTokens.hasMoreTokens()) return;
+      dateTime = keyTokens.nextToken();
+      if(!keyTokens.hasMoreTokens()) return;
+      taskType = keyTokens.nextToken();
+    }
+
+    void setStatName(String stat) {
+      statName = stat;
+    }
+
+    String getStringKey() {
+      return statName + KEY_FIELD_DELIMITER +
+             dateTime + KEY_FIELD_DELIMITER +
+             taskType;
+    }
+
+    Text getTextKey() {
+      return new Text(getStringKey());
+    }
+
+    public String toString() {
+      return getStringKey();
+    }
+  }
+
+  /**
+   * Mapper class.
+   */
+  private static class JHLAMapper extends IOMapperBase<Object> {
+    /**
+     * A line pattern, which delimits history logs of different jobs,
+     * if multiple job logs are written in the same file.
+     * Null value means only one job log per file is expected.
+     * The pattern should be a regular expression as in
+     * {@link String#matches(String)}.
+     */
+    String jobDelimiterPattern;
+    int maxJobDelimiterLineLength;
+    /** Count only these users' jobs */
+    Collection<String> usersIncluded;
+    /** Exclude jobs of the following users */
+    Collection<String> usersExcluded;
+    /** Type of compression for compressed files: gzip */
+    Class<? extends CompressionCodec> compressionClass;
+
+    JHLAMapper() throws IOException {
+    }
+
+    JHLAMapper(Configuration conf) throws IOException {
+      configure(new JobConf(conf));
+    }
+
+    public void configure(JobConf conf) {
+      super.configure(conf );
+      usersIncluded = getUserList(conf.get("jhla.users.included", null));
+      usersExcluded = getUserList(conf.get("jhla.users.excluded", null));
+      String zipClassName = conf.get("jhla.compression.class", null);
+      try {
+        compressionClass = (zipClassName == null) ? null : 
+          Class.forName(zipClassName).asSubclass(CompressionCodec.class);
+      } catch(Exception e) {
+        throw new RuntimeException("Compression codec not found: ", e);
+      }
+      jobDelimiterPattern = conf.get("jhla.job.delimiter.pattern", null);
+      maxJobDelimiterLineLength = conf.getInt("jhla.job.delimiter.length", 512);
+    }
+
+    @Override
+    public void map(Text key, 
+                    LongWritable value,
+                    OutputCollector<Text, Text> output, 
+                    Reporter reporter) throws IOException {
+      String name = key.toString();
+      long longValue = value.get();
+      
+      reporter.setStatus("starting " + name + " ::host = " + hostName);
+      
+      long tStart = System.currentTimeMillis();
+      parseLogFile(fs, new Path(name), longValue, output, reporter);
+      long tEnd = System.currentTimeMillis();
+      long execTime = tEnd - tStart;
+      
+      reporter.setStatus("finished " + name + " ::host = " + hostName +
+          " in " + execTime/1000 + " sec.");
+    }
+
+    public Object doIO(Reporter reporter, 
+                       String path, // full path of history log file 
+                       long offset  // starting offset within the file
+                       ) throws IOException {
+      return null;
+    }
+
+    void collectStats(OutputCollector<Text, Text> output, 
+        String name,
+        long execTime,
+        Object jobObjects) throws IOException {
+    }
+
+    private boolean isEndOfJobLog(String line) {
+      if(jobDelimiterPattern == null)
+        return false;
+      return line.matches(jobDelimiterPattern);
+    }
+
+    /**
+     * Collect information about one job.
+     * 
+     * @param fs - file system
+     * @param filePath - full path of a history log file
+     * @param offset - starting offset in the history log file
+     * @throws IOException
+     */
+    public void parseLogFile(FileSystem fs,
+                                    Path filePath,
+                                    long offset,
+                                    OutputCollector<Text, Text> output,
+                                    Reporter reporter
+                                  ) throws IOException {
+      InputStream in = null;
+      try {
+        // open file & seek
+        FSDataInputStream stm = fs.open(filePath);
+        stm.seek(offset);
+        in = stm;
+        LOG.info("Opened " + filePath);
+        reporter.setStatus("Opened " + filePath);
+        // get a compression filter if specified
+        if(compressionClass != null) {
+          CompressionCodec codec = (CompressionCodec)
+            ReflectionUtils.newInstance(compressionClass, new Configuration());
+          in = codec.createInputStream(stm);
+          LOG.info("Codec created " + filePath);
+          reporter.setStatus("Codec created " + filePath);
+        }
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+        LOG.info("Reader created " + filePath);
+        // skip to the next job log start
+        long processed = 0L;
+        if(jobDelimiterPattern != null) {
+          for(String line = reader.readLine();
+                line != null; line = reader.readLine()) {
+            if((stm.getPos() - processed) > 100000) {
+              processed = stm.getPos();
+              reporter.setStatus("Processing " + filePath + " at " + processed);
+            }
+            if(isEndOfJobLog(line))
+              break;
+          }
+        }
+        // parse lines and update job history
+        JobHistoryLog jh = new JobHistoryLog();
+        int jobLineCount = 0;
+        for(String line = readLine(reader);
+              line != null; line = readLine(reader)) {
+          jobLineCount++;
+          if((stm.getPos() - processed) > 20000) {
+            processed = stm.getPos();
+            long numTasks = (jh.tasks == null ? 0 : jh.tasks.size());
+            String txt = "Processing " + filePath + " at " + processed
+                    + " # tasks = " + numTasks;
+            reporter.setStatus(txt);
+            LOG.info(txt);
+          }
+          if(isEndOfJobLog(line)) {
+            if(jh.JOBID != null) {
+              LOG.info("Finished parsing job: " + jh.JOBID
+                     + " line count = " + jobLineCount);
+              collectJobStats(jh, output, reporter);
+              LOG.info("Collected stats for job: " + jh.JOBID);
+            }
+            jh = new JobHistoryLog();
+            jobLineCount = 0;
+          } else
+            jh.parseLine(line);
+        }
+        if(jh.JOBID == null) {
+          LOG.error("JOBID = NULL in " + filePath + " at " + processed);
+          return;
+        }
+        collectJobStats(jh, output, reporter);
+      } catch(Exception ie) {
+        // parsing errors can happen if the file has been truncated
+        LOG.error("JHLAMapper.parseLogFile", ie);
+        reporter.setStatus("JHLAMapper.parseLogFile failed "
+                          + StringUtils.stringifyException(ie));
+        throw new IOException("Job failed.", ie);
+      } finally {
+        if(in != null) in.close();
+      }
+    }
+
+    /**
+     * Read lines until one ends with a " ." or "\" "
+     */
+    private StringBuffer resBuffer = new StringBuffer();
+    private String readLine(BufferedReader reader) throws IOException {
+      resBuffer.setLength(0);
+      reader.mark(maxJobDelimiterLineLength);
+      for(String line = reader.readLine();
+                line != null; line = reader.readLine()) {
+        if(isEndOfJobLog(line)) {
+          if(resBuffer.length() == 0)
+            resBuffer.append(line);
+          else
+            reader.reset();
+          break;
+        }
+        if(resBuffer.length() == 0)
+          resBuffer.append(line);
+        else if(resBuffer.length() < 32000)
+          resBuffer.append(line);
+        if(line.endsWith(" .") || line.endsWith("\" ")) {
+          break;
+        }
+        reader.mark(maxJobDelimiterLineLength);
+      }
+      String result = resBuffer.length() == 0 ? null : resBuffer.toString();
+      resBuffer.setLength(0);
+      return result;
+    }
+
+    private void collectPerIntervalStats(OutputCollector<Text, Text> output,
+        long start, long finish, String taskType,
+        StatSeries ... stats) throws IOException {
+      long curInterval = (start / DEFAULT_TIME_INTERVAL_MSEC)
+                                * DEFAULT_TIME_INTERVAL_MSEC;
+      long curTime = start;
+      long accumTime = 0;
+      while(curTime < finish) {
+        // how much of the task time belonged to current interval
+        long nextInterval = curInterval + DEFAULT_TIME_INTERVAL_MSEC;
+        long intervalTime = ((finish < nextInterval) ? 
+            finish : nextInterval) - curTime;
+        IntervalKey key = new IntervalKey("", curInterval, taskType);
+        Text val = new Text(String.valueOf(intervalTime));
+        for(StatSeries statName : stats) {
+          key.setStatName(statName.toString());
+          output.collect(key.getTextKey(), val);
+        }
+
+        curTime = curInterval = nextInterval;
+        accumTime += intervalTime;
+      }
+      // For the pending stat speculative attempts may intersect.
+      // Only one of them is considered pending.
+      assert accumTime == finish - start || finish < start;
+    }
+
+    private void collectJobStats(JobHistoryLog jh,
+                                        OutputCollector<Text, Text> output,
+                                        Reporter reporter
+                                        ) throws IOException {
+      if(jh == null)
+        return;
+      if(jh.tasks == null)
+        return;
+      if(jh.SUBMIT_TIME <= 0)
+        throw new IOException("Job " + jh.JOBID 
+                            + " SUBMIT_TIME = " + jh.SUBMIT_TIME);
+      if(usersIncluded != null && !usersIncluded.contains(jh.USER))
+          return;
+      if(usersExcluded != null && usersExcluded.contains(jh.USER))
+          return;
+
+      int numAttempts = 0;
+      long totalTime = 0;
+      boolean jobSuccess = jh.isSuccessful();
+      long jobWaitTime = jh.LAUNCH_TIME - jh.SUBMIT_TIME;
+      // attemptSubmitTime is the job's LAUNCH_TIME for the first attempt,
+      // or the previous attempt's FINISH_TIME for all subsequent attempts
+      for(TaskHistoryLog th : jh.tasks.values()) {
+        if(th.attempts == null)
+          continue;
+        // Task is successful iff both the task and the job are a "SUCCESS"
+        long attemptSubmitTime = jh.LAUNCH_TIME;
+        boolean taskSuccess = jobSuccess && th.isSuccessful();
+        for(TaskAttemptHistoryLog tah : th.attempts.values()) {
+          // Task attempt is considered successful iff all three statuses
+          // of the attempt, the task, and the job equal "SUCCESS"
+          boolean success = taskSuccess && tah.isSuccessful();
+          if(tah.START_TIME == 0) {
+            LOG.error("Start time 0 for task attempt " + tah.TASK_ATTEMPT_ID);
+            continue;
+          }
+          if(tah.FINISH_TIME < tah.START_TIME) {
+            LOG.error("Finish time " + tah.FINISH_TIME + " is less than " +
+            		"Start time " + tah.START_TIME + " for task attempt " +
+            		tah.TASK_ATTEMPT_ID);
+            tah.FINISH_TIME = tah.START_TIME;
+          }
+
+          if(!"MAP".equals(th.TASK_TYPE) && !"REDUCE".equals(th.TASK_TYPE) &&
+             !"CLEANUP".equals(th.TASK_TYPE) && !"SETUP".equals(th.TASK_TYPE)) {
+            LOG.error("Unexpected TASK_TYPE = " + th.TASK_TYPE
+            + " for attempt " + tah.TASK_ATTEMPT_ID);
+          }
+
+          collectPerIntervalStats(output,
+                  attemptSubmitTime, tah.START_TIME, th.TASK_TYPE,
+                  StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME);
+          collectPerIntervalStats(output,
+                  attemptSubmitTime - jobWaitTime, tah.START_TIME, th.TASK_TYPE,
+                  StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME);
+          if(success)
+            collectPerIntervalStats(output,
+                  tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE,
+                  StatSeries.STAT_ALL_SLOT_TIME);
+          else
+            collectPerIntervalStats(output,
+                  tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE,
+                  StatSeries.STAT_ALL_SLOT_TIME,
+                  StatSeries.STAT_FAILED_SLOT_TIME);
+          totalTime += (tah.FINISH_TIME - tah.START_TIME);
+          numAttempts++;
+          if(numAttempts % 500 == 0) {
+            reporter.setStatus("Processing " + jh.JOBID + " at " + numAttempts);
+          }
+          attemptSubmitTime = tah.FINISH_TIME;
+        }
+      }
+      LOG.info("Total    Maps = " + jh.TOTAL_MAPS
+          + "  Reduces = " + jh.TOTAL_REDUCES);
+      LOG.info("Finished Maps = " + jh.FINISHED_MAPS
+          + "  Reduces = " + jh.FINISHED_REDUCES);
+      LOG.info("numAttempts = " + numAttempts);
+      LOG.info("totalTime   = " + totalTime);
+      LOG.info("averageAttemptTime = " 
+          + (numAttempts==0 ? 0 : totalTime/numAttempts));
+      LOG.info("jobTotalTime = " + (jh.FINISH_TIME <= jh.SUBMIT_TIME? 0 :
+                                    jh.FINISH_TIME - jh.SUBMIT_TIME));
+    }
+  }
+
+  public static class JHLAPartitioner implements Partitioner<Text, Text> {
+    static final int NUM_REDUCERS = 9;
+
+    public void configure(JobConf conf) {}
+
+    public int getPartition(Text key, Text value, int numPartitions) {
+      IntervalKey intKey = new IntervalKey(key.toString());
+      if(intKey.statName.equals(StatSeries.STAT_ALL_SLOT_TIME.toString())) {
+        if(intKey.taskType.equals("MAP"))
+          return 0;
+        else if(intKey.taskType.equals("REDUCE"))
+          return 1;
+      } else if(intKey.statName.equals(
+          StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME.toString())) {
+        if(intKey.taskType.equals("MAP"))
+          return 2;
+        else if(intKey.taskType.equals("REDUCE"))
+          return 3;
+      } else if(intKey.statName.equals(
+          StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME.toString())) {
+        if(intKey.taskType.equals("MAP"))
+          return 4;
+        else if(intKey.taskType.equals("REDUCE"))
+          return 5;
+      } else if(intKey.statName.equals(
+          StatSeries.STAT_FAILED_SLOT_TIME.toString())) {
+        if(intKey.taskType.equals("MAP"))
+          return 6;
+        else if(intKey.taskType.equals("REDUCE"))
+          return 7;
+      }
+      return 8;
+    }
+  }
+
+  private static void runJHLA(
+          Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 
+          Path outputDir,
+          Configuration fsConfig) throws IOException {
+    JobConf job = new JobConf(fsConfig, JHLogAnalyzer.class);
+
+    job.setPartitionerClass(JHLAPartitioner.class);
+
+    FileInputFormat.setInputPaths(job, INPUT_DIR);
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(mapperClass);
+    job.setReducerClass(AccumulatingReducer.class);
+
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(JHLAPartitioner.NUM_REDUCERS);
+    JobClient.runJob(job);
+  }
+
+  private static class LoggingCollector implements OutputCollector<Text, Text> {
+    public void collect(Text key, Text value) throws IOException {
+      LOG.info(key + " == " + value);
+    }
+  }
+
+  /**
+   * Run job history log analyser.
+   */
+  public static void main(String[] args) {
+    Path resFileName = RESULT_FILE;
+    Configuration conf = new Configuration();
+
+    try {
+      conf.setInt("test.io.file.buffer.size", 0);
+      Path historyDir = DEFAULT_HISTORY_DIR;
+      String testFile = null;
+      boolean cleanup = false;
+
+      boolean initControlFiles = true;
+      for (int i = 0; i < args.length; i++) {       // parse command line
+        if (args[i].equalsIgnoreCase("-historyDir")) {
+          historyDir = new Path(args[++i]);
+        } else if (args[i].equalsIgnoreCase("-resFile")) {
+          resFileName = new Path(args[++i]);
+        } else if (args[i].equalsIgnoreCase("-usersIncluded")) {
+          conf.set("jhla.users.included", args[++i]);
+        } else if (args[i].equalsIgnoreCase("-usersExcluded")) {
+          conf.set("jhla.users.excluded", args[++i]);
+        } else if (args[i].equalsIgnoreCase("-gzip")) {
+          conf.set("jhla.compression.class", GzipCodec.class.getCanonicalName());
+        } else if (args[i].equalsIgnoreCase("-jobDelimiter")) {
+          conf.set("jhla.job.delimiter.pattern", args[++i]);
+        } else if (args[i].equalsIgnoreCase("-jobDelimiterLength")) {
+          conf.setInt("jhla.job.delimiter.length", Integer.parseInt(args[++i]));
+        } else if(args[i].equalsIgnoreCase("-noInit")) {
+          initControlFiles = false;
+        } else if(args[i].equalsIgnoreCase("-test")) {
+          testFile = args[++i];
+        } else if(args[i].equalsIgnoreCase("-clean")) {
+          cleanup = true;
+        } else if(args[i].equalsIgnoreCase("-jobQueue")) {
+          conf.set("mapred.job.queue.name", args[++i]);
+        } else if(args[i].startsWith("-Xmx")) {
+          conf.set("mapred.child.java.opts", args[i]);
+        } else {
+          printUsage();
+        }
+      }
+
+      if(cleanup) {
+        cleanup(conf);
+        return;
+      }
+      if(testFile != null) {
+        LOG.info("Start JHLA test ============ ");
+        LocalFileSystem lfs = FileSystem.getLocal(conf);
+        conf.set("fs.default.name", "file:///");
+        JHLAMapper map = new JHLAMapper(conf);
+        map.parseLogFile(lfs, new Path(testFile), 0L,
+                         new LoggingCollector(), Reporter.NULL);
+        return;
+      }
+
+      FileSystem fs = FileSystem.get(conf);
+      if(initControlFiles)
+        createControlFile(fs, historyDir);
+      long tStart = System.currentTimeMillis();
+      runJHLA(JHLAMapper.class, OUTPUT_DIR, conf);
+      long execTime = System.currentTimeMillis() - tStart;
+
+      analyzeResult(fs, 0, execTime, resFileName);
+    } catch(IOException e) {
+      System.err.print(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+
+
+  private static void printUsage() {
+    String className = JHLogAnalyzer.class.getSimpleName();
+    System.err.println("Usage: " + className
+      + "\n\t[-historyDir inputDir] | [-resFile resultFile] |"
+      + "\n\t[-usersIncluded | -usersExcluded userList] |"
+      + "\n\t[-gzip] | [-jobDelimiter pattern] |"
+      + "\n\t[-help | -clean | -test testFile]");
+    System.exit(-1);
+  }
+
+  private static Collection<String> getUserList(String users) {
+    if(users == null)
+      return null;
+    StringTokenizer tokens = new StringTokenizer(users, ",;");
+    Collection<String> userList = new ArrayList<String>(tokens.countTokens());
+    while(tokens.hasMoreTokens())
+      userList.add(tokens.nextToken());
+    return userList;
+  }
+
+  /**
+   * Result is combined from all reduce output files and is written to
+   * the result file (RESULT_FILE by default) as tab-separated columns:
+   * SERIES, PERIOD, TYPE, SLOT_HOUR.
+   */
+  private static void analyzeResult( FileSystem fs, 
+                                     int testType,
+                                     long execTime,
+                                     Path resFileName
+                                     ) throws IOException {
+    LOG.info("Analizing results ...");
+    DataOutputStream out = null;
+    BufferedWriter writer = null;
+    try {
+      out = new DataOutputStream(fs.create(resFileName));
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      writer.write("SERIES\tPERIOD\tTYPE\tSLOT_HOUR\n");
+      FileStatus[] reduceFiles = fs.listStatus(OUTPUT_DIR);
+      assert reduceFiles.length == JHLAPartitioner.NUM_REDUCERS;
+      for(int i = 0; i < JHLAPartitioner.NUM_REDUCERS; i++) {
+        DataInputStream in = null;
+        BufferedReader lines = null;
+        try {
+          in = fs.open(reduceFiles[i].getPath());
+          lines = new BufferedReader(new InputStreamReader(in));
+    
+          String line;
+          while((line = lines.readLine()) != null) {
+            StringTokenizer tokens = new StringTokenizer(line, "\t*");
+            String attr = tokens.nextToken();
+            String dateTime = tokens.nextToken();
+            String taskType = tokens.nextToken();
+            double val = Long.parseLong(tokens.nextToken()) /
+                                    (double)DEFAULT_TIME_INTERVAL_MSEC;
+            writer.write(attr.substring(2));  // skip the stat type "l:"
+            writer.write("\t");
+            writer.write(dateTime);
+            writer.write("\t");
+            writer.write(taskType);
+            writer.write("\t");
+            writer.write(String.valueOf((float)val));
+            writer.newLine();
+          }
+        } finally {
+          if(lines != null) lines.close();
+          if(in != null) in.close();
+        }
+      }
+    } finally {
+      if(writer != null) writer.close();
+      if(out != null) out.close();
+    }
+    LOG.info("Analizing results ... done.");
+  }
+
+  private static void cleanup(Configuration conf) throws IOException {
+    LOG.info("Cleaning up test files");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(new Path(JHLA_ROOT_DIR), true);
+  }
+}

Propchange: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/JHLogAnalyzer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java Wed Jul  8 20:33:04 2009
@@ -78,7 +78,6 @@
   private static final String BASE_FILE_NAME = "test_io_";
   private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
   
-  private static Configuration fsConfig = new Configuration();
   private static final long MEGA = 0x100000;
   private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
   private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
@@ -86,13 +85,18 @@
   private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
   private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
 
+  static{
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
+
   /**
    * Run the test with default parameters.
    * 
    * @throws Exception
    */
   public void testIOs() throws Exception {
-    testIOs(10, 10);
+    testIOs(10, 10, new Configuration());
   }
 
   /**
@@ -102,21 +106,21 @@
    * @param nrFiles number of files
    * @throws IOException
    */
-  public static void testIOs(int fileSize, int nrFiles)
+  public static void testIOs(int fileSize, int nrFiles, Configuration fsConfig)
     throws IOException {
 
     FileSystem fs = FileSystem.get(fsConfig);
 
-    createControlFile(fs, fileSize, nrFiles);
-    writeTest(fs);
-    readTest(fs);
+    createControlFile(fs, fileSize, nrFiles, fsConfig);
+    writeTest(fs, fsConfig);
+    readTest(fs, fsConfig);
     cleanup(fs);
   }
 
-  private static void createControlFile(
-                                        FileSystem fs,
+  private static void createControlFile(FileSystem fs,
                                         int fileSize, // in MB 
-                                        int nrFiles
+                                        int nrFiles,
+                                        Configuration fsConfig
                                         ) throws IOException {
     LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
 
@@ -158,16 +162,15 @@
    * <li>i/o rate squared</li>
    * </ul>
    */
-  private abstract static class IOStatMapper extends IOMapperBase {
+  private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
     IOStatMapper() { 
-      super(fsConfig);
     }
     
     void collectStats(OutputCollector<Text, Text> output, 
                       String name,
                       long execTime, 
-                      Object objSize) throws IOException {
-      long totalSize = ((Long)objSize).longValue();
+                      Long objSize) throws IOException {
+      long totalSize = objSize.longValue();
       float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
       LOG.info("Number of bytes processed = " + totalSize);
       LOG.info("Exec time = " + execTime);
@@ -189,15 +192,14 @@
   /**
    * Write mapper class.
    */
-  public static class WriteMapper extends IOStatMapper {
+  public static class WriteMapper extends IOStatMapper<Long> {
 
     public WriteMapper() { 
-      super(); 
       for(int i=0; i < bufferSize; i++)
         buffer[i] = (byte)('0' + i % 50);
     }
 
-    public Object doIO(Reporter reporter, 
+    public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize 
                        ) throws IOException {
@@ -219,22 +221,24 @@
       } finally {
         out.close();
       }
-      return new Long(totalSize);
+      return Long.valueOf(totalSize);
     }
   }
 
-  private static void writeTest(FileSystem fs)
-    throws IOException {
+  private static void writeTest(FileSystem fs, Configuration fsConfig)
+  throws IOException {
 
     fs.delete(DATA_DIR, true);
     fs.delete(WRITE_DIR, true);
     
-    runIOTest(WriteMapper.class, WRITE_DIR);
+    runIOTest(WriteMapper.class, WRITE_DIR, fsConfig);
   }
   
-  private static void runIOTest( Class<? extends Mapper> mapperClass, 
-                                 Path outputDir
-                                 ) throws IOException {
+  @SuppressWarnings("deprecation")
+  private static void runIOTest(
+          Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 
+          Path outputDir,
+          Configuration fsConfig) throws IOException {
     JobConf job = new JobConf(fsConfig, TestDFSIO.class);
 
     FileInputFormat.setInputPaths(job, CONTROL_DIR);
@@ -253,13 +257,12 @@
   /**
    * Read mapper class.
    */
-  public static class ReadMapper extends IOStatMapper {
+  public static class ReadMapper extends IOStatMapper<Long> {
 
     public ReadMapper() { 
-      super(); 
     }
 
-    public Object doIO(Reporter reporter, 
+    public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize 
                        ) throws IOException {
@@ -278,22 +281,22 @@
       } finally {
         in.close();
       }
-      return new Long(totalSize);
+      return Long.valueOf(totalSize);
     }
   }
 
-  private static void readTest(FileSystem fs) throws IOException {
+  private static void readTest(FileSystem fs, Configuration fsConfig)
+  throws IOException {
     fs.delete(READ_DIR, true);
-    runIOTest(ReadMapper.class, READ_DIR);
+    runIOTest(ReadMapper.class, READ_DIR, fsConfig);
   }
 
-  private static void sequentialTest(
-                                     FileSystem fs, 
+  private static void sequentialTest(FileSystem fs, 
                                      int testType, 
                                      int fileSize, 
                                      int nrFiles
                                      ) throws Exception {
-    IOStatMapper ioer = null;
+    IOStatMapper<Long> ioer = null;
     if (testType == TEST_TYPE_READ)
       ioer = new ReadMapper();
     else if (testType == TEST_TYPE_WRITE)
@@ -348,6 +351,7 @@
     LOG.info("bufferSize = " + bufferSize);
   
     try {
+      Configuration fsConfig = new Configuration();
       fsConfig.setInt("test.io.file.buffer.size", bufferSize);
       FileSystem fs = FileSystem.get(fsConfig);
 
@@ -363,12 +367,12 @@
         cleanup(fs);
         return;
       }
-      createControlFile(fs, fileSize, nrFiles);
+      createControlFile(fs, fileSize, nrFiles, fsConfig);
       long tStart = System.currentTimeMillis();
       if (testType == TEST_TYPE_WRITE)
-        writeTest(fs);
+        writeTest(fs, fsConfig);
       if (testType == TEST_TYPE_READ)
-        readTest(fs);
+        readTest(fs, fsConfig);
       long execTime = System.currentTimeMillis() - tStart;
     
       analyzeResult(fs, testType, execTime, resFileName);
@@ -388,30 +392,34 @@
       reduceFile = new Path(WRITE_DIR, "part-00000");
     else
       reduceFile = new Path(READ_DIR, "part-00000");
-    DataInputStream in;
-    in = new DataInputStream(fs.open(reduceFile));
-  
-    BufferedReader lines;
-    lines = new BufferedReader(new InputStreamReader(in));
     long tasks = 0;
     long size = 0;
     long time = 0;
     float rate = 0;
     float sqrate = 0;
-    String line;
-    while((line = lines.readLine()) != null) {
-      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
-      String attr = tokens.nextToken(); 
-      if (attr.endsWith(":tasks"))
-        tasks = Long.parseLong(tokens.nextToken());
-      else if (attr.endsWith(":size"))
-        size = Long.parseLong(tokens.nextToken());
-      else if (attr.endsWith(":time"))
-        time = Long.parseLong(tokens.nextToken());
-      else if (attr.endsWith(":rate"))
-        rate = Float.parseFloat(tokens.nextToken());
-      else if (attr.endsWith(":sqrate"))
-        sqrate = Float.parseFloat(tokens.nextToken());
+    DataInputStream in = null;
+    BufferedReader lines = null;
+    try {
+      in = new DataInputStream(fs.open(reduceFile));
+      lines = new BufferedReader(new InputStreamReader(in));
+      String line;
+      while((line = lines.readLine()) != null) {
+        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+        String attr = tokens.nextToken(); 
+        if (attr.endsWith(":tasks"))
+          tasks = Long.parseLong(tokens.nextToken());
+        else if (attr.endsWith(":size"))
+          size = Long.parseLong(tokens.nextToken());
+        else if (attr.endsWith(":time"))
+          time = Long.parseLong(tokens.nextToken());
+        else if (attr.endsWith(":rate"))
+          rate = Float.parseFloat(tokens.nextToken());
+        else if (attr.endsWith(":sqrate"))
+          sqrate = Float.parseFloat(tokens.nextToken());
+      }
+    } finally {
+      if(in != null) in.close();
+      if(lines != null) lines.close();
     }
     
     double med = rate / 1000 / tasks;
@@ -429,12 +437,15 @@
       "    Test exec time sec: " + (float)execTime / 1000,
       "" };
 
-    PrintStream res = new PrintStream(
-                                      new FileOutputStream(
-                                                           new File(resFileName), true)); 
-    for(int i = 0; i < resultLines.length; i++) {
-      LOG.info(resultLines[i]);
-      res.println(resultLines[i]);
+    PrintStream res = null;
+    try {
+      res = new PrintStream(new FileOutputStream(new File(resFileName), true)); 
+      for(int i = 0; i < resultLines.length; i++) {
+        LOG.info(resultLines[i]);
+        res.println(resultLines[i]);
+      }
+    } finally {
+      if(res != null) res.close();
     }
   }
 

Added: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java?rev=792299&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java Wed Jul  8 20:33:04 2009
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test Job History Log Analyzer.
+ * <p>
+ * {@link #setUp()} writes a synthetic history log consisting of three job
+ * histories, each introduced by a <code>$!!FILE=...!!</code> line, and
+ * {@link #testJHLA()} runs {@link JHLogAnalyzer} over that file.
+ *
+ * @see JHLogAnalyzer
+ */
+public class TestJHLA extends TestCase {
+  private static final Log LOG = LogFactory.getLog(TestJHLA.class);
+  /** Path of the generated history log under the test data directory. */
+  private String historyLog = System.getProperty("test.build.data",
+                                  "build/test/data") + "/history/test.log";
+
+  /**
+   * Creates the history log file and fills it with three sample jobs:
+   * job 0004 (user "hadoop", SUCCESS), job 0023 (user "hadoop2", SUCCESS)
+   * and job 0034 (user "hadoop3", KILLED, with a failed map attempt whose
+   * ERROR value spans several lines).
+   *
+   * @throws Exception if the log file cannot be created or written
+   */
+  @Before
+  public void setUp() throws Exception {
+    File logFile = new File(historyLog);
+    File logDir = logFile.getParentFile();
+    if(!logDir.exists() && !logDir.mkdirs())
+      LOG.error("Cannot create dirs for history log file: " + historyLog);
+    // createNewFile() also returns false when the file already exists; that
+    // is not an error here because the FileOutputStream below truncates it.
+    if(!logFile.exists() && !logFile.createNewFile())
+      LOG.error("Cannot create history log file: " + historyLog);
+    BufferedWriter writer = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(historyLog)));
+    try {
+      writer.write("$!!FILE=file1.log!!"); writer.newLine();
+      writer.write("Meta VERSION=\"1\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0004\" JOBNAME=\"streamjob21364.jar\" USER=\"hadoop\" SUBMIT_TIME=\"1237962008012\" JOBCONF=\"hdfs:///job_200903250600_0004/job.xml\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0004\" JOB_PRIORITY=\"NORMAL\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0004\" LAUNCH_TIME=\"1237962008712\" TOTAL_MAPS=\"2\" TOTAL_REDUCES=\"0\" JOB_STATUS=\"PREP\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000003\" TASK_TYPE=\"SETUP\" START_TIME=\"1237962008736\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0004_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000003_0\" START_TIME=\"1237962010929\" TRACKER_NAME=\"tracker_50445\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0004_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000003_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962012459\" HOSTNAME=\"host.com\" STATE_STRING=\"setup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000003\" TASK_TYPE=\"SETUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962023824\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0004\" JOB_STATUS=\"RUNNING\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1237962024049\" SPLITS=\"host1.com,host2.com,host3.com\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000001\" TASK_TYPE=\"MAP\" START_TIME=\"1237962024065\" SPLITS=\"host1.com,host2.com,host3.com\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000000_0\" START_TIME=\"1237962026157\" TRACKER_NAME=\"tracker_50524\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000000_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962041307\" HOSTNAME=\"host.com\" STATE_STRING=\"Records R/W=2681/1\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(56630)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28327)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2681)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28327)][(MAP_OUTPUT_RECORDS)(Map output records)(2681)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000000\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962054138\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(56630)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28327)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2681)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28327)][(MAP_OUTPUT_RECORDS)(Map output records)(2681)]}\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000001_0\" START_TIME=\"1237962026077\" TRACKER_NAME=\"tracker_50162\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0004_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000001_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962041030\" HOSTNAME=\"host.com\" STATE_STRING=\"Records R/W=2634/1\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(28316)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28303)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2634)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28303)][(MAP_OUTPUT_RECORDS)(Map output records)(2634)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000001\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962054187\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(28316)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(28303)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(2634)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(28303)][(MAP_OUTPUT_RECORDS)(Map output records)(2634)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000002\" TASK_TYPE=\"CLEANUP\" START_TIME=\"1237962054187\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0004_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000002_0\" START_TIME=\"1237962055578\" TRACKER_NAME=\"tracker_50162\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0004_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0004_m_000002_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962056782\" HOSTNAME=\"host.com\" STATE_STRING=\"cleanup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0004_m_000002\" TASK_TYPE=\"CLEANUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237962069193\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0004\" FINISH_TIME=\"1237962069193\" JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"2\" FINISHED_REDUCES=\"0\" FAILED_MAPS=\"0\" FAILED_REDUCES=\"0\" COUNTERS=\"{(org.apache.hadoop.mapred.JobInProgress$Counter)(Job Counters )[(TOTAL_LAUNCHED_MAPS)(Launched map tasks)(2)]}{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(84946)][(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(56630)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5315)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(56630)][(MAP_OUTPUT_RECORDS)(Map output records)(5315)]}\" ."); writer.newLine();
+      writer.write("$!!FILE=file2.log!!"); writer.newLine();
+      writer.write("Meta VERSION=\"1\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0023\" JOBNAME=\"TestJob\" USER=\"hadoop2\" SUBMIT_TIME=\"1237964779799\" JOBCONF=\"hdfs:///job_200903250600_0023/job.xml\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0023\" JOB_PRIORITY=\"NORMAL\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0023\" LAUNCH_TIME=\"1237964780928\" TOTAL_MAPS=\"2\" TOTAL_REDUCES=\"0\" JOB_STATUS=\"PREP\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_r_000001\" TASK_TYPE=\"SETUP\" START_TIME=\"1237964780940\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("ReduceAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0023_r_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000001_0\" START_TIME=\"1237964720322\" TRACKER_NAME=\"tracker_3065\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("ReduceAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0023_r_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000001_0\" TASK_STATUS=\"SUCCESS\" SHUFFLE_FINISHED=\"1237964722118\" SORT_FINISHED=\"1237964722118\" FINISH_TIME=\"1237964722118\" HOSTNAME=\"host.com\" STATE_STRING=\"setup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_r_000001\" TASK_TYPE=\"SETUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964796054\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0023\" JOB_STATUS=\"RUNNING\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1237964796176\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_m_000001\" TASK_TYPE=\"MAP\" START_TIME=\"1237964796176\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000000_0\" START_TIME=\"1237964809765\" TRACKER_NAME=\"tracker_50459\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000000_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964911772\" HOSTNAME=\"host.com\" STATE_STRING=\"\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_m_000000\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964916534\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000001_0\" START_TIME=\"1237964798169\" TRACKER_NAME=\"tracker_1524\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0023_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_m_000001_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964962960\" HOSTNAME=\"host.com\" STATE_STRING=\"\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_m_000001\" TASK_TYPE=\"MAP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964976870\" COUNTERS=\"{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(500000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(5000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(5000000)][(MAP_OUTPUT_RECORDS)(Map output records)(5000000)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_r_000000\" TASK_TYPE=\"CLEANUP\" START_TIME=\"1237964976871\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("ReduceAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0023_r_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000000_0\" START_TIME=\"1237964977208\" TRACKER_NAME=\"tracker_1524\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("ReduceAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0023_r_000000\" TASK_ATTEMPT_ID=\"attempt_200903250600_0023_r_000000_0\" TASK_STATUS=\"SUCCESS\" SHUFFLE_FINISHED=\"1237964979031\" SORT_FINISHED=\"1237964979031\" FINISH_TIME=\"1237964979032\" HOSTNAME=\"host.com\" STATE_STRING=\"cleanup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0023_r_000000\" TASK_TYPE=\"CLEANUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237964991879\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0023\" FINISH_TIME=\"1237964991879\" JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"2\" FINISHED_REDUCES=\"0\" FAILED_MAPS=\"0\" FAILED_REDUCES=\"0\" COUNTERS=\"{(org.apache.hadoop.mapred.JobInProgress$Counter)(Job Counters )[(TOTAL_LAUNCHED_MAPS)(Launched map tasks)(2)]}{(FileSystemCounters)(FileSystemCounters)[(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(1000000000)]}{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(MAP_INPUT_RECORDS)(Map input records)(10000000)][(SPILLED_RECORDS)(Spilled Records)(0)][(MAP_INPUT_BYTES)(Map input bytes)(10000000)][(MAP_OUTPUT_RECORDS)(Map output records)(10000000)]}\" ."); writer.newLine();
+      writer.write("$!!FILE=file3.log!!"); writer.newLine();
+      writer.write("Meta VERSION=\"1\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0034\" JOBNAME=\"TestJob\" USER=\"hadoop3\" SUBMIT_TIME=\"1237966370007\" JOBCONF=\"hdfs:///job_200903250600_0034/job.xml\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0034\" JOB_PRIORITY=\"NORMAL\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0034\" LAUNCH_TIME=\"1237966371076\" TOTAL_MAPS=\"2\" TOTAL_REDUCES=\"0\" JOB_STATUS=\"PREP\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0034_m_000003\" TASK_TYPE=\"SETUP\" START_TIME=\"1237966371093\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0034_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000003_0\" START_TIME=\"1237966371524\" TRACKER_NAME=\"tracker_50118\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"SETUP\" TASKID=\"task_200903250600_0034_m_000003\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000003_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237966373174\" HOSTNAME=\"host.com\" STATE_STRING=\"setup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0034_m_000003\" TASK_TYPE=\"SETUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237966386098\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0034\" JOB_STATUS=\"RUNNING\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0034_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1237966386111\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0034_m_000001\" TASK_TYPE=\"MAP\" START_TIME=\"1237966386124\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"task_200903250600_0034_m_000001\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000001_0\" TASK_STATUS=\"FAILED\" FINISH_TIME=\"1237967174546\" HOSTNAME=\"host.com\" ERROR=\"java.io.IOException: Task process exit with nonzero status of 15."); writer.newLine();
+      writer.write("  at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:424)"); writer.newLine();
+      writer.write(",java.io.IOException: Task process exit with nonzero status of 15."); writer.newLine();
+      writer.write("  at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:424)"); writer.newLine();
+      writer.write("\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0034_m_000002\" TASK_TYPE=\"CLEANUP\" START_TIME=\"1237967170815\" SPLITS=\"\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0034_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000002_0\" START_TIME=\"1237967168653\" TRACKER_NAME=\"tracker_3105\" HTTP_PORT=\"50060\" ."); writer.newLine();
+      writer.write("MapAttempt TASK_TYPE=\"CLEANUP\" TASKID=\"task_200903250600_0034_m_000002\" TASK_ATTEMPT_ID=\"attempt_200903250600_0034_m_000002_0\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237967171301\" HOSTNAME=\"host.com\" STATE_STRING=\"cleanup\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Task TASKID=\"task_200903250600_0034_m_000002\" TASK_TYPE=\"CLEANUP\" TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1237967185818\" COUNTERS=\"{(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)[(SPILLED_RECORDS)(Spilled Records)(0)]}\" ."); writer.newLine();
+      writer.write("Job JOBID=\"job_200903250600_0034\" FINISH_TIME=\"1237967185818\" JOB_STATUS=\"KILLED\" FINISHED_MAPS=\"0\" FINISHED_REDUCES=\"0\" ."); writer.newLine();
+    } finally {
+      // Close (and thereby flush) the writer even if one of the writes fails,
+      // so the file handle is not leaked and tearDown() can delete the file.
+      writer.close();
+    }
+  }
+
+  /**
+   * Deletes the history log file and then its parent directory.
+   * A failed deletion is only logged, it does not fail the test.
+   */
+  @After
+  public void tearDown() throws Exception {
+    File logFile = new File(historyLog);
+    if(!logFile.delete())
+      LOG.error("Cannot delete history log file: " + historyLog);
+    // report the directory itself, not the file path, when rmdir fails
+    if(!logFile.getParentFile().delete())
+      LOG.error("Cannot delete history log dir: " + logFile.getParent());
+  }
+
+  /**
+   * Run log analyzer in test mode for file test.log.
+   * The analyzer is invoked three times with the same job delimiter:
+   * without a user option, with <code>-usersIncluded</code> and with
+   * <code>-usersExcluded</code>.
+   */
+  public void testJHLA() {
+    String[] args = {"-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!"};
+    JHLogAnalyzer.main(args);
+    args = new String[]{"-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!",
+                        "-usersIncluded", "hadoop,hadoop2"};
+    JHLogAnalyzer.main(args);
+    args = new String[]{"-test", historyLog, "-jobDelimiter", ".!!FILE=.*!!",
+        "-usersExcluded", "hadoop,hadoop3"};
+    JHLogAnalyzer.main(args);
+  }
+}

Propchange: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestJHLA.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java?rev=792299&r1=792298&r2=792299&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java Wed Jul  8 20:33:04 2009
@@ -22,11 +22,12 @@
 import org.apache.hadoop.fs.DistributedFSCheck;
 import org.apache.hadoop.fs.TestDFSIO;
 import org.apache.hadoop.fs.TestFileSystem;
+import org.apache.hadoop.fs.JHLogAnalyzer;
 import org.apache.hadoop.hdfs.NNBench;
 import org.apache.hadoop.io.FileBench;
 import org.apache.hadoop.util.ProgramDriver;
 
-/*
+/**
  * Driver for HDFS tests, which require map-reduce to run.
  */
 public class HdfsWithMRTestDriver {
@@ -55,6 +56,8 @@
           "Benchmark SequenceFile(Input|Output)Format " +
           "(block,record compressed and uncompressed), " +
           "Text(Input|Output)Format (compressed and uncompressed)");
+      pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class, 
+          "Job History Log analyzer.");
     } catch(Throwable e) {
       e.printStackTrace();
     }



Mime
View raw message