hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r911519 [2/3] - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/small-trace-test/ src/tools/org/apache/hadoop/tools/rumen/
Date Thu, 18 Feb 2010 18:43:30 GMT
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,686 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinished;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinished;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinished;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link JobBuilder} builds one job. It processes a sequence of
+ * {@link HistoryEvent}s.
+ */
+public class JobBuilder {
+  private static final long BYTES_IN_MEG =
+      StringUtils.TraditionalBinaryPrefix.string2long("1m");
+
+  private String jobID;
+
+  private boolean finalized = false;
+
+  private LoggedJob result = new LoggedJob();
+
+  private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
+  private Map<String, LoggedTask> reduceTasks =
+      new HashMap<String, LoggedTask>();
+  private Map<String, LoggedTask> otherTasks =
+      new HashMap<String, LoggedTask>();
+
+  private Map<String, LoggedTaskAttempt> attempts =
+      new HashMap<String, LoggedTaskAttempt>();
+
+  private Map<ParsedHost, ParsedHost> allHosts =
+      new HashMap<ParsedHost, ParsedHost>();
+
+  /**
+   * The maximum number of preferred locations (split locations) a task may
+   * have before we ignore them all.
+   */
+  private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
+  /**
+   * The regular expression used to parse task attempt IDs in job tracker logs
+   */
+  private final static Pattern taskAttemptIDPattern =
+      Pattern.compile(".*_([0-9]+)");
+
+  private int[] attemptTimesPercentiles = null;
+
+  // Use this to search within the java options to get heap sizes.
+  // The heap size, number plus order-of-magnitude suffix (e.g. "200m"),
+  // is in capturing group 1.
+  private static final Pattern heapPattern =
+      Pattern.compile("-Xmx([0-9]+[kKmMgGtT])");
+
+  public JobBuilder(String jobID) {
+    this.jobID = jobID;
+  }
+
+  public String getJobID() {
+    return jobID;
+  }
+
+  {
+    if (attemptTimesPercentiles == null) {
+      attemptTimesPercentiles = new int[19];
+
+      for (int i = 0; i < 19; ++i) {
+        attemptTimesPercentiles[i] = (i + 1) * 5;
+      }
+    }
+  }
+
+  /**
+   * Process one {@link HistoryEvent}
+   * 
+   * @param event
+   *          The {@link HistoryEvent} to be processed.
+   */
+  public void process(HistoryEvent event) {
+    if (finalized) {
+      throw new IllegalStateException(
+          "JobBuilder.process(HistoryEvent event) called after LoggedJob built");
+    }
+
+    // these are in lexicographical order by class name.
+    if (event instanceof JobFinishedEvent) {
+      processJobFinishedEvent((JobFinishedEvent) event);
+    } else if (event instanceof JobInfoChangeEvent) {
+      processJobInfoChangeEvent((JobInfoChangeEvent) event);
+    } else if (event instanceof JobInitedEvent) {
+      processJobInitedEvent((JobInitedEvent) event);
+    } else if (event instanceof JobPriorityChangeEvent) {
+      processJobPriorityChangeEvent((JobPriorityChangeEvent) event);
+    } else if (event instanceof JobStatusChangedEvent) {
+      processJobStatusChangedEvent((JobStatusChangedEvent) event);
+    } else if (event instanceof JobSubmittedEvent) {
+      processJobSubmittedEvent((JobSubmittedEvent) event);
+    } else if (event instanceof JobUnsuccessfulCompletionEvent) {
+      processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event);
+    } else if (event instanceof MapAttemptFinishedEvent) {
+      processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
+    } else if (event instanceof ReduceAttemptFinishedEvent) {
+      processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
+    } else if (event instanceof TaskAttemptFinishedEvent) {
+      processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
+    } else if (event instanceof TaskAttemptStartedEvent) {
+      processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event);
+    } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
+      processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
+    } else if (event instanceof TaskFailedEvent) {
+      processTaskFailedEvent((TaskFailedEvent) event);
+    } else if (event instanceof TaskFinishedEvent) {
+      processTaskFinishedEvent((TaskFinishedEvent) event);
+    } else if (event instanceof TaskStartedEvent) {
+      processTaskStartedEvent((TaskStartedEvent) event);
+    } else if (event instanceof TaskUpdatedEvent) {
+      processTaskUpdatedEvent((TaskUpdatedEvent) event);
+    } else
+      throw new IllegalArgumentException(
+          "JobBuilder.process(HistoryEvent): unknown event type");
+  }
+
+  private String extract(Properties conf, String[] names, String defaultValue) {
+    for (String name : names) {
+      String result = conf.getProperty(name);
+
+      if (result != null) {
+        return result;
+      }
+    }
+
+    return defaultValue;
+  }
+
+  private Integer extractMegabytes(Properties conf, String[] names) {
+    String javaOptions = extract(conf, names, null);
+
+    if (javaOptions == null) {
+      return null;
+    }
+
+    Matcher matcher = heapPattern.matcher(javaOptions);
+
+    Integer heapMegabytes = null;
+
+    while (matcher.find()) {
+      String heapSize = matcher.group(1);
+      heapMegabytes =
+          ((int) (StringUtils.TraditionalBinaryPrefix.string2long(heapSize) / BYTES_IN_MEG));
+    }
+
+    return heapMegabytes;
+  }
+
+  private void maybeSetHeapMegabytes(Integer megabytes) {
+    if (megabytes != null) {
+      result.setHeapMegabytes(megabytes);
+    }
+  }
+
+  private void maybeSetJobMapMB(Integer megabytes) {
+    if (megabytes != null) {
+      result.setJobMapMB(megabytes);
+    }
+  }
+
+  private void maybeSetJobReduceMB(Integer megabytes) {
+    if (megabytes != null) {
+      result.setJobReduceMB(megabytes);
+    }
+  }
+
+  /**
+   * Process a collection of JobConf {@link Properties}. This method may be
+   * called more than once; a conf can be processed before, during, or after
+   * the events.
+   * 
+   * @param conf
+   *          The job conf properties to be added.
+   */
+  public void process(Properties conf) {
+    if (finalized) {
+      throw new IllegalStateException(
+          "JobBuilder.process(Properties conf) called after LoggedJob built");
+    }
+
+    result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
+        .getCandidates(), "default"));
+    result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
+        .getCandidates(), null));
+
+    maybeSetHeapMegabytes(extractMegabytes(conf,
+        JobConfPropertyNames.TASK_JAVA_OPTS_S.getCandidates()));
+    maybeSetJobMapMB(extractMegabytes(conf,
+        JobConfPropertyNames.MAP_JAVA_OPTS_S.getCandidates()));
+    maybeSetJobReduceMB(extractMegabytes(conf,
+        JobConfPropertyNames.REDUCE_JAVA_OPTS_S.getCandidates()));
+  }
+
+  /**
+   * Request the builder to build the final object. Once called, the
+   * {@link JobBuilder} will accept no more events or job-conf properties.
+   * 
+   * @return Parsed {@link LoggedJob} object.
+   */
+  public LoggedJob build() {
+    // The main job here is to build CDFs
+    finalized = true;
+
+    // initialize all the per-job statistics gathering places
+    Histogram[] successfulMapAttemptTimes =
+        new Histogram[ParsedHost.numberOfDistances() + 1];
+    for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
+      successfulMapAttemptTimes[i] = new Histogram();
+    }
+
+    Histogram successfulReduceAttemptTimes = new Histogram();
+    Histogram[] failedMapAttemptTimes =
+        new Histogram[ParsedHost.numberOfDistances() + 1];
+    for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
+      failedMapAttemptTimes[i] = new Histogram();
+    }
+    Histogram failedReduceAttemptTimes = new Histogram();
+
+    Histogram successfulNthMapperAttempts = new Histogram();
+    // Histogram successfulNthReducerAttempts = new Histogram();
+    // Histogram mapperLocality = new Histogram();
+
+    for (LoggedTask task : result.getMapTasks()) {
+      for (LoggedTaskAttempt attempt : task.getAttempts()) {
+        int distance = successfulMapAttemptTimes.length - 1;
+        Long runtime = null;
+
+        if (attempt.getFinishTime() > 0 && attempt.getStartTime() > 0) {
+          runtime = attempt.getFinishTime() - attempt.getStartTime();
+
+          if (attempt.getResult() == Values.SUCCESS) {
+            LoggedLocation host = attempt.getLocation();
+
+            List<LoggedLocation> locs = task.getPreferredLocations();
+
+            if (host != null && locs != null) {
+              for (LoggedLocation loc : locs) {
+                ParsedHost preferredLoc = new ParsedHost(loc);
+
+                distance =
+                    Math.min(distance, preferredLoc
+                        .distance(new ParsedHost(host)));
+              }
+
+              // mapperLocality.enter(distance);
+            }
+
+            if (attempt.getStartTime() > 0 && attempt.getFinishTime() > 0) {
+              if (runtime != null) {
+                successfulMapAttemptTimes[distance].enter(runtime);
+              }
+            }
+
+            String attemptID = attempt.getAttemptID();
+
+            if (attemptID != null) {
+              Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
+
+              if (matcher.matches()) {
+                String attemptNumberString = matcher.group(1);
+
+                if (attemptNumberString != null) {
+                  int attemptNumber = Integer.parseInt(attemptNumberString);
+
+                  successfulNthMapperAttempts.enter(attemptNumber);
+                }
+              }
+            }
+          } else {
+            if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
+              if (runtime != null) {
+                failedMapAttemptTimes[distance].enter(runtime);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    for (LoggedTask task : result.getReduceTasks()) {
+      for (LoggedTaskAttempt attempt : task.getAttempts()) {
+        Long runtime = null;
+
+        if (attempt.getFinishTime() > 0 && attempt.getStartTime() > 0) {
+          runtime = attempt.getFinishTime() - attempt.getStartTime();
+        }
+
+        if (attempt.getResult() == Values.SUCCESS) {
+          if (runtime != null) {
+            successfulReduceAttemptTimes.enter(runtime);
+          }
+        } else if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED
+            && runtime != null) {
+          failedReduceAttemptTimes.enter(runtime);
+        }
+      }
+    }
+
+    result.setFailedMapAttemptCDFs(mapCDFArrayList(failedMapAttemptTimes));
+
+    LoggedDiscreteCDF failedReduce = new LoggedDiscreteCDF();
+    failedReduce.setCDF(failedReduceAttemptTimes, attemptTimesPercentiles, 100);
+    result.setFailedReduceAttemptCDF(failedReduce);
+
+    result
+        .setSuccessfulMapAttemptCDFs(mapCDFArrayList(successfulMapAttemptTimes));
+
+    LoggedDiscreteCDF succReduce = new LoggedDiscreteCDF();
+    succReduce.setCDF(successfulReduceAttemptTimes, attemptTimesPercentiles,
+        100);
+    result.setSuccessfulReduceAttemptCDF(succReduce);
+
+    result.setFailedMapAttemptCDFs(null);
+
+    long totalSuccessfulAttempts = 0L;
+    long maxTriesToSucceed = 0L;
+
+    for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+      totalSuccessfulAttempts += ent.getValue();
+      maxTriesToSucceed = Math.max(maxTriesToSucceed, ent.getKey());
+    }
+
+    if (totalSuccessfulAttempts > 0L) {
+      double[] successAfterI = new double[(int) maxTriesToSucceed + 1];
+      for (int i = 0; i < successAfterI.length; ++i) {
+        successAfterI[i] = 0.0D;
+      }
+
+      for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+        successAfterI[ent.getKey().intValue()] =
+            ((double) ent.getValue()) / totalSuccessfulAttempts;
+      }
+      result.setMapperTriesToSucceed(successAfterI);
+    } else {
+      result.setMapperTriesToSucceed(null);
+    }
+
+    return result;
+  }
+
+  private ArrayList<LoggedDiscreteCDF> mapCDFArrayList(Histogram[] data) {
+    ArrayList<LoggedDiscreteCDF> result = new ArrayList<LoggedDiscreteCDF>();
+
+    for (Histogram hist : data) {
+      LoggedDiscreteCDF discCDF = new LoggedDiscreteCDF();
+      discCDF.setCDF(hist, attemptTimesPercentiles, 100);
+      result.add(discCDF);
+    }
+
+    return result;
+  }
+
+  private static Values getPre21Value(String name) {
+    if (name.equalsIgnoreCase("JOB_CLEANUP")) {
+      return Values.CLEANUP;
+    }
+    if (name.equalsIgnoreCase("JOB_SETUP")) {
+      return Values.SETUP;
+    }
+
+    return Values.valueOf(name.toUpperCase());
+  }
+
+  private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
+    LoggedTask task = getTask(event.getTaskId().toString());
+    if (task == null) {
+      return;
+    }
+    task.setFinishTime(event.getFinishTime());
+  }
+
+  private void processTaskStartedEvent(TaskStartedEvent event) {
+    LoggedTask task =
+        getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
+    task.setStartTime(event.getStartTime());
+    task.setPreferredLocations(preferredLocationForSplits(event
+        .getSplitLocations()));
+  }
+
+  private void processTaskFinishedEvent(TaskFinishedEvent event) {
+    LoggedTask task =
+        getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
+    if (task == null) {
+      return;
+    }
+    task.setFinishTime(event.getFinishTime());
+    task.setTaskStatus(getPre21Value(event.getTaskStatus()));
+    task.incorporateCounters(((TaskFinished) event.getDatum()).counters);
+  }
+
+  private void processTaskFailedEvent(TaskFailedEvent event) {
+    LoggedTask task =
+        getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
+    if (task == null) {
+      return;
+    }
+    task.setFinishTime(event.getFinishTime());
+    task.setTaskStatus(getPre21Value(event.getTaskStatus()));
+  }
+
+  private void processTaskAttemptUnsuccessfulCompletionEvent(
+      TaskAttemptUnsuccessfulCompletionEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getTaskAttemptId().toString());
+
+    if (attempt == null) {
+      return;
+    }
+
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
+
+    if (parsedHost != null) {
+      attempt.setLocation(parsedHost.makeLoggedLocation());
+    }
+
+    attempt.setFinishTime(event.getFinishTime());
+  }
+
+  private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getTaskAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setStartTime(event.getStartTime());
+  }
+
+  private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    attempt.setLocation(getAndRecordParsedHost(event.getHostname())
+        .makeLoggedLocation());
+    attempt.setFinishTime(event.getFinishTime());
+    attempt
+        .incorporateCounters(((TaskAttemptFinished) event.getDatum()).counters);
+  }
+
+  private void processReduceAttemptFinishedEvent(
+      ReduceAttemptFinishedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    attempt.setHostName(event.getHostname());
+    // XXX There may be redundant location info available in the event.
+    // We might consider extracting it from this event. It is redundant today,
+    // but extracting it would add future-proofing.
+    attempt.setFinishTime(event.getFinishTime());
+    attempt.setShuffleFinished(event.getShuffleFinishTime());
+    attempt.setSortFinished(event.getSortFinishTime());
+    attempt
+        .incorporateCounters(((ReduceAttemptFinished) event.getDatum()).counters);
+  }
+
+  private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    attempt.setHostName(event.getHostname());
+    // XXX There may be redundant location info available in the event.
+    // We might consider extracting it from this event. It is redundant today,
+    // but extracting it would add future-proofing.
+    attempt.setFinishTime(event.getFinishTime());
+    attempt
+        .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters);
+  }
+
+  private void processJobUnsuccessfulCompletionEvent(
+      JobUnsuccessfulCompletionEvent event) {
+    result.setOutcome(Pre21JobHistoryConstants.Values
+        .valueOf(event.getStatus()));
+    result.setFinishTime(event.getFinishTime());
+  }
+
+  private void processJobSubmittedEvent(JobSubmittedEvent event) {
+    result.setJobID(event.getJobId().toString());
+    result.setJobName(event.getJobName());
+    result.setUser(event.getUserName());
+    result.setSubmitTime(event.getSubmitTime());
+  }
+
+  private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
+    result.setOutcome(Pre21JobHistoryConstants.Values
+        .valueOf(event.getStatus()));
+  }
+
+  private void processJobPriorityChangeEvent(JobPriorityChangeEvent event) {
+    result.setPriority(LoggedJob.JobPriority.valueOf(event.getPriority()
+        .toString()));
+  }
+
+  private void processJobInitedEvent(JobInitedEvent event) {
+    result.setLaunchTime(event.getLaunchTime());
+    result.setTotalMaps(event.getTotalMaps());
+    result.setTotalReduces(event.getTotalReduces());
+  }
+
+  private void processJobInfoChangeEvent(JobInfoChangeEvent event) {
+    result.setLaunchTime(event.getLaunchTime());
+  }
+
+  private void processJobFinishedEvent(JobFinishedEvent event) {
+    result.setFinishTime(event.getFinishTime());
+    result.setJobID(jobID);
+    result.setOutcome(Values.SUCCESS);
+  }
+
+  private LoggedTask getTask(String taskIDname) {
+    LoggedTask result = mapTasks.get(taskIDname);
+
+    if (result != null) {
+      return result;
+    }
+
+    result = reduceTasks.get(taskIDname);
+
+    if (result != null) {
+      return result;
+    }
+
+    return otherTasks.get(taskIDname);
+  }
+
+  /**
+   * @param type
+   *          the task type
+   * @param taskIDname
+   *          the task ID name, as a string
+   * @param allowCreate
+   *          if true, we can create a task.
+   * @return the {@link LoggedTask} with this ID, or {@code null} if it does
+   *         not exist and {@code allowCreate} is false.
+   */
+  private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
+      boolean allowCreate) {
+    Map<String, LoggedTask> taskMap = otherTasks;
+    List<LoggedTask> tasks = this.result.getOtherTasks();
+
+    switch (type) {
+    case MAP:
+      taskMap = mapTasks;
+      tasks = this.result.getMapTasks();
+      break;
+
+    case REDUCE:
+      taskMap = reduceTasks;
+      tasks = this.result.getReduceTasks();
+      break;
+
+    default:
+      // no code
+    }
+
+    LoggedTask result = taskMap.get(taskIDname);
+
+    if (result == null && allowCreate) {
+      result = new LoggedTask();
+      result.setTaskType(getPre21Value(type.toString()));
+      result.setTaskID(taskIDname);
+      taskMap.put(taskIDname, result);
+      tasks.add(result);
+    }
+
+    return result;
+  }
+
+  private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
+      String taskIDName, String taskAttemptName) {
+    LoggedTask task = getOrMakeTask(type, taskIDName, false);
+    LoggedTaskAttempt result = attempts.get(taskAttemptName);
+
+    if (result == null && task != null) {
+      result = new LoggedTaskAttempt();
+      result.setAttemptID(taskAttemptName);
+      attempts.put(taskAttemptName, result);
+      task.getAttempts().add(result);
+    }
+
+    return result;
+  }
+
+  private ParsedHost getAndRecordParsedHost(String hostName) {
+    ParsedHost result = ParsedHost.parse(hostName);
+
+    if (result != null) {
+      ParsedHost canonicalResult = allHosts.get(result);
+
+      if (canonicalResult != null) {
+        return canonicalResult;
+      }
+
+      allHosts.put(result, result);
+
+      return result;
+    }
+
+    return null;
+  }
+
+  private ArrayList<LoggedLocation> preferredLocationForSplits(String splits) {
+    if (splits != null) {
+      ArrayList<LoggedLocation> locations = null;
+
+      StringTokenizer tok = new StringTokenizer(splits, ",", false);
+
+      if (tok.countTokens() <= MAXIMUM_PREFERRED_LOCATIONS) {
+        locations = new ArrayList<LoggedLocation>();
+
+        while (tok.hasMoreTokens()) {
+          String nextSplit = tok.nextToken();
+
+          ParsedHost node = getAndRecordParsedHost(nextSplit);
+
+          if (locations != null && node != null) {
+            locations.add(node.makeLoggedLocation());
+          }
+        }
+
+        return locations;
+      }
+    }
+
+    return null;
+  }
+}
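
A note on intended use: a JobBuilder is driven by feeding it HistoryEvents (and, optionally, the job-conf Properties) and then calling build() exactly once. A minimal sketch follows; the job ID is a placeholder and the parser is any JobHistoryParser, neither of which is prescribed by this file.

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

    class JobBuilderUsageSketch {
      // Builds one LoggedJob from a stream of history events plus an optional conf.
      static LoggedJob buildOne(JobHistoryParser parser, Properties jobConf)
          throws IOException {
        // The job ID below is a placeholder, not taken from this commit.
        JobBuilder builder = new JobBuilder("job_201002180000_0001");

        // A conf may be processed before, during, or after the events.
        builder.process(jobConf);

        HistoryEvent event;
        while ((event = parser.nextEvent()) != null) {
          builder.process(event); // unknown event types raise IllegalArgumentException
        }

        // After build(), further process(...) calls throw IllegalStateException.
        return builder.build();
      }
    }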

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.JobContext;
+
+public enum JobConfPropertyNames {
+  QUEUE_NAMES("mapred.job.queue.name", JobContext.QUEUE_NAME), JOB_NAMES(
+      JobContext.JOB_NAME), TASK_JAVA_OPTS_S("mapred.child.java.opts"),
+  MAP_JAVA_OPTS_S("mapred.child.java.opts", JobContext.MAP_JAVA_OPTS),
+  REDUCE_JAVA_OPTS_S("mapred.child.java.opts", JobContext.REDUCE_JAVA_OPTS);
+
+  private String[] candidates;
+
+  JobConfPropertyNames(String... candidates) {
+    this.candidates = candidates;
+  }
+
+  public String[] getCandidates() {
+    return candidates;
+  }
+}
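
Each constant pairs the deprecated pre-0.21 key with its JobContext counterpart, in the order they should be tried. A small sketch of resolving the queue name from a job-conf Properties object; the Properties instance and the "default" fallback are illustrative and mirror JobBuilder.process(Properties) above.

    package org.apache.hadoop.tools.rumen;

    import java.util.Properties;

    class QueueNameLookupSketch {
      // Returns the value of the first candidate key that is present.
      static String resolveQueue(Properties jobConf) {
        for (String key : JobConfPropertyNames.QUEUE_NAMES.getCandidates()) {
          String value = jobConf.getProperty(key);
          if (value != null) {
            return value;
          }
        }
        return "default";
      }
    }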

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+/**
+ * {@link JobConfigurationParser} parses the job configuration XML file and
+ * extracts the framework-specific properties it is asked for. [Parsing with a
+ * stream parser, for better memory efficiency, may be added in a future
+ * release; the current implementation builds a DOM.]
+ */
+public class JobConfigurationParser {
+  final private Set<String> interested;
+
+  /**
+   * Constructor
+   * 
+   * @param interested
+   *          properties we should extract from the job configuration xml.
+   */
+  public JobConfigurationParser(List<String> interested) {
+    this.interested = new HashSet<String>(interested);
+  }
+
+  /**
+   * Parse the job configuration file (as an input stream) and return a
+   * {@link Properties} collection. The input stream will not be closed after
+   * return from the call.
+   * 
+   * @param input
+   *          The input data.
+   * @return A {@link Properties} collection extracted from the job
+   *         configuration xml.
+   * @throws IOException
+   */
+  Properties parse(InputStream input) throws IOException {
+    Properties result = new Properties();
+
+    try {
+      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+
+      DocumentBuilder db = dbf.newDocumentBuilder();
+
+      Document doc = db.parse(input);
+
+      Element root = doc.getDocumentElement();
+
+      if (!"configuration".equals(root.getTagName())) {
+        System.out.print("root is not a configuration node");
+        return null;
+      }
+
+      NodeList props = root.getChildNodes();
+
+      for (int i = 0; i < props.getLength(); ++i) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element))
+          continue;
+        Element prop = (Element) propNode;
+        if (!"property".equals(prop.getTagName())) {
+          System.out.print("bad conf file: element not <property>");
+        }
+        NodeList fields = prop.getChildNodes();
+        String attr = null;
+        String value = null;
+        @SuppressWarnings("unused")
+        boolean finalParameter = false;
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element)) {
+            continue;
+          }
+
+          Element field = (Element) fieldNode;
+          if ("name".equals(field.getTagName()) && field.hasChildNodes()) {
+            attr = ((Text) field.getFirstChild()).getData().trim();
+          }
+          if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
+            value = ((Text) field.getFirstChild()).getData();
+          }
+          if ("final".equals(field.getTagName()) && field.hasChildNodes()) {
+            finalParameter =
+                "true".equals(((Text) field.getFirstChild()).getData());
+          }
+        }
+
+        if (interested.contains(attr) && value != null) {
+          result.put(attr, value);
+        }
+      }
+    } catch (ParserConfigurationException e) {
+      return null;
+    } catch (SAXException e) {
+      return null;
+    }
+
+    return result;
+  }
+}
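
A hedged usage sketch: parse(InputStream) is package-private, so the example assumes it lives in the same package; the XML snippet and the single interesting key are made up for illustration. Only properties named in the constructor are retained.

    package org.apache.hadoop.tools.rumen;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Properties;

    class JobConfigurationParserUsageSketch {
      public static void main(String[] args) throws IOException {
        String xml =
            "<?xml version=\"1.0\"?><configuration>"
                + "<property><name>mapred.job.queue.name</name>"
                + "<value>research</value></property>"
                + "<property><name>some.other.key</name><value>dropped</value></property>"
                + "</configuration>";

        JobConfigurationParser parser = new JobConfigurationParser(
            Collections.singletonList("mapred.job.queue.name"));

        Properties props =
            parser.parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));
        System.out.println(props); // {mapred.job.queue.name=research}
      }
    }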

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+
+/**
+ * {@link JobHistoryParser} defines the interface of a Job History file parser.
+ */
+public interface JobHistoryParser extends Closeable {
+  /**
+   * Get the next {@link HistoryEvent}
+   * @return the next {@link HistoryEvent}. If no more events left, return null.
+   * @throws IOException 
+   */
+  HistoryEvent nextEvent() throws IOException;
+  
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link JobHistoryParserFactory} is a factory that attempts to determine the
+ * version of a job history file and return a suitable parser.
+ */
+public class JobHistoryParserFactory {
+  public static JobHistoryParser getParser(RewindableInputStream ris)
+      throws IOException {
+    for (VersionDetector vd : VersionDetector.values()) {
+      boolean canParse = vd.canParse(ris);
+      ris.rewind();
+      if (canParse) {
+        return vd.newInstance(ris);
+      }
+    }
+
+    throw new IOException("No suitable parser.");
+  }
+
+  enum VersionDetector {
+    Hadoop20() {
+
+      @Override
+      public boolean canParse(InputStream input) throws IOException {
+        return Hadoop20JHParser.canParse(input);
+      }
+
+      @Override
+      public JobHistoryParser newInstance(InputStream input) throws IOException {
+        return new Hadoop20JHParser(input);
+      }
+    },
+
+    Current() {
+
+      @Override
+      public boolean canParse(InputStream input) throws IOException {
+        return CurrentJHParser.canParse(input);
+      }
+
+      @Override
+      public JobHistoryParser newInstance(InputStream input) throws IOException {
+        return new CurrentJHParser(input);
+      }
+    };
+
+    abstract JobHistoryParser newInstance(InputStream input) throws IOException;
+
+    abstract boolean canParse(InputStream input) throws IOException;
+  }
+}
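
The factory probes each known version in turn, rewinding the stream between probes. A sketch of obtaining and draining a parser; it assumes a RewindableInputStream can be constructed directly around a raw InputStream, which this diff does not show.

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

    class ParserSelectionSketch {
      static void dumpEventClasses(InputStream rawHistoryStream) throws IOException {
        RewindableInputStream ris = new RewindableInputStream(rawHistoryStream);
        JobHistoryParser parser = JobHistoryParserFactory.getParser(ris);
        try {
          HistoryEvent event;
          while ((event = parser.nextEvent()) != null) {
            System.out.println(event.getClass().getSimpleName());
          }
        } finally {
          parser.close(); // JobHistoryParser extends Closeable
        }
      }
    }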

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Thu Feb 18 18:43:28 2010
@@ -23,13 +23,7 @@
 import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Decompressor;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -44,7 +38,6 @@
   private final ObjectMapper mapper;
   private final Class<? extends T> clazz;
   private final JsonParser jsonParser;
-  private final Decompressor decompressor;
 
   /**
    * Constructor.
@@ -60,17 +53,7 @@
     mapper.configure(
         DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     this.clazz = clazz;
-    FileSystem fs = path.getFileSystem(conf);
-    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
-    InputStream input;
-    if (codec == null) {
-      input = fs.open(path);
-      decompressor = null;
-    } else {
-      FSDataInputStream fsdis = fs.open(path);
-      decompressor = CodecPool.getDecompressor(codec);
-      input = codec.createInputStream(fsdis, decompressor);
-    }
+    InputStream input = new PossiblyDecompressedInputStream(path, conf);
     jsonParser = mapper.getJsonFactory().createJsonParser(input);
   }
 
@@ -86,7 +69,6 @@
     mapper.configure(
         DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     this.clazz = clazz;
-    decompressor = null;
     jsonParser = mapper.getJsonFactory().createJsonParser(input);
   }
 
@@ -107,12 +89,6 @@
 
   @Override
   public void close() throws IOException {
-    try {
-      jsonParser.close();
-    } finally {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
+    jsonParser.close();
   }
 }
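
The constructor change above replaces inline codec handling with PossiblyDecompressedInputStream, which is added elsewhere in this changeset and is not shown here. A sketch of what it presumably encapsulates, re-packaging exactly the deleted logic; a static factory is used only to satisfy Java's constructor-ordering rules, whereas the real class takes (Path, Configuration) in its constructor.

    package org.apache.hadoop.tools.rumen;

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.Decompressor;

    class PossiblyDecompressedInputStreamSketch extends FilterInputStream {
      private final Decompressor decompressor;

      private PossiblyDecompressedInputStreamSketch(InputStream in, Decompressor d) {
        super(in);
        this.decompressor = d;
      }

      // Opens the path, decompressing transparently when a codec matches its suffix.
      static InputStream open(Path path, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
        if (codec == null) {
          return new PossiblyDecompressedInputStreamSketch(fs.open(path), null);
        }
        FSDataInputStream fsdis = fs.open(path);
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        return new PossiblyDecompressedInputStreamSketch(
            codec.createInputStream(fsdis, decompressor), decompressor);
      }

      // Closing returns the pooled decompressor, as the removed close() logic did.
      @Override
      public void close() throws IOException {
        try {
          super.close();
        } finally {
          if (decompressor != null) {
            CodecPool.returnDecompressor(decompressor);
          }
        }
      }
    }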

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/**
+ * Simple wrapper around {@link JsonGenerator} to write objects in JSON format.
+ * @param <T> The type of the objects to be written.
+ */
+public class JsonObjectMapperWriter<T> implements Closeable {
+  private JsonGenerator writer;
+  
+  public JsonObjectMapperWriter(OutputStream output, boolean prettyPrint) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(
+        SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    writer = mapper.getJsonFactory().createJsonGenerator(
+        output, JsonEncoding.UTF8);
+    if (prettyPrint) {
+      writer.useDefaultPrettyPrinter();
+    }
+  }
+  
+  public void write(T object) throws IOException {
+    writer.writeObject(object);
+  }
+  
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
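
A small usage sketch; the map written here is arbitrary, and LoggedJob objects produced by JobBuilder would be written the same way.

    package org.apache.hadoop.tools.rumen;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Map;

    class JsonWriterUsageSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        JsonObjectMapperWriter<Map<String, String>> writer =
            new JsonObjectMapperWriter<Map<String, String>>(out, true); // true => pretty-print
        try {
          writer.write(Collections.singletonMap("jobID", "job_201002180000_0001"));
        } finally {
          writer.close();
        }
        System.out.println(out.toString("UTF-8"));
      }
    }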

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Thu Feb 18 18:43:28 2010
@@ -82,7 +82,7 @@
    * @param level
    *          the level number
    */
-  LoggedNetworkTopology(HashSet<ParsedHost> hosts, String name, int level) {
+  LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) {
 
     this.name = name;
     this.children = null;
@@ -119,6 +119,10 @@
     }
   }
 
+  LoggedNetworkTopology(Set<ParsedHost> hosts) {
+    this(hosts, "<root>", 0);
+  }
+
   public String getName() {
     return name;
   }

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Thu Feb 18 18:43:28 2010
@@ -23,6 +23,11 @@
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.hadoop.mapreduce.jobhistory.Events;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
+
 import org.codehaus.jackson.annotate.JsonAnySetter;
 
 /**
@@ -47,9 +52,6 @@
   List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
   List<LoggedLocation> preferredLocations = Collections.emptyList();
 
-  int numberMaps = -1;
-  int numberReduces = -1;
-
   static private Set<String> alreadySeenAnySetterAttributes =
       new TreeSet<String>();
 
@@ -157,22 +159,6 @@
     }
   }
 
-  public int getNumberMaps() {
-    return numberMaps;
-  }
-
-  void setNumberMaps(int numberMaps) {
-    this.numberMaps = numberMaps;
-  }
-
-  public int getNumberReduces() {
-    return numberReduces;
-  }
-
-  void setNumberReduces(int numberReduces) {
-    this.numberReduces = numberReduces;
-  }
-
   public Pre21JobHistoryConstants.Values getTaskStatus() {
     return taskStatus;
   }
@@ -189,6 +175,110 @@
     this.taskType = taskType;
   }
 
+  private void incorporateMapCounters(JhCounters counters) {
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.inputBytes = val;
+      }
+    }, counters, "HDFS_BYTES_READ");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.outputBytes = val;
+      }
+    }, counters, "FILE_BYTES_WRITTEN");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.inputRecords = val;
+      }
+    }, counters, "MAP_INPUT_RECORDS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.outputRecords = val;
+      }
+    }, counters, "MAP_OUTPUT_RECORDS");
+  }
+
+  private void incorporateReduceCounters(JhCounters counters) {
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.inputBytes = val;
+      }
+    }, counters, "REDUCE_SHUFFLE_BYTES");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.outputBytes = val;
+      }
+    }, counters, "HDFS_BYTES_WRITTEN");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.inputRecords = val;
+      }
+    }, counters, "REDUCE_INPUT_RECORDS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        task.outputRecords = val;
+      }
+    }, counters, "REDUCE_OUTPUT_RECORDS");
+  }
+
+  // incorporate event counters
+  // LoggedTask MUST KNOW ITS TYPE BEFORE THIS CALL
+  public void incorporateCounters(JhCounters counters) {
+    switch (taskType) {
+    case MAP:
+      incorporateMapCounters(counters);
+      return;
+    case REDUCE:
+      incorporateReduceCounters(counters);
+      return;
+      // NOT exhaustive
+    }
+  }
+
+  private static String canonicalizeCounterName(String nonCanonicalName) {
+    String result = nonCanonicalName.toLowerCase();
+
+    result = result.replace(' ', '|');
+    result = result.replace('-', '|');
+    result = result.replace('_', '|');
+    result = result.replace('.', '|');
+
+    return result;
+  }
+
+  private abstract class SetField {
+    LoggedTask task;
+
+    SetField(LoggedTask task) {
+      this.task = task;
+    }
+
+    abstract void set(long value);
+  }
+
+  private static void incorporateCounter(SetField thunk, JhCounters counters,
+      String counterName) {
+    counterName = canonicalizeCounterName(counterName);
+
+    for (JhCounterGroup group : counters.groups) {
+      for (JhCounter counter : group.counts) {
+        if (counterName
+            .equals(canonicalizeCounterName(counter.name.toString()))) {
+          thunk.set(counter.value);
+          return;
+        }
+      }
+    }
+  }
+
   private void compare1(long c1, long c2, TreePath loc, String eltname)
       throws DeepInequalityException {
     if (c1 != c2) {

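Counter matching above is done on canonicalized names: the name is lower-cased and ' ', '-', '_', '.' are all mapped to '|'. A standalone restatement of that rule (canonicalizeCounterName itself is private to LoggedTask), showing spellings that compare equal:

    class CounterNameCanonicalizationSketch {
      static String canonicalize(String name) {
        return name.toLowerCase()
            .replace(' ', '|').replace('-', '|').replace('_', '|').replace('.', '|');
      }

      public static void main(String[] args) {
        // All three print "hdfs|bytes|read", so they select the same counter.
        System.out.println(canonicalize("HDFS_BYTES_READ"));
        System.out.println(canonicalize("hdfs.bytes.read"));
        System.out.println(canonicalize("HDFS BYTES READ"));
      }
    }
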
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Thu Feb 18 18:43:28 2010
@@ -28,6 +28,11 @@
 //                the Jackson implementation of JSON doesn't handle a 
 //                superclass-valued field.
 
+import org.apache.hadoop.mapreduce.jobhistory.Events;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
+
 /**
  * A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a
  * hadoop job. Note that a task can have several attempts.
@@ -140,7 +145,7 @@
   }
 
   void setHostName(String hostName) {
-    this.hostName = hostName.intern();
+    this.hostName = hostName == null ? null : hostName.intern();
   }
 
   public long getHdfsBytesRead() {
@@ -263,6 +268,130 @@
     this.mapInputBytes = mapInputBytes;
   }
 
+  // incorporate event counters
+  public void incorporateCounters(JhCounters counters) {
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.hdfsBytesRead = val;
+      }
+    }, counters, "HDFS_BYTES_READ");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.hdfsBytesWritten = val;
+      }
+    }, counters, "HDFS_BYTES_WRITTEN");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.fileBytesRead = val;
+      }
+    }, counters, "FILE_BYTES_READ");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.fileBytesWritten = val;
+      }
+    }, counters, "FILE_BYTES_WRITTEN");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.mapInputBytes = val;
+      }
+    }, counters, "MAP_INPUT_BYTES");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.mapInputRecords = val;
+      }
+    }, counters, "MAP_INPUT_RECORDS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.mapOutputBytes = val;
+      }
+    }, counters, "MAP_OUTPUT_BYTES");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.mapOutputRecords = val;
+      }
+    }, counters, "MAP_OUTPUT_RECORDS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.combineInputRecords = val;
+      }
+    }, counters, "COMBINE_INPUT_RECORDS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.reduceInputGroups = val;
+      }
+    }, counters, "REDUCE_INPUT_GROUPS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.reduceInputRecords = val;
+      }
+    }, counters, "REDUCE_INPUT_RECORDS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.reduceShuffleBytes = val;
+      }
+    }, counters, "REDUCE_SHUFFLE_BYTES");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.reduceOutputRecords = val;
+      }
+    }, counters, "REDUCE_OUTPUT_RECORDS");
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        attempt.spilledRecords = val;
+      }
+    }, counters, "SPILLED_RECORDS");
+  }
+
+  private static String canonicalizeCounterName(String nonCanonicalName) {
+    String result = nonCanonicalName.toLowerCase();
+
+    result = result.replace(' ', '|');
+    result = result.replace('-', '|');
+    result = result.replace('_', '|');
+    result = result.replace('.', '|');
+
+    return result;
+  }
+
+  private abstract class SetField {
+    LoggedTaskAttempt attempt;
+
+    SetField(LoggedTaskAttempt attempt) {
+      this.attempt = attempt;
+    }
+
+    abstract void set(long value);
+  }
+
+  private static void incorporateCounter(SetField thunk, JhCounters counters,
+      String counterName) {
+    counterName = canonicalizeCounterName(counterName);
+
+    for (JhCounterGroup group : counters.groups) {
+      for (JhCounter counter : group.counts) {
+        if (counterName
+            .equals(canonicalizeCounterName(counter.name.toString()))) {
+          thunk.set(counter.value);
+          return;
+        }
+      }
+    }
+  }
+
   private void compare1(String c1, String c2, TreePath loc, String eltname)
       throws DeepInequalityException {
     if (c1 == null && c2 == null) {

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+
+public class MapAttempt20LineHistoryEventEmitter extends
+    TaskAttempt20LineEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  static {
+    nonFinals.addAll(taskEventNonFinalSEEs);
+
+    finals.add(new MapAttemptFinishedEventEmitter());
+  }
+
+  protected MapAttempt20LineHistoryEventEmitter() {
+    super();
+  }
+
+  static private class MapAttemptFinishedEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && status.equalsIgnoreCase("success")) {
+
+        try {
+          String hostName = line.get("HOSTNAME");
+          String counters = line.get("COUNTERS");
+          String state = line.get("STATE_STRING");
+
+          MapAttempt20LineHistoryEventEmitter that =
+              (MapAttempt20LineHistoryEventEmitter) thatg;
+
+          if (finishTime != null && "success".equalsIgnoreCase(status)) {
+            return new MapAttemptFinishedEvent(taskAttemptID,
+                that.originalTaskType, status, Long.parseLong(finishTime), Long
+                    .parseLong(finishTime), hostName, state,
+                parseCounters(counters));
+          }
+        } catch (ParseException e) {
+          // no code
+        }
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Interface to output a sequence of objects of type T.
+ */
+public interface Outputter<T> extends Closeable {
+  /**
+   * Initialize the {@link Outputter} to a specific path.
+   * @param path The {@link Path} to the output file.
+   * @param conf Configuration
+   * @throws IOException
+   */
+  public void init(Path path, Configuration conf) throws IOException;
+  
+  /**
+   * Output an object.
+   * @param object The object to output.
+   * @throws IOException
+   */
+  public void output(T object) throws IOException;
+
+}
\ No newline at end of file
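
As a rough illustration of the Outputter contract above (not part of this commit), here is a minimal sketch of an implementation that writes one toString() line per object. The class name TextLineOutputter and the choice of plain UTF-8 text output are assumptions made for this sketch only.

    package org.apache.hadoop.tools.rumen;

    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.Writer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical Outputter: writes each object as one line of UTF-8 text.
    class TextLineOutputter<T> implements Outputter<T> {
      private Writer writer;

      public void init(Path path, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);   // resolve the file system backing the path
        writer = new OutputStreamWriter(fs.create(path), "UTF-8");
      }

      public void output(T object) throws IOException {
        writer.write(object.toString());            // caller-supplied rendering of the object
        writer.write('\n');
      }

      public void close() throws IOException {
        writer.close();                             // flushes and closes the underlying stream
      }
    }

A driver would call init(path, conf) once, output(...) once per record, and close() when finished.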

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Thu Feb 18 18:43:28 2010
@@ -39,8 +39,10 @@
 import org.xml.sax.SAXException;
 
 class ParsedConfigFile {
-  private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
-  private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
+  private static final Pattern jobIDPattern =
+      Pattern.compile("_(job_[0-9]+_[0-9]+)_");
+  private static final Pattern heapPattern =
+      Pattern.compile("-Xmx([0-9]+)([mMgG])");
 
   final int heapMegabytes;
 
@@ -70,6 +72,7 @@
   }
 
   @SuppressWarnings("hiding")
+  @Deprecated
   ParsedConfigFile(String filenameLine, String xmlString) {
     super();
 
@@ -138,10 +141,11 @@
             value = ((Text) field.getFirstChild()).getData();
           }
           if ("final".equals(field.getTagName()) && field.hasChildNodes()) {
-            finalParameter = "true".equals(((Text) field.getFirstChild())
-                .getData());
+            finalParameter =
+                "true".equals(((Text) field.getFirstChild()).getData());
           }
         }
+
         if ("mapred.child.java.opts".equals(attr) && value != null) {
           Matcher matcher = heapPattern.matcher(value);
           if (matcher.find()) {
@@ -163,14 +167,16 @@
           jobName = value;
         }
 
-        clusterMapMB = maybeGetIntValue(MRConfig.MAPMEMORY_MB, attr,
-            value, clusterMapMB);
-        clusterReduceMB = maybeGetIntValue(MRConfig.REDUCEMEMORY_MB,
-            attr, value, clusterReduceMB);
-        jobMapMB = maybeGetIntValue(JobContext.MAP_MEMORY_MB, attr, value,
-            jobMapMB);
-        jobReduceMB = maybeGetIntValue(JobContext.REDUCE_MEMORY_MB, attr,
-            value, jobReduceMB);
+        clusterMapMB =
+            maybeGetIntValue(MRConfig.MAPMEMORY_MB, attr, value, clusterMapMB);
+        clusterReduceMB =
+            maybeGetIntValue(MRConfig.REDUCEMEMORY_MB, attr, value,
+                clusterReduceMB);
+        jobMapMB =
+            maybeGetIntValue(JobContext.MAP_MEMORY_MB, attr, value, jobMapMB);
+        jobReduceMB =
+            maybeGetIntValue(JobContext.REDUCE_MEMORY_MB, attr, value,
+                jobReduceMB);
       }
 
       valid = true;
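
As a self-contained aside (not part of the patch), the -Xmx handling that heapPattern supports can be exercised in isolation. The sample mapred.child.java.opts string below is made up for illustration.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HeapOptsDemo {
      public static void main(String[] args) {
        Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
        String opts = "-server -Xmx2g -XX:+UseConcMarkSweepGC";   // hypothetical child JVM opts

        Matcher matcher = heapPattern.matcher(opts);
        if (matcher.find()) {
          int megabytes = Integer.parseInt(matcher.group(1));
          if (matcher.group(2).equalsIgnoreCase("G")) {
            megabytes *= 1024;                                    // gigabytes -> megabytes
          }
          System.out.println("heapMegabytes = " + megabytes);     // prints 2048 for -Xmx2g
        }
      }
    }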

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=911519&r1=911518&r2=911519&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Thu Feb 18 18:43:28 2010
@@ -24,10 +24,10 @@
   Properties content;
   LogRecordType type;
 
-  static final Pattern keyValPair = Pattern
-      .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+  static final Pattern keyValPair =
+      Pattern.compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
 
-  @SuppressWarnings("unused") 
+  @SuppressWarnings("unused")
   ParsedLine(String fullLine, int version) {
     super();
 
@@ -47,8 +47,16 @@
 
     String propValPairs = fullLine.substring(firstSpace + 1);
 
-    while (propValPairs.length() > 0 && propValPairs.charAt(0) == ' ') {
-      propValPairs = propValPairs.substring(1);
+    int pvPairsFirstNonBlank = 0;
+    int pvPairsLength = propValPairs.length();
+
+    while (pvPairsLength > pvPairsFirstNonBlank
+        && propValPairs.charAt(pvPairsFirstNonBlank) == ' ') {
+      ++pvPairsFirstNonBlank;
+    }
+
+    if (pvPairsFirstNonBlank != 0) {
+      propValPairs = propValPairs.substring(pvPairsFirstNonBlank);
     }
 
     int cursor = 0;
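
For reference, the keyValPair pattern touched above can be exercised on its own. The sample line below is invented for this sketch and is not taken from a real job history file.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class KeyValPairDemo {
      public static void main(String[] args) {
        Pattern keyValPair =
            Pattern.compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
        String sample = " TASK_STATUS=\"SUCCESS\" FINISH_TIME=\"1266518400000\"";

        Matcher matcher = keyValPair.matcher(sample);
        while (matcher.find()) {
          // group(1) is the property name, group(2) the still-escaped value
          System.out.println(matcher.group(1) + " = " + matcher.group(2));
        }
      }
    }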

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+
+class PossiblyDecompressedInputStream extends InputStream {
+  private final Decompressor decompressor;
+  private final InputStream coreInputStream;
+
+  public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
+      throws IOException {
+    CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
+    CompressionCodec inputCodec = codecs.getCodec(inputPath);
+
+    FileSystem ifs = inputPath.getFileSystem(conf);
+    FSDataInputStream fileIn = ifs.open(inputPath);
+
+    if (inputCodec == null) {
+      decompressor = null;
+      coreInputStream = fileIn;
+    } else {
+      decompressor = CodecPool.getDecompressor(inputCodec);
+      coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
+    }
+  }
+
+  @Override
+  public int read() throws IOException {
+    return coreInputStream.read();
+  }
+
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    return coreInputStream.read(buffer, offset, length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (decompressor != null) {
+      CodecPool.returnDecompressor(decompressor);
+    }
+
+    coreInputStream.close();
+  }
+
+}
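
A minimal usage sketch, assuming a caller that only needs sequential reads; the input path is hypothetical, and the codec is chosen from the file suffix by CompressionCodecFactory. The demo class lives in the rumen package because PossiblyDecompressedInputStream is package-private.

    package org.apache.hadoop.tools.rumen;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class DecompressedReadDemo {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path input = new Path("file:///tmp/sample-job-history.gz");  // hypothetical input file
        BufferedReader reader = new BufferedReader(new InputStreamReader(
            new PossiblyDecompressedInputStream(input, conf)));
        try {
          String line;
          while ((line = reader.readLine()) != null) {
            System.out.println(line);          // lines arrive already decompressed
          }
        } finally {
          reader.close();                      // close() also returns the pooled Decompressor
        }
      }
    }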

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+
+public class ReduceAttempt20LineHistoryEventEmitter extends
+    TaskAttempt20LineEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  static {
+    nonFinals.addAll(taskEventNonFinalSEEs);
+
+    finals.add(new ReduceAttemptFinishedEventEmitter());
+  }
+
+  ReduceAttempt20LineHistoryEventEmitter() {
+    super();
+  }
+
+  static private class ReduceAttemptFinishedEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && status.equalsIgnoreCase("success")) {
+
+        try {
+          String hostName = line.get("HOSTNAME");
+          String counters = line.get("COUNTERS");
+          String state = line.get("STATE_STRING");
+          String shuffleFinish = line.get("SHUFFLE_FINISHED");
+          String sortFinish = line.get("SORT_FINISHED");
+
+          if (finishTime != null && shuffleFinish != null && sortFinish != null
+              && "success".equalsIgnoreCase(status)) {
+            ReduceAttempt20LineHistoryEventEmitter that =
+                (ReduceAttempt20LineHistoryEventEmitter) thatg;
+
+            return new ReduceAttemptFinishedEvent(taskAttemptID,
+                that.originalTaskType, status, Long.parseLong(shuffleFinish),
+                Long.parseLong(sortFinish), Long.parseLong(finishTime),
+                hostName, state, parseCounters(counters));
+          }
+        } catch (ParseException e) {
+          // no code
+        }
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A simple wrapper class to make any input stream "rewindable". It could be
+ * made more memory-efficient by growing the internal buffer adaptively.
+ */
+public class RewindableInputStream extends InputStream {
+  private InputStream input;
+
+  /**
+   * Constructor.
+   * 
+   * @param input input stream to wrap.
+   */
+  public RewindableInputStream(InputStream input) {
+    this(input, 1024 * 1024);
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param input
+   *          input stream.
+   * @param maxBytesToRemember
+   *          Maximum number of bytes we need to remember at the beginning of
+   *          the stream. If {@link #rewind()} is called after so many bytes are
+   *          read from the stream, {@link #rewind()} would fail.
+   */
+  public RewindableInputStream(InputStream input, int maxBytesToRemember) {
+    this.input = new BufferedInputStream(input, maxBytesToRemember);
+    this.input.mark(maxBytesToRemember);
+  }
+
+  @Override
+  public int read() throws IOException {
+    return input.read();
+  }
+
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    return input.read(buffer, offset, length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    input.close();
+  }
+
+  public InputStream rewind() throws IOException {
+    try {
+      input.reset();
+      return this;
+    } catch (IOException e) {
+      throw new IOException("Unable to rewind the stream", e);
+    }
+  }
+}
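
A short usage sketch, assuming a caller that wants to sniff the start of a stream before handing it to a parser; the file name and the 64 KB bound are arbitrary choices made for this example, not part of the commit.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.tools.rumen.RewindableInputStream;

    public class RewindDemo {
      public static void main(String[] args) throws IOException {
        InputStream raw = new FileInputStream("/tmp/sample.log");   // hypothetical local file
        RewindableInputStream in = new RewindableInputStream(raw, 64 * 1024);

        byte[] probe = new byte[16];
        int n = in.read(probe, 0, probe.length);   // peek at the first few bytes
        System.out.println("probed " + n + " bytes");
        // ... decide which parser to use based on probe[0..n) ...

        InputStream replay = in.rewind();          // back to byte 0, since we read <= 64 KB
        // replay now serves the stream from the beginning
        replay.close();
      }
    }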

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+
+abstract class SingleEventEmitter {
+  abstract HistoryEvent maybeEmitEvent(ParsedLine line, String name,
+      HistoryEventEmitter that);
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
+
+public class Task20LineHistoryEventEmitter extends HistoryEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  Long originalStartTime = null;
+  TaskType originalTaskType = null;
+
+  static {
+    nonFinals.add(new TaskStartedEventEmitter());
+    nonFinals.add(new TaskUpdatedEventEmitter());
+
+    finals.add(new TaskFinishedEventEmitter());
+    finals.add(new TaskFailedEventEmitter());
+  }
+
+  protected Task20LineHistoryEventEmitter() {
+    super();
+  }
+
+  static private class TaskStartedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID taskID = TaskID.forName(taskIDName);
+
+      String taskType = line.get("TASK_TYPE");
+      String startTime = line.get("START_TIME");
+      String splits = line.get("SPLITS");
+
+      if (startTime != null && taskType != null) {
+        Task20LineHistoryEventEmitter that =
+            (Task20LineHistoryEventEmitter) thatg;
+
+        that.originalStartTime = Long.parseLong(startTime);
+        that.originalTaskType =
+            Version20LogInterfaceUtils.get20TaskType(taskType);
+
+        return new TaskStartedEvent(taskID, that.originalStartTime,
+            that.originalTaskType, splits);
+      }
+
+      return null;
+    }
+  }
+
+  static private class TaskUpdatedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID taskID = TaskID.forName(taskIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+
+      if (finishTime != null) {
+        return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
+      }
+
+      return null;
+    }
+  }
+
+  static private class TaskFinishedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID taskID = TaskID.forName(taskIDName);
+
+      String status = line.get("TASK_STATUS");
+      String finishTime = line.get("FINISH_TIME");
+
+      String error = line.get("ERROR");
+
+      String counters = line.get("COUNTERS");
+
+      if (finishTime != null && error == null
+          && (status != null && status.equalsIgnoreCase("success"))) {
+        Counters eventCounters = null;
+
+        try {
+          eventCounters = parseCounters(counters);
+        } catch (ParseException e) {
+          // no code
+        }
+
+        Task20LineHistoryEventEmitter that =
+            (Task20LineHistoryEventEmitter) thatg;
+
+        if (that.originalTaskType == null) {
+          return null;
+        }
+
+        return new TaskFinishedEvent(taskID, Long.parseLong(finishTime),
+            that.originalTaskType, status, eventCounters);
+      }
+
+      return null;
+    }
+  }
+
+  static private class TaskFailedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID taskID = TaskID.forName(taskIDName);
+
+      String status = line.get("TASK_STATUS");
+      String finishTime = line.get("FINISH_TIME");
+
+      String taskType = line.get("TASK_TYPE");
+
+      String error = line.get("ERROR");
+
+      if (finishTime != null
+          && (error != null || (status != null && !status
+              .equalsIgnoreCase("success")))) {
+        Task20LineHistoryEventEmitter that =
+            (Task20LineHistoryEventEmitter) thatg;
+
+        TaskType originalTaskType =
+            that.originalTaskType == null ? Version20LogInterfaceUtils
+                .get20TaskType(taskType) : that.originalTaskType;
+
+        return new TaskFailedEvent(taskID, Long.parseLong(finishTime),
+            originalTaskType, error, status, null);
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+}

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java?rev=911519&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java Thu Feb 18 18:43:28 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+
+public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter {
+  static List<SingleEventEmitter> taskEventNonFinalSEEs =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> taskEventFinalSEEs =
+      new LinkedList<SingleEventEmitter>();
+
+  static private final int DEFAULT_HTTP_PORT = 80;
+
+  Long originalStartTime = null;
+  org.apache.hadoop.mapreduce.TaskType originalTaskType = null;
+
+  static {
+    taskEventNonFinalSEEs.add(new TaskAttemptStartedEventEmitter());
+    taskEventNonFinalSEEs.add(new TaskAttemptFinishedEventEmitter());
+    taskEventNonFinalSEEs
+        .add(new TaskAttemptUnsuccessfulCompletionEventEmitter());
+  }
+
+  protected TaskAttempt20LineEventEmitter() {
+    super();
+  }
+
+  static private class TaskAttemptStartedEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String startTime = line.get("START_TIME");
+      String taskType = line.get("TASK_TYPE");
+      String trackerName = line.get("TRACKER_NAME");
+      String httpPort = line.get("HTTP_PORT");
+
+      if (startTime != null && taskType != null) {
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        that.originalStartTime = Long.parseLong(startTime);
+        that.originalTaskType =
+            Version20LogInterfaceUtils.get20TaskType(taskType);
+
+        int port =
+            httpPort == null || httpPort.equals("") ? DEFAULT_HTTP_PORT
+                : Integer.parseInt(httpPort);
+
+        return new TaskAttemptStartedEvent(taskAttemptID,
+            that.originalTaskType, that.originalStartTime, trackerName, port);
+      }
+
+      return null;
+    }
+  }
+
+  static private class TaskAttemptFinishedEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && status.equalsIgnoreCase("success")) {
+
+        try {
+          String hostName = line.get("HOSTNAME");
+          String counters = line.get("COUNTERS");
+          String state = line.get("STATE_STRING");
+
+          TaskAttempt20LineEventEmitter that =
+              (TaskAttempt20LineEventEmitter) thatg;
+
+          return new TaskAttemptFinishedEvent(taskAttemptID,
+              that.originalTaskType, status, Long.parseLong(finishTime),
+              hostName, state, parseCounters(counters));
+        } catch (ParseException e) {
+          return null;
+        }
+      }
+
+      return null;
+    }
+  }
+
+  static private class TaskAttemptUnsuccessfulCompletionEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && !status.equalsIgnoreCase("success")) {
+        String hostName = line.get("HOSTNAME");
+        String error = line.get("ERROR");
+
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
+            that.originalTaskType, status, Long.parseLong(finishTime),
+            hostName, error);
+      }
+
+      return null;
+    }
+  }
+}


