hadoop-mapreduce-commits mailing list archives

From sha...@apache.org
Subject svn commit: r816052 [2/5] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/con...
Date Thu, 17 Sep 2009 05:04:27 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader.CounterFields;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader.GroupFields;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * EventWriter is a utility class used to write events to the underlying
+ * stream. Typically, one event writer (which translates to one stream)
+ * is created per job.
+ */
+class EventWriter {
+
+  static final JsonFactory FACTORY = new JsonFactory();
+  private JsonGenerator gen; 
+  
+  EventWriter(FSDataOutputStream out) throws IOException {
+    gen = FACTORY.createJsonGenerator(out, JsonEncoding.UTF8);
+    // Prefix all log files with the version
+    writeVersionInfo();
+  }
+  
+  private void writeVersionInfo() throws IOException {
+    gen.writeStartObject();
+    gen.writeStringField("HISTORY_VERSION", JobHistory.HISTORY_VERSION);
+    gen.writeEndObject();
+    gen.writeRaw("\n");
+  }
+  
+  synchronized void write(HistoryEvent event)
+  throws IOException { 
+    writeEventType(gen, event.getEventType());
+    event.writeFields(gen);
+    gen.writeRaw("\n");
+  }
+  
+  void flush() throws IOException { 
+    gen.flush();
+  }
+
+  void close() throws IOException {
+    gen.close();
+  }
+  
+  /**
+   * Write the event type to the JsonGenerator
+   * @param gen the JsonGenerator to write to
+   * @param type the event type being recorded
+   * @throws IOException
+   */
+  private void writeEventType(JsonGenerator gen, EventType type) 
+  throws IOException {
+    gen.writeStartObject();
+    gen.writeStringField("EVENT_TYPE", type.toString());
+    gen.writeEndObject();
+  }  
+  
+  static void writeCounters(Counters counters, JsonGenerator gen)
+  throws IOException {
+    gen.writeFieldName("COUNTERS");
+    gen.writeStartArray(); // Start of all groups
+    Iterator<CounterGroup> groupItr = counters.iterator();
+    while (groupItr.hasNext()) {
+      writeOneGroup(gen, groupItr.next());
+    }
+    gen.writeEndArray(); // End of all groups
+  }
+  
+  static void writeOneGroup (JsonGenerator gen, CounterGroup grp)
+  throws IOException {
+    gen.writeStartObject(); // Start of this group
+    gen.writeStringField(GroupFields.ID.toString(), grp.getName());
+    gen.writeStringField(GroupFields.NAME.toString(), grp.getDisplayName());
+  
+    // Write out the List of counters
+    gen.writeFieldName(GroupFields.LIST.toString());
+    gen.writeStartArray(); // Start array of counters
+    Iterator<Counter> ctrItr = grp.iterator();
+    while (ctrItr.hasNext()) {
+      writeOneCounter(gen, ctrItr.next());
+    }
+    gen.writeEndArray(); // End of all counters
+
+    gen.writeEndObject(); // End of this group
+  }
+
+  static void writeOneCounter(JsonGenerator gen, Counter ctr)
+  throws IOException{
+    gen.writeStartObject();
+    gen.writeStringField(CounterFields.ID.toString(), ctr.getName());
+    gen.writeStringField(CounterFields.NAME.toString(), ctr.getDisplayName());
+    gen.writeNumberField("VALUE", ctr.getValue());
+    gen.writeEndObject();
+  }
+}
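
A minimal usage sketch for the writer above, assuming a caller inside the
org.apache.hadoop.mapreduce.jobhistory package (EventWriter and its
constructor are package-private); the local path and the JobFinishedEvent
arguments are purely illustrative:

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    FSDataOutputStream out =
        fs.create(new Path("/tmp/job_200909170001_0001"));  // illustrative path
    EventWriter writer = new EventWriter(out);  // writes the version record
    writer.write(new JobFinishedEvent(
        JobID.forName("job_200909170001_0001"), // illustrative job id
        System.currentTimeMillis(),             // finish time
        10, 2,                                  // finished maps, reduces
        0, 0,                                   // failed maps, reduces
        new Counters()));                       // empty counters
    writer.flush();
    writer.close();  // Jackson closes the underlying stream as well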

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+
+/**
+ * The interface all job history events implement
+ *
+ */
+public interface HistoryEvent {
+
+  // The category that a history event belongs to
+  enum EventCategory {
+    JOB, TASK, TASK_ATTEMPT
+  }
+  
+  /**
+   * Serialize the fields of the event to the JsonGenerator
+   * @param gen JsonGenerator to write to
+   * @throws IOException
+   */
+  void writeFields(JsonGenerator gen) throws IOException;
+  
+  /**
+   * Deserialize the fields of the event from the JsonParser
+   * @param parser JsonParser to read from
+   * @throws IOException
+   */
+  void readFields(JsonParser parser) throws IOException;
+  
+  /** Return the event type */
+  EventType getEventType();
+  
+  /** Return the event category */
+  EventCategory getEventCategory();
+}
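
Since each event serializes itself through writeFields, callers can turn any
HistoryEvent into its JSON form generically. A hedged sketch of such a helper
(not part of this commit), assuming Jackson 1.x's JsonFactory and
java.io.StringWriter in addition to the imports above:

    // Hypothetical helper built only on the HistoryEvent contract.
    static String toJson(HistoryEvent event) throws IOException {
      StringWriter sw = new StringWriter();
      JsonGenerator gen = new JsonFactory().createJsonGenerator(sw);
      event.writeFields(gen);  // the event emits its own JSON object
      gen.flush();
      return sw.toString();
    }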

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobInProgress;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.TaskLogServlet;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * HistoryViewer is used to parse and view the JobHistory files 
+ *
+ */
+public class HistoryViewer {
+  private static SimpleDateFormat dateFormat = 
+    new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
+  private FileSystem fs;
+  private JobInfo job;
+  private String jobId;
+  private boolean printAll;
+
+  /**
+   * Constructs the HistoryViewer object
+   * @param historyFile The fully qualified Path of the History File
+   * @param conf The Configuration
+   * @param printAll Whether to print all task statuses or only the
+   *                 killed/failed ones
+   * @throws IOException
+   */
+  public HistoryViewer(String historyFile, 
+                       Configuration conf,
+                       boolean printAll) throws IOException {
+    this.printAll = printAll;
+    String errorMsg = "Unable to initialize History Viewer";
+    try {
+      Path jobFile = new Path(historyFile);
+      fs = jobFile.getFileSystem(conf);
+      String[] jobDetails =
+        jobFile.getName().split("_");
+      if (jobDetails.length < 2) {
+        // NOT a valid name
+        System.err.println("Ignore unrecognized file: " + jobFile.getName());
+        throw new IOException(errorMsg);
+      }
+      JobHistoryParser parser = new JobHistoryParser(fs, jobFile);
+      job = parser.parse();
+      jobId = job.getJobId().toString();
+    } catch(Exception e) {
+      throw new IOException(errorMsg, e);
+    }
+  }
+
+  /**
+   * Print the job/task/attempt summary information
+   * @throws IOException
+   */
+  public void print() throws IOException{
+    printJobDetails();
+    printTaskSummary();
+    printJobAnalysis();
+    printTasks(TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString());
+    printTasks(TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString());
+    printTasks(TaskType.MAP, TaskStatus.State.FAILED.toString());
+    printTasks(TaskType.MAP, TaskStatus.State.KILLED.toString());
+    printTasks(TaskType.REDUCE, TaskStatus.State.FAILED.toString());
+    printTasks(TaskType.REDUCE, TaskStatus.State.KILLED.toString());
+    printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString());
+    printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.KILLED.toString());
+    if (printAll) {
+      printTasks(TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString());
+      printTasks(TaskType.MAP, TaskStatus.State.SUCCEEDED.toString());
+      printTasks(TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString());
+      printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.SUCCEEDED.toString());
+      printAllTaskAttempts(TaskType.JOB_SETUP);
+      printAllTaskAttempts(TaskType.MAP);
+      printAllTaskAttempts(TaskType.REDUCE);
+      printAllTaskAttempts(TaskType.JOB_CLEANUP);
+    }
+    
+    FilteredJob filter = new FilteredJob(job, 
+        TaskStatus.State.FAILED.toString());
+    printFailedAttempts(filter);
+    
+    filter = new FilteredJob(job,
+        TaskStatus.State.KILLED.toString());
+    printFailedAttempts(filter);
+  }
+ 
+  private void printJobDetails() {
+    StringBuffer jobDetails = new StringBuffer();
+    jobDetails.append("\nHadoop job: " ).append(job.getJobId());
+    jobDetails.append("\n=====================================");
+    jobDetails.append("\nUser: ").append(job.getUsername()); 
+    jobDetails.append("\nJobName: ").append(job.getJobname()); 
+    jobDetails.append("\nJobConf: ").append(job.getJobConfPath()); 
+    jobDetails.append("\nSubmitted At: ").append(StringUtils.
+                        getFormattedTimeWithDiff(dateFormat,
+                        job.getSubmitTime(), 0)); 
+    jobDetails.append("\nLaunched At: ").append(StringUtils.
+                        getFormattedTimeWithDiff(dateFormat,
+                        job.getLaunchTime(),
+                        job.getSubmitTime()));
+    jobDetails.append("\nFinished At: ").append(StringUtils.
+                        getFormattedTimeWithDiff(dateFormat,
+                        job.getFinishTime(),
+                        job.getLaunchTime()));
+    jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ? 
+                      "Incomplete" :job.getJobStatus()));
+    jobDetails.append("\n=====================================");
+    System.out.println(jobDetails.toString());
+  }
+  
+  private void printAllTaskAttempts(TaskType taskType) {
+    Map<TaskID, TaskInfo> tasks = job.getAllTasks();
+    StringBuffer taskList = new StringBuffer();
+    taskList.append("\n").append(taskType);
+    taskList.append(" task list for ").append(job.getJobId());
+    taskList.append("\nTaskId\t\tStartTime");
+    if (TaskType.REDUCE.equals(taskType)) {
+      taskList.append("\tShuffleFinished\tSortFinished");
+    }
+    taskList.append("\tFinishTime\tHostName\tError\tTaskLogs");
+    taskList.append("\n====================================================");
+    System.out.println(taskList.toString());
+    for (JobHistoryParser.TaskInfo task : tasks.values()) {
+      for (JobHistoryParser.TaskAttemptInfo attempt : 
+        task.getAllTaskAttempts().values()) {
+        if (taskType.equals(task.getTaskType())){
+          taskList.setLength(0); 
+          taskList.append(attempt.getAttemptId()).append("\t");
+          taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+                          attempt.getStartTime(), 0)).append("\t");
+          if (TaskType.REDUCE.equals(taskType)) {
+            taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+                            attempt.getShuffleFinishTime(),
+                            attempt.getStartTime()));
+            taskList.append("\t"); 
+            taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, 
+                            attempt.getSortFinishTime(),
+                            attempt.getShuffleFinishTime())); 
+          } 
+          taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+                          attempt.getFinishTime(),
+                          attempt.getStartTime())); 
+          taskList.append("\t"); 
+          taskList.append(attempt.getHostname()).append("\t");
+          taskList.append(attempt.getError());
+          String taskLogsUrl = getTaskLogsUrl(attempt);
+          taskList.append(taskLogsUrl != null ? taskLogsUrl : "n/a");
+          System.out.println(taskList.toString());
+        }
+      }
+    }
+  }
+
+  private void printTaskSummary() {
+    SummarizedJob ts = new SummarizedJob(job);
+    StringBuffer taskSummary = new StringBuffer();
+    taskSummary.append("\nTask Summary");
+    taskSummary.append("\n============================");
+    taskSummary.append("\nKind\tTotal\t");
+    taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
+    taskSummary.append("\n");
+    taskSummary.append("\nSetup\t").append(ts.totalSetups);
+    taskSummary.append("\t").append(ts.numFinishedSetups);
+    taskSummary.append("\t\t").append(ts.numFailedSetups);
+    taskSummary.append("\t").append(ts.numKilledSetups);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, ts.setupStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, ts.setupFinished, ts.setupStarted));
+    taskSummary.append("\nMap\t").append(ts.totalMaps);
+    taskSummary.append("\t").append(job.getFinishedMaps());
+    taskSummary.append("\t\t").append(ts.numFailedMaps);
+    taskSummary.append("\t").append(ts.numKilledMaps);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, ts.mapStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, ts.mapFinished, ts.mapStarted));
+    taskSummary.append("\nReduce\t").append(ts.totalReduces);
+    taskSummary.append("\t").append(job.getFinishedReduces());
+    taskSummary.append("\t\t").append(ts.numFailedReduces);
+    taskSummary.append("\t").append(ts.numKilledReduces);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, ts.reduceStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                            dateFormat, ts.reduceFinished, ts.reduceStarted));
+    taskSummary.append("\nCleanup\t").append(ts.totalCleanups);
+    taskSummary.append("\t").append(ts.numFinishedCleanups);
+    taskSummary.append("\t\t").append(ts.numFailedCleanups);
+    taskSummary.append("\t").append(ts.numKilledCleanups);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                            dateFormat, ts.cleanupStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                            dateFormat, ts.cleanupFinished, 
+                            ts.cleanupStarted)); 
+    taskSummary.append("\n============================\n");
+    System.out.println(taskSummary.toString());
+  }
+
+  private void printJobAnalysis() {
+    if (!job.getJobStatus().equals
+        (JobStatus.getJobRunState(JobStatus.SUCCEEDED))) {
+      System.out.println("No Analysis available as job did not finish");
+      return;
+    }
+    
+    AnalyzedJob avg = new AnalyzedJob(job);
+    
+    System.out.println("\nAnalysis");
+    System.out.println("=========");
+    printAnalysis(avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), 10);
+    printLast(avg.getMapTasks(), "map", cFinishMapRed);
+
+    if (avg.getReduceTasks().length > 0) {
+      printAnalysis(avg.getReduceTasks(), cShuffle, "shuffle", 
+          avg.getAvgShuffleTime(), 10);
+      printLast(avg.getReduceTasks(), "shuffle", cFinishShuffle);
+
+      printAnalysis(avg.getReduceTasks(), cReduce, "reduce",
+          avg.getAvgReduceTime(), 10);
+      printLast(avg.getReduceTasks(), "reduce", cFinishMapRed);
+    }
+    System.out.println("=========");
+  }
+
+  private void printAnalysis(JobHistoryParser.TaskAttemptInfo [] tasks,
+      Comparator<JobHistoryParser.TaskAttemptInfo> cmp,
+      String taskType,
+      long avg,
+      int showTasks) {
+    Arrays.sort(tasks, cmp);
+    JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1];
+    StringBuffer details = new StringBuffer();
+    details.append("\nTime taken by best performing ");
+    details.append(taskType).append(" task ");
+    details.append(min.getAttemptId().getTaskID().toString()).append(": ");
+    if ("map".equals(taskType)) {
+      details.append(StringUtils.formatTimeDiff(
+          min.getFinishTime(),
+          min.getStartTime()));
+    } else if ("shuffle".equals(taskType)) {
+      details.append(StringUtils.formatTimeDiff(
+          min.getShuffleFinishTime(),
+          min.getStartTime()));
+    } else {
+      details.append(StringUtils.formatTimeDiff(
+          min.getFinishTime(),
+          min.getShuffleFinishTime()));
+    }
+    details.append("\nAverage time taken by ");
+    details.append(taskType).append(" tasks: "); 
+    details.append(StringUtils.formatTimeDiff(avg, 0));
+    details.append("\nWorse performing ");
+    details.append(taskType).append(" tasks: ");
+    details.append("\nTaskId\t\tTimetaken");
+    System.out.println(details.toString());
+    for (int i = 0; i < showTasks && i < tasks.length; i++) {
+      details.setLength(0);
+      details.append(tasks[i].getAttemptId().getTaskID()).append(" ");
+      if ("map".equals(taskType)) {
+        details.append(StringUtils.formatTimeDiff(
+            tasks[i].getFinishTime(),
+            tasks[i].getStartTime()));
+      } else if ("shuffle".equals(taskType)) {
+        details.append(StringUtils.formatTimeDiff(
+            tasks[i].getShuffleFinishTime(),
+            tasks[i].getStartTime()));
+      } else {
+        details.append(StringUtils.formatTimeDiff(
+            tasks[i].getFinishTime(),
+            tasks[i].getShuffleFinishTime()));
+      }
+      System.out.println(details.toString());
+    }
+  }
+
+  private void printLast(JobHistoryParser.TaskAttemptInfo [] tasks,
+      String taskType,
+      Comparator<JobHistoryParser.TaskAttemptInfo> cmp
+  ) {
+    Arrays.sort(tasks, cmp); // use the caller-supplied finish-time comparator
+    JobHistoryParser.TaskAttemptInfo last = tasks[0];
+    StringBuffer lastBuf = new StringBuffer();
+    lastBuf.append("The last ").append(taskType);
+    lastBuf.append(" task ").append(last.getAttemptId().getTaskID());
+    Long finishTime;
+    if ("shuffle".equals(taskType)) {
+      finishTime = last.getShuffleFinishTime();
+    } else {
+      finishTime = last.getFinishTime();
+    }
+    lastBuf.append(" finished at (relative to the Job launch time): ");
+    lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat,
+        finishTime, job.getLaunchTime()));
+    System.out.println(lastBuf.toString());
+  }
+
+  private void printTasks(TaskType taskType, String status) {
+    Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
+    StringBuffer header = new StringBuffer();
+    header.append("\n").append(status).append(" ");
+    header.append(taskType).append(" task list for ").append(jobId);
+    header.append("\nTaskId\t\tStartTime\tFinishTime\tError");
+    if (TaskType.MAP.equals(taskType)) {
+      header.append("\tInputSplits");
+    }
+    header.append("\n====================================================");
+    StringBuffer taskList = new StringBuffer();
+    for (JobHistoryParser.TaskInfo task : tasks.values()) {
+      if (taskType.equals(task.getTaskType()) &&
+         (status.equals(task.getTaskStatus())
+          || status.equalsIgnoreCase("ALL"))) {
+        taskList.setLength(0);
+        taskList.append(task.getTaskId());
+        taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                   dateFormat, task.getStartTime(), 0));
+        taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                   dateFormat, task.getFinishTime(),
+                   task.getStartTime())); 
+        taskList.append("\t").append(task.getError());
+        if (TaskType.MAP.equals(taskType)) {
+          taskList.append("\t").append(task.getSplitLocations());
+        }
+        System.out.println(header.toString());
+        System.out.println(taskList.toString());
+      }
+    }
+  }
+
+  private void printFailedAttempts(FilteredJob filteredJob) {
+      Map<String, Set<TaskID>> badNodes = filteredJob.getFilteredMap();
+      StringBuffer attempts = new StringBuffer();
+      if (badNodes.size() > 0) {
+        attempts.append("\n").append(filteredJob.getFilter());
+        attempts.append(" task attempts by nodes");
+        attempts.append("\nHostname\tFailedTasks");
+        attempts.append("\n===============================");
+        System.out.println(attempts.toString());
+        for (Map.Entry<String, 
+            Set<TaskID>> entry : badNodes.entrySet()) {
+          String node = entry.getKey();
+          Set<TaskID> failedTasks = entry.getValue();
+          attempts.setLength(0);
+          attempts.append(node).append("\t");
+          for (TaskID t : failedTasks) {
+            attempts.append(t).append(", ");
+          }
+          System.out.println(attempts.toString());
+        }
+      }
+  }
+  
+  /**
+   * Return the TaskLogsUrl of a particular TaskAttempt
+   * 
+   * @param attempt the task attempt to look up
+   * @return the taskLogsUrl. null if http-port or tracker-name or
+   *         task-attempt-id are unavailable.
+   */
+  public static String getTaskLogsUrl(
+      JobHistoryParser.TaskAttemptInfo attempt) {
+    if (attempt.getHttpPort() == -1
+        || attempt.getTrackerName().equals("")
+        || attempt.getAttemptId() == null) {
+      return null;
+    }
+  
+    String taskTrackerName =
+      JobInProgress.convertTrackerNameToHostName(
+        attempt.getTrackerName());
+    return TaskLogServlet.getTaskLogUrl(taskTrackerName,
+        Integer.toString(attempt.getHttpPort()),
+        attempt.getAttemptId().toString());
+  }
+
+  private Comparator<JobHistoryParser.TaskAttemptInfo> cMap = 
+    new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+    public int compare(JobHistoryParser.TaskAttemptInfo t1, 
+        JobHistoryParser.TaskAttemptInfo t2) {
+      long l1 = t1.getFinishTime() - t1.getStartTime();
+      long l2 = t2.getFinishTime() - t2.getStartTime();
+      return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
+    }
+  };
+
+  private Comparator<JobHistoryParser.TaskAttemptInfo> cShuffle = 
+    new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+    public int compare(JobHistoryParser.TaskAttemptInfo t1, 
+        JobHistoryParser.TaskAttemptInfo t2) {
+      long l1 = t1.getShuffleFinishTime() - t1.getStartTime();
+      long l2 = t2.getShuffleFinishTime() - t2.getStartTime();
+      return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
+    }
+  };
+
+  private Comparator<JobHistoryParser.TaskAttemptInfo> cFinishShuffle = 
+    new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+    public int compare(JobHistoryParser.TaskAttemptInfo t1, 
+        JobHistoryParser.TaskAttemptInfo t2) {
+      long l1 = t1.getShuffleFinishTime(); 
+      long l2 = t2.getShuffleFinishTime();
+      return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
+    }
+  };
+
+  private Comparator<JobHistoryParser.TaskAttemptInfo> cFinishMapRed = 
+    new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+    public int compare(JobHistoryParser.TaskAttemptInfo t1, 
+        JobHistoryParser.TaskAttemptInfo t2) {
+      long l1 = t1.getFinishTime(); 
+      long l2 = t2.getFinishTime();
+      return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
+    }
+  };
+  
+  private Comparator<JobHistoryParser.TaskAttemptInfo> cReduce = 
+    new Comparator<JobHistoryParser.TaskAttemptInfo>() {
+    public int compare(JobHistoryParser.TaskAttemptInfo t1, 
+        JobHistoryParser.TaskAttemptInfo t2) {
+      long l1 = t1.getFinishTime() -
+                t1.getShuffleFinishTime();
+      long l2 = t2.getFinishTime() -
+                t2.getShuffleFinishTime();
+      return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1));
+    }
+  }; 
+
+  /**
+   * Utility class used to summarize the job.
+   * Used by HistoryViewer and the JobHistory UI.
+   *
+   */
+  public static class SummarizedJob {
+    Map<TaskID, JobHistoryParser.TaskInfo> tasks; 
+     int totalMaps = 0; 
+     int totalReduces = 0; 
+     int totalCleanups = 0;
+     int totalSetups = 0;
+     int numFailedMaps = 0; 
+     int numKilledMaps = 0;
+     int numFailedReduces = 0; 
+     int numKilledReduces = 0;
+     int numFinishedCleanups = 0;
+     int numFailedCleanups = 0;
+     int numKilledCleanups = 0;
+     int numFinishedSetups = 0;
+     int numFailedSetups = 0;
+     int numKilledSetups = 0;
+     long mapStarted = 0; 
+     long mapFinished = 0; 
+     long reduceStarted = 0; 
+     long reduceFinished = 0; 
+     long cleanupStarted = 0;
+     long cleanupFinished = 0;
+     long setupStarted = 0;
+     long setupFinished = 0;
+     
+     /** Get total maps */
+     public int getTotalMaps() { return totalMaps; } 
+     /** Get total reduces */
+     public int getTotalReduces() { return totalReduces; } 
+     /** Get number of clean up tasks */ 
+     public int getTotalCleanups() { return totalCleanups; }
+     /** Get number of set up tasks */
+     public int getTotalSetups() { return totalSetups; }
+     /** Get number of failed maps */
+     public int getNumFailedMaps() { return numFailedMaps; }
+     /** Get number of killed maps */
+     public int getNumKilledMaps() { return numKilledMaps; }
+     /** Get number of failed reduces */
+     public int getNumFailedReduces() { return numFailedReduces; } 
+     /** Get number of killed reduces */
+     public int getNumKilledReduces() { return numKilledReduces; }
+     /** Get number of cleanup tasks that finished */
+     public int getNumFinishedCleanups() { return numFinishedCleanups; }
+     /** Get number of failed cleanup tasks */
+     public int getNumFailedCleanups() { return numFailedCleanups; }
+     /** Get number of killed cleanup tasks */
+     public int getNumKilledCleanups() { return numKilledCleanups; }
+     /** Get number of finished set up tasks */
+     public int getNumFinishedSetups() { return numFinishedSetups; }
+     /** Get number of failed set up tasks */
+     public int getNumFailedSetups() { return numFailedSetups; }
+     /** Get number of killed set up tasks */
+     public int getNumKilledSetups() { return numKilledSetups; }
+     /** Get the start time of the first map attempt */
+     public long getMapStarted() { return mapStarted; } 
+     /** Get the finish time of the last map attempt */
+     public long getMapFinished() { return mapFinished; } 
+     /** Get the start time of the first reduce attempt */
+     public long getReduceStarted() { return reduceStarted; } 
+     /** Get the finish time of the last reduce attempt */
+     public long getReduceFinished() { return reduceFinished; } 
+     /** Get the start time of the first cleanup attempt */ 
+     public long getCleanupStarted() { return cleanupStarted; }
+     /** Get the finish time of the last cleanup attempt */
+     public long getCleanupFinished() { return cleanupFinished; }
+     /** Get the start time of the first setup attempt */
+     public long getSetupStarted() { return setupStarted; }
+     /** Get the finish time of the last setup attempt */
+     public long getSetupFinished() { return setupFinished; }
+
+     /** Create summary information for the parsed job */
+    public SummarizedJob(JobInfo job) {
+      tasks = job.getAllTasks();
+
+      for (JobHistoryParser.TaskInfo task : tasks.values()) {
+        Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attempts = 
+          task.getAllTaskAttempts();
+        //allHosts.put(task.getHo(Keys.HOSTNAME), "");
+        for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) {
+          long startTime = attempt.getStartTime(); 
+          long finishTime = attempt.getFinishTime();
+          if (attempt.getTaskType().equals(TaskType.MAP)) {
+            if (mapStarted == 0 || mapStarted > startTime) {
+              mapStarted = startTime; 
+            }
+            if (mapFinished < finishTime) {
+              mapFinished = finishTime; 
+            }
+            totalMaps++; 
+            if (attempt.getTaskStatus().equals
+                (TaskStatus.State.FAILED.toString())) {
+              numFailedMaps++; 
+            } else if (attempt.getTaskStatus().equals
+                (TaskStatus.State.KILLED.toString())) {
+              numKilledMaps++;
+            }
+          } else if (attempt.getTaskType().equals(TaskType.REDUCE)) {
+            if (reduceStarted == 0 || reduceStarted > startTime) {
+              reduceStarted = startTime; 
+            }
+            if (reduceFinished < finishTime) {
+              reduceFinished = finishTime; 
+            }
+            totalReduces++; 
+            if (attempt.getTaskStatus().equals
+                (TaskStatus.State.FAILED.toString())) {
+              numFailedReduces++; 
+            } else if (attempt.getTaskStatus().equals
+                (TaskStatus.State.KILLED.toString())) {
+              numKilledReduces++;
+            }
+          } else if (attempt.getTaskType().equals(TaskType.JOB_CLEANUP)) {
+            if (cleanupStarted == 0 || cleanupStarted > startTime) {
+              cleanupStarted = startTime; 
+            }
+            if (cleanupFinished < finishTime) {
+              cleanupFinished = finishTime; 
+            }
+            totalCleanups++; 
+            if (attempt.getTaskStatus().equals
+                (TaskStatus.State.SUCCEEDED.toString())) {
+              numFinishedCleanups++; 
+            } else if (attempt.getTaskStatus().equals
+                (TaskStatus.State.FAILED.toString())) {
+              numFailedCleanups++;
+            } else if (attempt.getTaskStatus().equals
+                (TaskStatus.State.KILLED.toString())) {
+              numKilledCleanups++;
+            }
+          } else if (attempt.getTaskType().equals(TaskType.JOB_SETUP)) {
+            if (setupStarted == 0 || setupStarted > startTime) {
+              setupStarted = startTime; 
+            }
+            if (setupFinished < finishTime) {
+              setupFinished = finishTime; 
+            }
+            totalSetups++; 
+            if (attempt.getTaskStatus().equals
+                (TaskStatus.State.SUCCEEDED.toString())) {
+              numFinishedSetups++;
+            } else if (attempt.getTaskStatus().equals
+                (TaskStatus.State.FAILED.toString())) {
+              numFailedSetups++;
+            } else if (attempt.getTaskStatus().equals
+                (TaskStatus.State.KILLED.toString())) {
+              numKilledSetups++;
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Utility class used while analyzing the job. 
+   * Used by HistoryViewer and the JobHistory UI.
+   */
+  public static class AnalyzedJob {
+    private long avgMapTime;
+    private long avgReduceTime;
+    private long avgShuffleTime;
+    
+    private JobHistoryParser.TaskAttemptInfo [] mapTasks;
+    private JobHistoryParser.TaskAttemptInfo [] reduceTasks;
+
+    /** Get the average map time */
+    public long getAvgMapTime() { return avgMapTime; }
+    /** Get the average reduce time */
+    public long getAvgReduceTime() { return avgReduceTime; }
+    /** Get the average shuffle time */
+    public long getAvgShuffleTime() { return avgShuffleTime; }
+    /** Get the map tasks list */
+    public JobHistoryParser.TaskAttemptInfo [] getMapTasks() { 
+      return mapTasks;
+    }
+    /** Get the reduce tasks list */
+    public JobHistoryParser.TaskAttemptInfo [] getReduceTasks() { 
+      return reduceTasks;
+    }
+    /** Generate analysis information for the parsed job */
+    public AnalyzedJob(JobInfo job) {
+      Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
+      int finishedMaps = (int) job.getFinishedMaps();
+      int finishedReduces = (int) job.getFinishedReduces();
+      mapTasks = 
+        new JobHistoryParser.TaskAttemptInfo[finishedMaps]; 
+      reduceTasks = 
+        new JobHistoryParser.TaskAttemptInfo[finishedReduces]; 
+      int mapIndex = 0, reduceIndex = 0;
+      avgMapTime = 0;
+      avgReduceTime = 0;
+      avgShuffleTime = 0;
+
+      for (JobHistoryParser.TaskInfo task : tasks.values()) {
+        Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attempts =
+          task.getAllTaskAttempts();
+        for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) {
+          if (attempt.getTaskStatus().
+              equals(TaskStatus.State.SUCCEEDED.toString())) {
+            long attemptDuration = attempt.getFinishTime() -
+                attempt.getStartTime();
+            if (attempt.getTaskType().equals(TaskType.MAP)) {
+              mapTasks[mapIndex++] = attempt; 
+              avgMapTime += attemptDuration;
+            } else if (attempt.getTaskType().equals(TaskType.REDUCE)) {
+              reduceTasks[reduceIndex++] = attempt;
+              avgShuffleTime += (attempt.getShuffleFinishTime() - 
+                  attempt.getStartTime());
+              avgReduceTime += (attempt.getFinishTime() -
+                  attempt.getShuffleFinishTime());
+            }
+            break;
+          }
+        }
+      }
+      if (finishedMaps > 0) {
+        avgMapTime /= finishedMaps;
+      }
+      if (finishedReduces > 0) {
+        avgReduceTime /= finishedReduces;
+        avgShuffleTime /= finishedReduces;
+      }
+    }
+  }
+
+  /**
+   * Utility to filter task attempts by task status
+   *
+   */
+  public static class FilteredJob {
+    
+    private Map<String, Set<TaskID>> badNodesToFilteredTasks =
+      new HashMap<String, Set<TaskID>>();
+    
+    private String filter;
+    
+    /** Get the map of the filtered tasks */
+    public Map<String, Set<TaskID>> getFilteredMap() {
+      return badNodesToFilteredTasks;
+    }
+    
+    /** Get the current filter */
+    public String getFilter() { return filter; }
+    
+    /** Apply the filter (status) on the parsed job and generate summary */
+    public FilteredJob(JobInfo job, String status) {
+
+      filter = status;
+      
+      Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
+
+      for (JobHistoryParser.TaskInfo task : tasks.values()) {
+        Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attempts =
+          task.getAllTaskAttempts();
+        for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) {
+          if (attempt.getTaskStatus().equals(status)) {
+            String hostname = attempt.getHostname();
+            TaskID id = attempt.getAttemptId().getTaskID();
+
+            Set<TaskID> set = badNodesToFilteredTasks.get(hostname);
+
+            if (set == null) {
+              set = new TreeSet<TaskID>();
+              set.add(id);
+              badNodesToFilteredTasks.put(hostname, set);
+            } else {
+              set.add(id);
+            }
+          }
+        }
+      }
+    }
+  }
+}
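
Driving the viewer from user code is a two-step affair: construct, then
print. A short sketch, with an illustrative history-file path:

    Configuration conf = new Configuration();
    HistoryViewer viewer = new HistoryViewer(
        "/mapred/history/done/job_200909170001_0001_user",  // illustrative
        conf,
        false);          // false: print only failed/killed tasks
    viewer.print();      // job details, task summary, analysis, task lists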

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+
+/**
+ * Event to record the successful completion of a job
+ *
+ */
+public class JobFinishedEvent implements HistoryEvent {
+
+  private EventCategory category;
+  private JobID jobid;
+  private long finishTime;
+  private int finishedMaps;
+  private int finishedReduces;
+  private int failedMaps;
+  private int failedReduces;
+  private Counters counters;
+
+  enum EventFields { EVENT_CATEGORY,
+    JOB_ID,
+    FINISH_TIME,
+    FINISHED_MAPS,
+    FINISHED_REDUCES,
+    FAILED_MAPS,
+    FAILED_REDUCES,
+    COUNTERS }
+
+  JobFinishedEvent() {
+  }
+
+  /** 
+   * Create an event to record successful job completion
+   * @param id Job ID
+   * @param finishTime Finish time of the job
+   * @param finishedMaps The number of finished maps
+   * @param finishedReduces The number of finished reduces
+   * @param failedMaps The number of failed maps
+   * @param failedReduces The number of failed reduces
+   * @param counters Counters for the job
+   */
+  public JobFinishedEvent(JobID id, long finishTime,
+      int finishedMaps, int finishedReduces,
+      int failedMaps, int failedReduces,
+      Counters counters) {
+    this.jobid = id;
+    this.finishTime = finishTime;
+    this.finishedMaps = finishedMaps;
+    this.finishedReduces = finishedReduces;
+    this.failedMaps = failedMaps;
+    this.failedReduces = failedReduces;
+    this.counters = counters;
+    this.category = EventCategory.JOB;
+  }
+
+  /** Get the Event Category */
+  public EventCategory getEventCategory() { return category; }
+  /** Get the Job ID */
+  public JobID getJobid() { return jobid; }
+  /** Get the job finish time */
+  public long getFinishTime() { return finishTime; }
+  /** Get the number of finished maps for the job */
+  public int getFinishedMaps() { return finishedMaps; }
+  /** Get the number of finished reducers for the job */
+  public int getFinishedReduces() { return finishedReduces; }
+  /** Get the number of failed maps for the job */
+  public int getFailedMaps() { return failedMaps; }
+  /** Get the number of failed reducers for the job */
+  public int getFailedReduces() { return failedReduces; }
+  /** Get the counters for the job */
+  public Counters getCounters() { return counters; }
+  /** Get the event type */
+  public EventType getEventType() { 
+    return EventType.JOB_FINISHED;
+  }
+
+  public void readFields(JsonParser jp) throws IOException {
+    if (jp.nextToken() != JsonToken.START_OBJECT) {
+      throw new IOException("Unexpected token while reading");
+    }
+
+    while (jp.nextToken() != JsonToken.END_OBJECT) {
+      String fieldname = jp.getCurrentName();
+      jp.nextToken(); // move to value
+      switch (Enum.valueOf(EventFields.class, fieldname)) {
+      case EVENT_CATEGORY:
+        category = Enum.valueOf(EventCategory.class, jp.getText());
+        break;
+      case JOB_ID:
+        jobid = JobID.forName(jp.getText());
+        break;
+      case FINISH_TIME:
+        finishTime = jp.getLongValue();
+        break;
+      case FINISHED_MAPS:
+        finishedMaps = jp.getIntValue();
+        break;
+      case FINISHED_REDUCES:
+        finishedReduces = jp.getIntValue();
+        break;
+      case FAILED_MAPS:
+        failedMaps = jp.getIntValue();
+        break;
+      case FAILED_REDUCES:
+        failedReduces = jp.getIntValue();
+        break;
+      case COUNTERS:
+        counters = EventReader.readCounters(jp);
+        break;
+      default: 
+        throw new IOException("Unrecognized field '"+fieldname+"'!");
+      }
+    }
+  }
+
+  public void writeFields(JsonGenerator gen) throws IOException {
+    gen.writeStartObject();
+    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
+        category.toString());
+    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
+    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
+    gen.writeNumberField(EventFields.FINISHED_MAPS.toString(), finishedMaps);
+    gen.writeNumberField(EventFields.FINISHED_REDUCES.toString(), 
+        finishedReduces);
+    gen.writeNumberField(EventFields.FAILED_MAPS.toString(), failedMaps);
+    gen.writeNumberField(EventFields.FAILED_REDUCES.toString(),
+        failedReduces);
+    EventWriter.writeCounters(counters, gen);
+    gen.writeEndObject();
+  }
+}
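
A hedged round-trip sketch for this event, runnable only from inside the
jobhistory package (the no-argument constructor is package-private). It
assumes EventReader.readCounters parses what EventWriter.writeCounters emits,
and needs java.io.StringReader/StringWriter and org.codehaus.jackson.JsonFactory
beyond the imports above; the id and numbers are illustrative:

    StringWriter sw = new StringWriter();
    JsonGenerator gen = new JsonFactory().createJsonGenerator(sw);
    JobFinishedEvent written = new JobFinishedEvent(
        JobID.forName("job_200909170001_0001"),  // illustrative id
        System.currentTimeMillis(), 10, 2, 0, 0, new Counters());
    written.writeFields(gen);
    gen.flush();

    JsonParser jp = new JsonFactory().createJsonParser(
        new StringReader(sw.toString()));
    JobFinishedEvent read = new JobFinishedEvent();
    read.readFields(jp);  // read now mirrors written's fields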

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,519 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * JobHistory is the class that is responsible for creating and maintaining
+ * job history information.
+ *
+ */
+public class JobHistory {
+
+  final Log LOG = LogFactory.getLog(JobHistory.class);
+
+  private long jobHistoryBlockSize;
+  private Map<JobID, MetaInfo> fileMap;
+  private ThreadPoolExecutor executor = null;
+  static final FsPermission HISTORY_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0750); // rwxr-x---
+
+  public static final FsPermission HISTORY_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0740); // rwxr-----
+ 
+  private JobTracker jobTracker;
+
+  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; // one week, in milliseconds
+  private FileSystem logDirFs; // log Dir FS
+  private FileSystem doneDirFs; // done Dir FS
+
+  private Path logDir = null;
+  private Path done = null; // folder for completed jobs
+
+  public static final String OLD_SUFFIX = ".old";
+  
+  // Version string that will prefix all History Files
+  public static final String HISTORY_VERSION = "1.0";
+
+  private HistoryCleaner historyCleanerThread = null;
+
+  /**
+   * Initialize Job History Module
+   * @param jt Job Tracker handle
+   * @param conf Configuration
+   * @param hostname Host name of JT
+   * @param jobTrackerStartTime Start time of JT
+   * @throws IOException
+   */
+  public void init(JobTracker jt, JobConf conf, String hostname,
+      long jobTrackerStartTime) throws IOException {
+
+    // Get and create the log folder
+    String logDirLoc = conf.get("hadoop.job.history.location" ,
+        "file:///" +
+        new File(System.getProperty("hadoop.log.dir")).getAbsolutePath()
+        + File.separator + "history");
+
+    LOG.info("History log directory is " + logDirLoc);
+
+    logDir = new Path(logDirLoc);
+    logDirFs = logDir.getFileSystem(conf);
+
+    if (!logDirFs.exists(logDir)){
+      if (!logDirFs.mkdirs(logDir, 
+          new FsPermission(HISTORY_DIR_PERMISSION))) {
+        throw new IOException("Mkdirs failed to create " +
+            logDir.toString());
+      }
+    }
+    conf.set("hadoop.job.history.location", logDirLoc);
+
+    jobHistoryBlockSize = 
+      conf.getLong("mapred.jobtracker.job.history.block.size", 
+          3 * 1024 * 1024);
+    
+    jobTracker = jt;
+    
+    fileMap = new HashMap<JobID, MetaInfo> ();
+  }
+  
+  /** Initialize the done directory and start the history cleaner thread */
+  public void initDone(JobConf conf, FileSystem fs) throws IOException {
+    //if completed job history location is set, use that
+    String doneLocation =
+      conf.get("mapred.job.tracker.history.completed.location");
+    if (doneLocation != null) {
+      done = fs.makeQualified(new Path(doneLocation));
+      doneDirFs = fs;
+    } else {
+      done = logDirFs.makeQualified(new Path(logDir, "done"));
+      doneDirFs = logDirFs;
+    }
+
+    //If not already present create the done folder with appropriate 
+    //permission
+    if (!doneDirFs.exists(done)) {
+      LOG.info("Creating DONE folder at "+ done);
+      if (! doneDirFs.mkdirs(done, 
+          new FsPermission(HISTORY_DIR_PERMISSION))) {
+        throw new IOException("Mkdirs failed to create " + done.toString());
+      }
+    }
+    LOG.info("Inited the done directory to " + done.toString());
+
+    moveOldFiles();
+    startFileMoverThreads();
+
+    // Start the History Cleaner Thread
+    long maxAgeOfHistoryFiles = conf.getLong(
+        "mapreduce.cluster.jobhistory.maxage", DEFAULT_HISTORY_MAX_AGE);
+    historyCleanerThread = new HistoryCleaner(maxAgeOfHistoryFiles);
+    historyCleanerThread.start();
+  }
+
+  /**
+   * Move the completed job into the completed folder.
+   * This assumes that the job history file is closed and 
+   * all operations on the job history file is complete.
+   * This *should* be the last call to job history for a given job.
+   */
+  public void markCompleted(JobID id) throws IOException {
+    moveToDone(id);
+  }
+
+  /** Shut down JobHistory after stopping the History cleaner */
+  public void shutDown() {
+    LOG.info("Interrupting History Cleaner");
+    historyCleanerThread.interrupt();
+    try {
+      historyCleanerThread.join();
+    } catch (InterruptedException e) {
+      LOG.info("Error with shutting down history thread");
+    }
+  }
+
+  /** Get the job history file name from the done directory */
+  public synchronized String getDoneJobHistoryFileName(JobConf jobConf,
+      JobID id) throws IOException {
+    if (done == null) {
+      return null;
+    }
+    return getJobHistoryFileName(jobConf, id, done, doneDirFs);
+  }
+
+  /**
+   * Get the history location
+   */
+  public Path getJobHistoryLocation() {
+    return logDir;
+  }
+
+  /**
+   * Get the history location for completed jobs
+   */
+  public Path getCompletedJobHistoryLocation() {
+    return done;
+  }
+
+  /**
+   * Get the job history file name for a job by searching a directory
+   * @param dir The directory to search in.
+   */
+  private synchronized String getJobHistoryFileName(JobConf jobConf,
+      JobID id, Path dir, FileSystem fs)
+  throws IOException {
+    String user = getUserName(jobConf);
+    // Make the pattern matching the job's history file
+    final Pattern historyFilePattern =
+      Pattern.compile(id.toString() + "_" + user + "+");
+    // a path filter that matches the parts of the filenames namely
+    //  - job-id, user name
+    PathFilter filter = new PathFilter() {
+      public boolean accept(Path path) {
+        String fileName = path.getName();
+        return historyFilePattern.matcher(fileName).find();
+      }
+    };
+  
+    FileStatus[] statuses = fs.listStatus(dir, filter);
+    String filename = null;
+    if (statuses.length == 0) {
+      LOG.info("Nothing to recover for job " + id);
+    } else {
+      filename = statuses[0].getPath().getName();
+      LOG.info("Recovered job history filename for job " + id + " is "
+          + filename);
+    }
+    return filename;
+  }
+
+  String getNewJobHistoryFileName(JobConf conf, JobID jobId) {
+    return jobId.toString() +
+    "_" + getUserName(conf);
+  }
+
+  /**
+   * Get the job history file path given the history filename
+   */
+  private Path getJobHistoryLogLocation(String logFileName) {
+    return logDir == null ? null : new Path(logDir, logFileName);
+  }
+
+  /**
+   * Create an event writer for the Job represented by the jobID.
+   * This should be the first call to history for a job
+   * @param jobId the job id
+   * @param jobConf the job's configuration
+   * @throws IOException
+   */
+  public void setupEventWriter(JobID jobId, JobConf jobConf)
+  throws IOException {
+    String logFileName = getNewJobHistoryFileName(jobConf, jobId);
+  
+    Path logFile = getJobHistoryLogLocation(logFileName);
+  
+    if (logDir == null) {
+      LOG.info("Log Directory is null, returning");
+      throw new IOException("Missing Log Directory for History");
+    }
+  
+    int defaultBufferSize = 
+      logDirFs.getConf().getInt("io.file.buffer.size", 4096);
+  
+    LOG.info("SetupWriter, creating file " + logFile);
+  
+    FSDataOutputStream out = logDirFs.create(logFile, 
+        new FsPermission(JobHistory.HISTORY_FILE_PERMISSION),
+        EnumSet.of(CreateFlag.OVERWRITE), 
+        defaultBufferSize, 
+        logDirFs.getDefaultReplication(), 
+        jobHistoryBlockSize, null);
+  
+    EventWriter writer = new EventWriter(out);
+  
+    /* Store the job conf in the log dir */
+  
+    Path logDirConfPath = getConfFile(jobId);
+    LOG.info("LogDirConfPath is " + logDirConfPath);
+  
+    FSDataOutputStream jobFileOut = null;
+    try {
+      if (logDirConfPath != null) {
+        defaultBufferSize =
+          logDirFs.getConf().getInt("io.file.buffer.size", 4096);
+        if (!logDirFs.exists(logDirConfPath)) {
+          jobFileOut = logDirFs.create(logDirConfPath,
+              new FsPermission(JobHistory.HISTORY_FILE_PERMISSION),
+              EnumSet.of(CreateFlag.OVERWRITE),
+              defaultBufferSize,
+              logDirFs.getDefaultReplication(),
+              logDirFs.getDefaultBlockSize(), null);
+          jobConf.writeXml(jobFileOut);
+          jobFileOut.close();
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to close the job configuration file " 
+          + StringUtils.stringifyException(e));
+    }
+  
+    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer);
+    fileMap.put(jobId, fi);
+  }
+
+  /** Close the event writer for this id */
+  public void closeWriter(JobID id) {
+    try {
+      EventWriter writer = getWriter(id);
+      writer.close();
+    } catch (IOException e) {
+      LOG.info("Error closing writer for JobID: " + id);
+    }
+  }
+
+
+  /**
+   * Get the EventWriter for the specified job id
+   * @param jobId the job id
+   * @return the EventWriter for the job
+   * @throws IOException if a writer is not available
+   */
+  private EventWriter getWriter(final JobID jobId) throws IOException {
+    EventWriter writer = null;
+    MetaInfo mi = fileMap.get(jobId);
+    if (mi == null || (writer = mi.getEventWriter()) == null) {
+      throw new IOException("History File does not exist for JobID");
+    }
+    return writer;
+  }
+
+  /**
+   * Method to log the specified event
+   * @param event The event to log
+   * @param id The Job ID of the event
+   */
+  public void logEvent(HistoryEvent event, JobID id) {
+    try {
+      EventWriter writer = getWriter(id);
+      writer.write(event);
+    } catch (IOException e) {
+      LOG.error("Error creating writer, " + e.getMessage());
+    }
+  }
+
+
+  private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
+    // Check that the path exists; during retries it may already be gone
+    if (logDirFs.exists(fromPath)) {
+      LOG.info("Moving " + fromPath.toString() + " to " +
+          toPath.toString());
+      doneDirFs.moveFromLocalFile(fromPath, toPath);
+      doneDirFs.setPermission(toPath,
+          new FsPermission(JobHistory.HISTORY_FILE_PERMISSION));
+    }
+  }
+  
+  private void startFileMoverThreads() {
+    // Pool of one to three threads for moving history files to DONE;
+    // threads above the core size are reaped after an hour of idleness
+    executor = new ThreadPoolExecutor(1, 3, 1,
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+  }
+
+  Path getConfFile(JobID jobId) {
+    return logDir == null ? null
+        : new Path(logDir, jobId.toString() + "_conf.xml");
+  }
+
+  private void moveOldFiles() throws IOException {
+    // Move the log files remaining from the last run to the DONE folder.
+    // Suffix the file names with the JobTracker identifier so that history
+    // files with the same job id are not overwritten during recovery.
+    FileStatus[] files = logDirFs.listStatus(logDir);
+    String jtIdentifier = jobTracker.getTrackerIdentifier();
+    String fileSuffix = "." + jtIdentifier + JobHistory.OLD_SUFFIX;
+    for (FileStatus fileStatus : files) {
+      Path fromPath = fileStatus.getPath();
+      if (fromPath.equals(done)) { //DONE can be a subfolder of log dir
+        continue;
+      }
+      LOG.info("Moving log file from last run: " + fromPath);
+      Path toPath = new Path(done, fromPath.getName() + fileSuffix);
+      moveToDoneNow(fromPath, toPath);
+    }
+  }
+  
+  private void moveToDone(final JobID id) {
+    final List<Path> paths = new ArrayList<Path>();
+    MetaInfo metaInfo = fileMap.get(id);
+    if (metaInfo == null) {
+      LOG.info("No file for job-history with " + id + " found in cache!");
+      return;
+    }
+    
+    final Path historyFile = metaInfo.getHistoryFile();
+    if (historyFile == null) {
+      LOG.info("No history file for job " + id + " found in cache!");
+    } else {
+      paths.add(historyFile);
+    }
+
+    final Path confPath = metaInfo.getConfFile();
+    if (confPath == null) {
+      LOG.info("No job conf file for job " + id + " found in cache!");
+    } else {
+      paths.add(confPath);
+    }
+
+    executor.execute(new Runnable() {
+
+      public void run() {
+        //move the files to DONE folder
+        try {
+          for (Path path : paths) { 
+            moveToDoneNow(path, new Path(done, path.getName()));
+          }
+        } catch (Throwable e) {
+          LOG.error("Unable to move history file to DONE folder.", e);
+        }
+        String historyFileDonePath = null;
+        if (historyFile != null) {
+          historyFileDonePath = new Path(done, 
+              historyFile.getName()).toString();
+        }
+        
+        jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id),
+            historyFileDonePath);
+
+        //purge the job from the cache
+        fileMap.remove(id);
+      }
+
+    });
+  }
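+
+  // Note: the move above runs asynchronously on the pool created by
+  // startFileMoverThreads(); callers are expected to have called
+  // closeWriter(id) first so the history file is complete when moved.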
+  
+  private String getUserName(JobConf jobConf) {
+    String user = jobConf.getUser();
+    if (user == null) {
+      user = "";
+    }
+    return user;
+  }
+  
+  private static class MetaInfo {
+    private Path historyFile;
+    private Path confFile;
+    private EventWriter writer;
+
+    MetaInfo(Path historyFile, Path conf, EventWriter writer) {
+      this.historyFile = historyFile;
+      this.confFile = conf;
+      this.writer = writer;
+    }
+
+    Path getHistoryFile() { return historyFile; }
+    Path getConfFile() { return confFile; }
+    EventWriter getEventWriter() { return writer; }
+  }
+
+  /**
+   * Delete history files older than a specified time duration.
+   */
+  class HistoryCleaner extends Thread {
+    static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
+    private long cleanupFrequency;
+    private long maxAgeOfHistoryFiles;
+  
+    public HistoryCleaner(long maxAge) {
+      setName("Thread for cleaning up History files");
+      setDaemon(true);
+      this.maxAgeOfHistoryFiles = maxAge;
+      cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
+      LOG.info("Job History Cleaner Thread started." +
+          " MaxAge is " + maxAge +
+          " ms (" + ((float) maxAge) / ONE_DAY_IN_MS + " days)," +
+          " Cleanup Frequency is " + cleanupFrequency +
+          " ms (" + ((float) cleanupFrequency) / ONE_DAY_IN_MS + " days)");
+    }
+  
+    @Override
+    public void run(){
+  
+      while (true) {
+        try {
+          doCleanup(); 
+          Thread.sleep(cleanupFrequency);
+        }
+        catch (InterruptedException e) {
+          LOG.info("History Cleaner thread exiting");
+          return;
+        }
+        catch (Throwable t) {
+          LOG.warn("History cleaner thread threw an exception", t);
+        }
+      }
+    }
+  
+    private void doCleanup() {
+      long now = System.currentTimeMillis();
+      try {
+        FileStatus[] historyFiles = doneDirFs.listStatus(done);
+        if (historyFiles != null) {
+          for (FileStatus f : historyFiles) {
+            if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
+              doneDirFs.delete(f.getPath(), true); 
+              LOG.info("Deleting old history file : " + f.getPath());
+            }
+          }
+        }
+      } catch (IOException ie) {
+        LOG.info("Error cleaning up history directory: " +
+            StringUtils.stringifyException(ie));
+      }
+    }
+  }
+}
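
The cleaner's wake-up interval above is simply the smaller of one day and the
configured maximum file age. A minimal, self-contained sketch of that
calculation (the 30-day retention below is a made-up value, not taken from
this patch):

    import java.util.concurrent.TimeUnit;

    public class CleanupFrequencyDemo {
      static final long ONE_DAY_IN_MS = TimeUnit.DAYS.toMillis(1);

      public static void main(String[] args) {
        // Hypothetical retention: keep history files for 30 days
        long maxAgeOfHistoryFiles = TimeUnit.DAYS.toMillis(30);
        // Wake at least once a day, more often if files expire sooner
        long cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
        System.out.println("Cleaner sleeps " + cleanupFrequency
            + " ms between runs");
      }
    }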

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Default parser for job history files. Typical usage:
+ * <pre>
+ *   JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
+ *   JobInfo job = parser.parse();
+ * </pre>
+ */
+public class JobHistoryParser {
+
+  private final FSDataInputStream in;
+  JobInfo info = null;
+
+  /**
+   * Create a job history parser for the given history file using the
+   * given file system
+   * @param fs the file system containing the history file
+   * @param file the path to the history file, as a string
+   * @throws IOException
+   */
+  public JobHistoryParser(FileSystem fs, String file) throws IOException {
+    this(fs, new Path(file));
+  }
+  
+  /**
+   * Create the job history parser for the given history file using the
+   * given file system
+   * @param fs the file system containing the history file
+   * @param historyFile the path to the history file
+   * @throws IOException
+   */
+  public JobHistoryParser(FileSystem fs, Path historyFile) 
+  throws IOException {
+    in = fs.open(historyFile);
+  }
+  
+  /**
+   * Create the history parser based on the input stream
+   * @param in the input stream to read history events from
+   */
+  public JobHistoryParser(FSDataInputStream in) {
+    this.in = in;
+  }
+  
+  /**
+   * Parse the entire history file and populate the JobInfo object.
+   * The first invocation populates the object; subsequent calls
+   * return the already-parsed object.
+   * The input stream is closed on return.
+   * @return the populated JobInfo object
+   * @throws IOException
+   */
+  public synchronized JobInfo parse() throws IOException {
+
+    if (info != null) {
+      return info;
+    }
+
+    EventReader reader = new EventReader(in);
+
+    HistoryEvent event;
+    info = new JobInfo();
+    try {
+      while ((event = reader.getNextEvent()) != null) {
+        handleEvent(event);
+      }
+    } finally {
+      in.close();
+    }
+    return info;
+  }
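+
+  // Illustrative usage (a sketch; the configuration and path are made up):
+  //
+  //   FileSystem fs = FileSystem.get(new Configuration());
+  //   JobHistoryParser parser =
+  //       new JobHistoryParser(fs, "/history/job_200909170504_0001_user");
+  //   JobInfo job = parser.parse();   // closes the underlying stream
+  //   job.printAll();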
+  
+  private void handleEvent(HistoryEvent event) throws IOException { 
+    EventType type = event.getEventType();
+
+    switch (type) {
+    case JOB_SUBMITTED:
+      handleJobSubmittedEvent((JobSubmittedEvent)event);
+      break;
+    case JOB_STATUS_CHANGED:
+      break;
+    case JOB_INFO_CHANGED:
+      handleJobInfoChangeEvent((JobInfoChangeEvent) event);
+      break;
+    case JOB_INITED:
+      handleJobInitedEvent((JobInitedEvent) event);
+      break;
+    case JOB_PRIORITY_CHANGED:
+      handleJobPriorityChangeEvent((JobPriorityChangeEvent) event);
+      break;
+    case JOB_FAILED:
+    case JOB_KILLED:
+      handleJobFailedEvent((JobUnsuccessfulCompletionEvent) event);
+      break;
+    case JOB_FINISHED:
+      handleJobFinishedEvent((JobFinishedEvent)event);
+      break;
+    case TASK_STARTED:
+      handleTaskStartedEvent((TaskStartedEvent) event);
+      break;
+    case TASK_FAILED:
+      handleTaskFailedEvent((TaskFailedEvent) event);
+      break;
+    case TASK_UPDATED:
+      handleTaskUpdatedEvent((TaskUpdatedEvent) event);
+      break;
+    case TASK_FINISHED:
+      handleTaskFinishedEvent((TaskFinishedEvent) event);
+      break;
+    case MAP_ATTEMPT_STARTED:
+    case CLEANUP_ATTEMPT_STARTED:
+    case REDUCE_ATTEMPT_STARTED:
+    case SETUP_ATTEMPT_STARTED:
+      handleTaskAttemptStartedEvent((TaskAttemptStartedEvent) event);
+      break;
+    case MAP_ATTEMPT_FAILED:
+    case CLEANUP_ATTEMPT_FAILED:
+    case REDUCE_ATTEMPT_FAILED:
+    case SETUP_ATTEMPT_FAILED:
+    case MAP_ATTEMPT_KILLED:
+    case CLEANUP_ATTEMPT_KILLED:
+    case REDUCE_ATTEMPT_KILLED:
+    case SETUP_ATTEMPT_KILLED:
+      handleTaskAttemptFailedEvent(
+          (TaskAttemptUnsuccessfulCompletionEvent) event);
+      break;
+    case MAP_ATTEMPT_FINISHED:
+      handleMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
+      break;
+    case REDUCE_ATTEMPT_FINISHED:
+      handleReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
+      break;
+    case SETUP_ATTEMPT_FINISHED:
+    case CLEANUP_ATTEMPT_FINISHED:
+      handleTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
+      break;
+    default:
+      break;
+    }
+  }
+  
+  private void handleTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    TaskAttemptInfo attemptInfo = 
+      taskInfo.attemptsMap.get(event.getAttemptId());
+    attemptInfo.finishTime = event.getFinishTime();
+    attemptInfo.status = event.getTaskStatus();
+    attemptInfo.state = event.getState();
+    attemptInfo.counters = event.getCounters();
+    attemptInfo.hostname = event.getHostname();
+  }
+
+  private void handleReduceAttemptFinishedEvent(
+      ReduceAttemptFinishedEvent event) {
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    TaskAttemptInfo attemptInfo = 
+      taskInfo.attemptsMap.get(event.getAttemptId());
+    attemptInfo.finishTime = event.getFinishTime();
+    attemptInfo.status = event.getTaskStatus();
+    attemptInfo.state = event.getState();
+    attemptInfo.shuffleFinishTime = event.getShuffleFinishTime();
+    attemptInfo.sortFinishTime = event.getSortFinishTime();
+    attemptInfo.counters = event.getCounters();
+    attemptInfo.hostname = event.getHostname();
+  }
+
+  private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    TaskAttemptInfo attemptInfo = 
+      taskInfo.attemptsMap.get(event.getAttemptId());
+    attemptInfo.finishTime = event.getFinishTime();
+    attemptInfo.status = event.getTaskStatus();
+    attemptInfo.state = event.getState();
+    attemptInfo.mapFinishTime = event.getMapFinishTime();
+    attemptInfo.counters = event.getCounters();
+    attemptInfo.hostname = event.getHostname();
+  }
+
+  private void handleTaskAttemptFailedEvent(
+      TaskAttemptUnsuccessfulCompletionEvent event) {
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    TaskAttemptInfo attemptInfo = 
+      taskInfo.attemptsMap.get(event.getTaskAttemptId());
+    attemptInfo.finishTime = event.getFinishTime();
+    attemptInfo.error = event.getError();
+    attemptInfo.status = event.getTaskStatus();
+    attemptInfo.hostname = event.getHostname();
+  }
+
+  private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    TaskAttemptID attemptId = event.getTaskAttemptId();
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    
+    TaskAttemptInfo attemptInfo = new TaskAttemptInfo();
+    attemptInfo.startTime = event.getStartTime();
+    attemptInfo.attemptId = event.getTaskAttemptId();
+    attemptInfo.httpPort = event.getHttpPort();
+    attemptInfo.trackerName = event.getTrackerName();
+    attemptInfo.taskType = event.getTaskType();
+    
+    taskInfo.attemptsMap.put(attemptId, attemptInfo);
+  }
+
+  private void handleTaskFinishedEvent(TaskFinishedEvent event) {
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    taskInfo.counters = event.getCounters();
+    taskInfo.finishTime = event.getFinishTime();
+    taskInfo.status = TaskStatus.State.SUCCEEDED.toString();
+  }
+
+  private void handleTaskUpdatedEvent(TaskUpdatedEvent event) {
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    taskInfo.finishTime = event.getFinishTime();
+  }
+
+  private void handleTaskFailedEvent(TaskFailedEvent event) {
+    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    taskInfo.status = TaskStatus.State.FAILED.toString();
+    taskInfo.finishTime = event.getFinishTime();
+    taskInfo.error = event.getError();
+    taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
+  }
+
+  private void handleTaskStartedEvent(TaskStartedEvent event) {
+    TaskInfo taskInfo = new TaskInfo();
+    taskInfo.taskId = event.getTaskId();
+    taskInfo.startTime = event.getStartTime();
+    taskInfo.taskType = event.getTaskType();
+    taskInfo.splitLocations = event.getSplitLocations();
+    info.tasksMap.put(event.getTaskId(), taskInfo);
+  }
+
+  private void handleJobFailedEvent(JobUnsuccessfulCompletionEvent event) {
+    info.finishTime = event.getFinishTime();
+    info.finishedMaps = event.getFinishedMaps();
+    info.finishedReduces = event.getFinishedReduces();
+    info.jobStatus = event.getStatus();
+  }
+
+  private void handleJobFinishedEvent(JobFinishedEvent event) {
+    info.finishTime = event.getFinishTime();
+    info.finishedMaps = event.getFinishedMaps();
+    info.finishedReduces = event.getFinishedReduces();
+    info.failedMaps = event.getFailedMaps();
+    info.failedReduces = event.getFailedReduces();
+    info.counters = event.getCounters();
+    info.jobStatus = JobStatus.getJobRunState(JobStatus.SUCCEEDED);
+  }
+
+  private void handleJobPriorityChangeEvent(JobPriorityChangeEvent event) {
+    info.priority = event.getPriority();
+  }
+
+  private void handleJobInitedEvent(JobInitedEvent event) {
+    info.launchTime = event.getLaunchTime();
+    info.totalMaps = event.getTotalMaps();
+    info.totalReduces = event.getTotalReduces();
+  }
+
+  private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
+    info.submitTime = event.getSubmitTime();
+    info.launchTime = event.getLaunchTime();
+  }
+
+  private void handleJobSubmittedEvent(JobSubmittedEvent event) {
+    info.jobid = event.getJobId();
+    info.jobname = event.getJobName();
+    info.username = event.getUserName();
+    info.submitTime = event.getSubmitTime();
+    info.jobConfPath = event.getJobConfPath();
+  }
+
+  /**
+   * The class where job information is aggregated after parsing
+   */
+  public static class JobInfo {
+    long submitTime;
+    long finishTime;
+    JobID jobid;
+    String username;
+    String jobname;
+    String jobConfPath;
+    long launchTime;
+    int totalMaps;
+    int totalReduces;
+    int failedMaps;
+    int failedReduces;
+    int finishedMaps;
+    int finishedReduces;
+    String jobStatus;
+    Counters counters;
+    JobPriority priority;
+    
+    Map<TaskID, TaskInfo> tasksMap;
+    
+    /** Create a job info object where job information will be stored
+     * after a parse
+     */
+    public JobInfo() {
+      submitTime = launchTime = finishTime = -1;
+      totalMaps = totalReduces = failedMaps = failedReduces = 0;
+      finishedMaps = finishedReduces = 0;
+      username = jobname = jobConfPath = "";
+      tasksMap = new HashMap<TaskID, TaskInfo>();
+    }
+    
+    /** Print all the job information */
+    public void printAll() {
+      System.out.println("JOBNAME: " + jobname);
+      System.out.println("USERNAME: " + username);
+      System.out.println("SUBMIT_TIME" + submitTime);
+      System.out.println("LAUNCH_TIME: " + launchTime);
+      System.out.println("JOB_STATUS: " + jobStatus);
+      System.out.println("PRIORITY: " + priority);
+      System.out.println("TOTAL_MAPS: " + totalMaps);
+      System.out.println("TOTAL_REDUCES: " + totalReduces);
+      System.out.println("COUNTERS: " + counters.toString());
+      
+      for (TaskInfo ti: tasksMap.values()) {
+        ti.printAll();
+      }
+    }
+
+    /** Get the job submit time */
+    public long getSubmitTime() { return submitTime; }
+    /** Get the job finish time */
+    public long getFinishTime() { return finishTime; }
+    /** Get the job id */
+    public JobID getJobId() { return jobid; }
+    /** Get the user name */
+    public String getUsername() { return username; }
+    /** Get the job name */
+    public String getJobname() { return jobname; }
+    /** Get the path for the job configuration file */
+    public String getJobConfPath() { return jobConfPath; }
+    /** Get the job launch time */
+    public long getLaunchTime() { return launchTime; }
+    /** Get the total number of maps */
+    public long getTotalMaps() { return totalMaps; }
+    /** Get the total number of reduces */
+    public long getTotalReduces() { return totalReduces; }
+    /** Get the total number of failed maps */
+    public long getFailedMaps() { return failedMaps; }
+    /** Get the number of failed reduces */
+    public long getFailedReduces() { return failedReduces; }
+    /** Get the number of finished maps */
+    public long getFinishedMaps() { return finishedMaps; }
+    /** Get the number of finished reduces */
+    public long getFinishedReduces() { return finishedReduces; }
+    /** Get the job status */
+    public String getJobStatus() { return jobStatus; }
+    /** Get the counters for the job */
+    public Counters getCounters() { return counters; }
+    /** Get the map of all tasks in this job */
+    public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
+    /** Get the priority of this job */
+    public String getPriority() { return priority.toString(); }
+  }
+  
+  /**
+   * Task information is aggregated in this class after parsing
+   */
+  public static class TaskInfo {
+    TaskID taskId;
+    long startTime;
+    long finishTime;
+    TaskType taskType;
+    String splitLocations;
+    Counters counters;
+    String status;
+    String error;
+    TaskAttemptID failedDueToAttemptId;
+    Map<TaskAttemptID, TaskAttemptInfo> attemptsMap;
+
+    public TaskInfo() {
+      startTime = finishTime = -1;
+      error = splitLocations = "";
+      attemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
+    }
+    
+    public void printAll() {
+      System.out.println("TASK_ID:" + taskId.toString());
+      System.out.println("START_TIME: " + startTime);
+      System.out.println("FINISH_TIME:" + finishTime);
+      System.out.println("TASK_TYPE:" + taskType);
+      System.out.println("COUNTERS:" + counters.toString());
+      
+      for (TaskAttemptInfo tinfo: attemptsMap.values()) {
+        tinfo.printAll();
+      }
+    }
+    
+    /** Get the Task ID */
+    public TaskID getTaskId() { return taskId; }
+    /** Get the start time of this task */
+    public long getStartTime() { return startTime; }
+    /** Get the finish time of this task */
+    public long getFinishTime() { return finishTime; }
+    /** Get the task type */
+    public TaskType getTaskType() { return taskType; }
+    /** Get the split locations */
+    public String getSplitLocations() { return splitLocations; }
+    /** Get the counters for this task */
+    public Counters getCounters() { return counters; }
+    /** Get the task status */
+    public String getTaskStatus() { return status; }
+    /** Get the attempt Id that caused this task to fail */
+    public TaskAttemptID getFailedDueToAttemptId() {
+      return failedDueToAttemptId;
+    }
+    /** Get the error */
+    public String getError() { return error; }
+    /** Get the map of all attempts for this task */
+    public Map<TaskAttemptID, TaskAttemptInfo> getAllTaskAttempts() {
+      return attemptsMap;
+    }
+  }
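+
+  // Illustrative traversal of a parsed job (a sketch; "job" is the JobInfo
+  // returned by parse(), and the "FAILED" status string is an assumption):
+  //
+  //   int failedAttempts = 0;
+  //   for (TaskInfo task : job.getAllTasks().values()) {
+  //     for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
+  //       if ("FAILED".equals(attempt.getTaskStatus())) {
+  //         failedAttempts++;
+  //       }
+  //     }
+  //   }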
+  
+  /**
+   * Task Attempt Information is aggregated in this class after parsing
+   */
+  public static class TaskAttemptInfo {
+    TaskAttemptID attemptId;
+    long startTime;
+    long finishTime;
+    long shuffleFinishTime;
+    long sortFinishTime;
+    long mapFinishTime;
+    String error;
+    String status;
+    String state;
+    TaskType taskType;
+    String trackerName;
+    Counters counters;
+    int httpPort;
+    String hostname;
+
+    /** Create a TaskAttemptInfo object in which attempt-level
+     * information is stored during a history parse.
+     */
+    public TaskAttemptInfo() {
+      startTime = finishTime = shuffleFinishTime = sortFinishTime = 
+        mapFinishTime = -1;
+      error =  state =  trackerName = hostname = "";
+      httpPort = -1;
+    }
+    /**
+     * Print all the information about this attempt.
+     */
+    public void printAll() {
+      System.out.println("ATTEMPT_ID:" + attemptId.toString());
+      System.out.println("START_TIME: " + startTime);
+      System.out.println("FINISH_TIME:" + finishTime);
+      System.out.println("ERROR:" + error);
+      System.out.println("TASK_STATUS:" + status);
+      System.out.println("STATE:" + state);
+      System.out.println("TASK_TYPE:" + taskType);
+      System.out.println("TRACKER_NAME:" + trackerName);
+      System.out.println("HTTP_PORT:" + httpPort);
+      System.out.println("COUNTERS:" + counters.toString());
+    }
+
+    /** Get the attempt Id */
+    public TaskAttemptID getAttemptId() { return attemptId; }
+    /** Get the start time of the attempt */
+    public long getStartTime() { return startTime; }
+    /** Get the finish time of the attempt */
+    public long getFinishTime() { return finishTime; }
+    /** Get the shuffle finish time. Applicable only for reduce attempts */
+    public long getShuffleFinishTime() { return shuffleFinishTime; }
+    /** Get the sort finish time. Applicable only for reduce attempts */
+    public long getSortFinishTime() { return sortFinishTime; }
+    /** Get the map finish time. Applicable only for map attempts */
+    public long getMapFinishTime() { return mapFinishTime; }
+    /** Get the error string */
+    public String getError() { return error; }
+    /** Get the state */
+    public String getState() { return state; }
+    /** Get the task status */
+    public String getTaskStatus() { return status; }
+    /** Get the task type */
+    public TaskType getTaskType() { return taskType; }
+    /** Get the tracker name where the attempt executed */
+    public String getTrackerName() { return trackerName; }
+    /** Get the host name */
+    public String getHostname() { return hostname; }
+    /** Get the counters for the attempt */
+    public Counters getCounters() { return counters; }
+    /** Get the HTTP port for the tracker */
+    public int getHttpPort() { return httpPort; }
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobID;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+
+/**
+ * Event to record changes in the submit and launch time of
+ * a job
+ */
+public class JobInfoChangeEvent implements HistoryEvent {
+
+  private EventCategory category;
+  private JobID jobid;
+  private long submitTime;
+  private long launchTime;
+
+  enum EventFields { EVENT_CATEGORY,
+                     JOB_ID,
+                     SUBMIT_TIME,
+                     LAUNCH_TIME }
+
+  /** 
+   * Create an event to record the submit and launch time of a job
+   * @param id Job Id 
+   * @param submitTime Submit time of the job
+   * @param launchTime Launch time of the job
+   */
+  public JobInfoChangeEvent(JobID id, long submitTime, long launchTime) {
+    this.jobid = id;
+    this.submitTime = submitTime;
+    this.launchTime = launchTime;
+    this.category = EventCategory.JOB;
+  }
+
+  JobInfoChangeEvent() { }
+
+  /** Get the Job ID */
+  public JobID getJobId() { return jobid; }
+  /** Get the Job submit time */
+  public long getSubmitTime() { return submitTime; }
+  /** Get the Job launch time */
+  public long getLaunchTime() { return launchTime; }
+  /** Get the event category */
+  public EventCategory getEventCategory() { return category; }
+  /** Get the event type */
+  public EventType getEventType() {
+    return EventType.JOB_INFO_CHANGED;
+  }
+
+  public void readFields(JsonParser jp) throws IOException {
+    if (jp.nextToken() != JsonToken.START_OBJECT) {
+      throw new IOException("Unexpected Token while reading");
+    }
+    
+    while (jp.nextToken() != JsonToken.END_OBJECT) {
+      String fieldName = jp.getCurrentName();
+      jp.nextToken(); // Move to the value
+      switch (Enum.valueOf(EventFields.class, fieldName)) {
+        case EVENT_CATEGORY: 
+          category = Enum.valueOf(EventCategory.class, jp.getText());
+          break;
+        case JOB_ID:
+          jobid = JobID.forName(jp.getText());
+          break;
+        case SUBMIT_TIME:
+          submitTime = jp.getLongValue();
+          break;
+        case LAUNCH_TIME:
+          launchTime = jp.getLongValue();
+          break;
+        default:
+          throw new IOException("Unrecognized field '" + fieldName + "'!");
+      }
+    }
+  }
+
+  public void writeFields(JsonGenerator gen) throws IOException {
+    gen.writeStartObject();
+    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
+                         category.toString());
+    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
+    gen.writeNumberField(EventFields.SUBMIT_TIME.toString(), submitTime);
+    gen.writeNumberField(EventFields.LAUNCH_TIME.toString(), launchTime);
+    gen.writeEndObject();
+  }
+}
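
To make the JSON round trip concrete, here is a minimal sketch that writes one
event and reads it back. It assumes it is compiled inside the
org.apache.hadoop.mapreduce.jobhistory package (the no-argument constructor is
package-private); the job id and times are invented:

    package org.apache.hadoop.mapreduce.jobhistory;

    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.mapreduce.JobID;
    import org.codehaus.jackson.JsonEncoding;
    import org.codehaus.jackson.JsonFactory;
    import org.codehaus.jackson.JsonGenerator;
    import org.codehaus.jackson.JsonParser;

    public class JobInfoChangeEventRoundTrip {
      public static void main(String[] args) throws Exception {
        JsonFactory factory = new JsonFactory();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        // Serialize an event for a made-up job id
        JobInfoChangeEvent out = new JobInfoChangeEvent(
            JobID.forName("job_200909170504_0001"), 1000L, 2000L);
        JsonGenerator gen =
            factory.createJsonGenerator(bytes, JsonEncoding.UTF8);
        out.writeFields(gen);
        gen.close();

        // Parse it back and verify the fields survived
        JsonParser jp = factory.createJsonParser(bytes.toByteArray());
        JobInfoChangeEvent in = new JobInfoChangeEvent();
        in.readFields(jp);
        System.out.println(in.getJobId() + " submit=" + in.getSubmitTime()
            + " launch=" + in.getLaunchTime());
      }
    }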


