hadoop-mapreduce-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r816052 [4/5] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/con...
Date: Thu, 17 Sep 2009 05:04:27 GMT
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Thu Sep 17 05:04:21 2009
@@ -20,46 +20,44 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobHistory.*;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 
 /**
- * Tests the JobHistory files - to catch any changes to JobHistory that can
- * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer.
  *
  * testJobHistoryFile
 * Run a job that succeeds and validate its history file format and
  * content.
  *
- * testJobHistoryUserLogLocation
- * Run jobs with the given values of hadoop.job.history.user.location as
- *   (1)null(default case), (2)"none", and (3)some user specified dir.
- *   Validate user history file location in each case.
- *
  * testJobHistoryJobStatus
 * Run jobs that (1) succeed, (2) fail, and (3) are killed.
  *   Validate job status read from history file in each case.
  *
- * Future changes to job history are to be reflected here in this file.
  */
 public class TestJobHistory extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestJobHistory.class);
@@ -67,9 +65,8 @@
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  private static final Pattern digitsPattern =
-                                     Pattern.compile(JobHistory.DIGITS);
-
+  private static final String DIGITS = "[0-9]+";
+  
   // hostname like   /default-rack/host1.foo.com OR host1.foo.com
   private static final Pattern hostNamePattern = Pattern.compile(
                                        "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)");
@@ -77,10 +74,10 @@
   private static final String IP_ADDR =
                        "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?";
 
-  // hostname like   /default-rack/host1.foo.com OR host1.foo.com
   private static final Pattern trackerNamePattern = Pattern.compile(
-                         "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
-                         IP_ADDR + ":" + JobHistory.DIGITS);
+      "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
+      IP_ADDR + ":" + DIGITS);
+  
 
   private static final Pattern splitsPattern = Pattern.compile(
                               hostNamePattern + "(," + hostNamePattern + ")*");
@@ -91,202 +88,25 @@
   //Each Task End seen from history file is added here
   private static List<String> taskEnds = new ArrayList<String>();
 
-  // List of tasks that appear in history file after JT restart. This is to
-  // allow START_TIME=0 for these tasks.
-  private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();
-
-  // List of potential tasks whose start time can be 0 because of JT restart
-  private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
-
-  /**
-   * Listener for history log file, it populates JobHistory.JobInfo
-   * object with data from log file and validates the data.
-   */
-  static class TestListener
-                    extends DefaultJobHistoryParser.JobTasksParseListener {
-    int lineNum;//line number of history log file
-    boolean isJobLaunched;
-    boolean isJTRestarted;
-
-    TestListener(JobInfo job) {
-      super(job);
-      lineNum = 0;
-      isJobLaunched = false;
-      isJTRestarted = false;
-    }
-
-    // TestListener implementation
-    public void handle(RecordTypes recType, Map<Keys, String> values)
-    throws IOException {
-
-      lineNum++;
-
-      // Check if the record is of type Meta
-      if (recType == JobHistory.RecordTypes.Meta) {
-        long version = Long.parseLong(values.get(Keys.VERSION));
-        assertTrue("Unexpected job history version ",
-                   (version >= 0 && version <= JobHistory.VERSION));
-      }
-      else if (recType.equals(RecordTypes.Job)) {
-        String jobid = values.get(Keys.JOBID);
-        assertTrue("record type 'Job' is seen without JOBID key" +
-        		" in history file at line " + lineNum, jobid != null);
-        JobID id = JobID.forName(jobid);
-        assertTrue("JobID in history file is in unexpected format " +
-                  "at line " + lineNum, id != null);
-        String time = values.get(Keys.LAUNCH_TIME);
-        if (time != null) {
-          if (isJobLaunched) {
-            // We assume that if we see LAUNCH_TIME again, it is because of JT restart
-            isJTRestarted = true;
-          }
-          else {// job launched first time
-            isJobLaunched = true;
-          }
-        }
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          assertTrue ("Job FINISH_TIME is seen in history file at line " +
-                      lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
-        }
-      }
-      else if (recType.equals(RecordTypes.Task)) {
-        String taskid = values.get(Keys.TASKID);
-        assertTrue("record type 'Task' is seen without TASKID key" +
-        		" in history file at line " + lineNum, taskid != null);
-        TaskID id = TaskID.forName(taskid);
-        assertTrue("TaskID in history file is in unexpected format " +
-                  "at line " + lineNum, id != null);
-        
-        String time = values.get(Keys.START_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue("Duplicate START_TIME seen for task " + taskid +
-                     " in history file at line " + lineNum, attemptIDs == null);
-          attemptIDs = new ArrayList<String>();
-          taskIDsToAttemptIDs.put(taskid, attemptIDs);
-
-          if (isJTRestarted) {
-            // This maintains a potential ignoreStartTimeTasks list
-            tempIgnoreStartTimeOfTasks.add(taskid);
-          }
-        }
-
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          String s = values.get(Keys.TASK_STATUS);
-          if (s != null) {
-            List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-            assertTrue ("Task FINISH_TIME is seen in history file at line " +
-                    lineNum + " before START_TIME is seen", attemptIDs != null);
-
-            // Check if all the attemptIDs of this task are finished
-            assertTrue("TaskId " + taskid + " is finished at line " +
-                       lineNum + " but its attemptID is not finished.",
-                       (attemptIDs.size() <= 1));
-
-            // Check if at least 1 attempt of this task is seen
-            assertTrue("TaskId " + taskid + " is finished at line " +
-                       lineNum + " but no attemptID is seen before this.",
-                       attemptIDs.size() == 1);
-
-            if (s.equals("KILLED") || s.equals("FAILED")) {
-              // Task End with KILLED/FAILED status in history file is
-              // considered as TaskEnd, TaskStart. This is useful in checking
-              // the order of history lines.
-              attemptIDs = new ArrayList<String>();
-              taskIDsToAttemptIDs.put(taskid, attemptIDs);
-            }
-            else {
-              taskEnds.add(taskid);
-            }
-          }
-          else {
-            // This line of history file could be just an update to finish time
-          }
-        }
-      }
-      else if (recType.equals(RecordTypes.MapAttempt) ||
-                 recType.equals(RecordTypes.ReduceAttempt)) {
-        String taskid =  values.get(Keys.TASKID);
-        assertTrue("record type " + recType + " is seen without TASKID key" +
-        		" in history file at line " + lineNum, taskid != null);
-        
-        String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-        TaskAttemptID id = TaskAttemptID.forName(attemptId);
-        assertTrue("AttemptID in history file is in unexpected format " +
-                   "at line " + lineNum, id != null);
-        
-        String time = values.get(Keys.START_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
-                      " before Task is seen", attemptIDs != null);
-          assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
-                      "file at line " + lineNum, attemptIDs.remove(attemptId));
-
-          if (attemptIDs.isEmpty()) {
-            //just a boolean whether any attempt is seen or not
-            attemptIDs.add("firstAttemptIsSeen");
-          }
-          attemptIDs.add(attemptId);
-
-          if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
-              (id.getId() < 1000)) {
-            // If Task line of this attempt is seen in history file after
-            // JT restart and if this attempt is < 1000 (i.e. attempt is not
-            // started after JT restart) - assuming single JT restart happened
-            ignoreStartTimeOfTasks.add(taskid);
-          }
-        }
-
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
-                      + lineNum + " before Task is seen", attemptIDs != null);
-
-          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
-                      + lineNum + " before TaskAttempt START_TIME is seen",
-                      attemptIDs.remove(attemptId));
-        }
-      }
-      super.handle(recType, values);
-    }
-  }
-
-  // Check if the time is in the expected format
-  private static boolean isTimeValid(String time) {
-    Matcher m = digitsPattern.matcher(time);
-    return m.matches() && (Long.parseLong(time) > 0);
-  }
-
-  private static boolean areTimesInOrder(String time1, String time2) {
-    return (Long.parseLong(time1) <= Long.parseLong(time2));
-  }
 
   // Validate Format of Job Level Keys, Values read from history file
-  private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values,
+  private static void validateJobLevelKeyValuesFormat(JobInfo jobInfo,
                                                       String status) {
-    String time = values.get(Keys.SUBMIT_TIME);
-    assertTrue("Job SUBMIT_TIME is in unexpected format:" + time +
-               " in history file", isTimeValid(time));
-
-    time = values.get(Keys.LAUNCH_TIME);
-    assertTrue("Job LAUNCH_TIME is in unexpected format:" + time +
-               " in history file", isTimeValid(time));
-
-    String time1 = values.get(Keys.FINISH_TIME);
-    assertTrue("Job FINISH_TIME is in unexpected format:" + time1 +
-               " in history file", isTimeValid(time1));
-    assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file",
-               areTimesInOrder(time, time1));
+    long submitTime = jobInfo.getSubmitTime();
+    long launchTime = jobInfo.getLaunchTime();
+    long finishTime = jobInfo.getFinishTime();
+    
+    assertTrue("Invalid submit time", submitTime > 0);
+    assertTrue("SubmitTime > LaunchTime", submitTime <= launchTime);
+    assertTrue("LaunchTime > FinishTime", launchTime <= finishTime);
+    
+    String stat = jobInfo.getJobStatus();
 
-    String stat = values.get(Keys.JOB_STATUS);
     assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" +
                " history file", (status.equals(stat)));
+    String priority = jobInfo.getPriority();
 
-    String priority = values.get(Keys.JOB_PRIORITY);
+    assertNotNull(priority);
     assertTrue("Unknown priority for the job in history file",
                (priority.equals("HIGH") ||
                 priority.equals("LOW")  || priority.equals("NORMAL") ||
@@ -296,34 +116,30 @@
   // Validate Format of Task Level Keys, Values read from history file
   private static void validateTaskLevelKeyValuesFormat(JobInfo job,
                                   boolean splitsCanBeEmpty) {
-    Map<String, JobHistory.Task> tasks = job.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = job.getAllTasks();
 
     // validate info of each task
-    for (JobHistory.Task task : tasks.values()) {
-
-      String tid = task.get(Keys.TASKID);
-      String time = task.get(Keys.START_TIME);
-      // We allow START_TIME=0 for tasks seen in history after JT restart
-      if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) {
-        assertTrue("Task START_TIME of " + tid + " is in unexpected format:" +
-                 time + " in history file", isTimeValid(time));
-      }
+    for (TaskInfo task : tasks.values()) {
 
-      String time1 = task.get(Keys.FINISH_TIME);
-      assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" +
-                 time1 + " in history file", isTimeValid(time1));
+      TaskID tid = task.getTaskId();
+      long startTime = task.getStartTime();
+      assertTrue("Invalid Start time", startTime > 0);
+      
+      long finishTime = task.getFinishTime();
       assertTrue("Task FINISH_TIME is < START_TIME in history file",
-                 areTimesInOrder(time, time1));
+                 startTime < finishTime);
 
       // Make sure that the Task type exists and it is valid
-      String type = task.get(Keys.TASK_TYPE);
+      TaskType type = task.getTaskType();
       assertTrue("Unknown Task type \"" + type + "\" is seen in " +
                  "history file for task " + tid,
-                 (type.equals("MAP") || type.equals("REDUCE") ||
-                  type.equals("SETUP") || type.equals("CLEANUP")));
+                 (type.equals(TaskType.MAP) || 
+                  type.equals(TaskType.REDUCE) ||
+                  type.equals(TaskType.JOB_CLEANUP) || 
+                  type.equals(TaskType.JOB_SETUP)));
 
-      if (type.equals("MAP")) {
-        String splits = task.get(Keys.SPLITS);
+      if (type.equals(TaskType.MAP)) {
+        String splits = task.getSplitLocations();
         //order in the condition OR check is important here
         if (!splitsCanBeEmpty || splits.length() != 0) {
           Matcher m = splitsPattern.matcher(splits);
@@ -333,103 +149,85 @@
       }
 
       // Validate task status
-      String status = task.get(Keys.TASK_STATUS);
+      String status = task.getTaskStatus();
       assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
-                 " history file for task " + tid, (status.equals("SUCCESS") ||
+                 " history file for task " + tid, (status.equals("SUCCEEDED") ||
                  status.equals("FAILED") || status.equals("KILLED")));
     }
   }
 
  // Validate format of Task Attempt Level Keys, Values read from history file
   private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
-    Map<String, JobHistory.Task> tasks = job.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = job.getAllTasks();
 
     // For each task
-    for (JobHistory.Task task : tasks.values()) {
+    for (TaskInfo task : tasks.values()) {
       // validate info of each attempt
-      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+      for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
+
+        TaskAttemptID id = attempt.getAttemptId();
+        assertNotNull(id);
+        
+        long startTime = attempt.getStartTime();
+        assertTrue("Invalid Start time", startTime > 0);
 
-        String id = attempt.get(Keys.TASK_ATTEMPT_ID);
-        String time = attempt.get(Keys.START_TIME);
-        assertTrue("START_TIME of task attempt " + id +
-                   " is in unexpected format:" + time +
-                   " in history file", isTimeValid(time));
-
-        String time1 = attempt.get(Keys.FINISH_TIME);
-        assertTrue("FINISH_TIME of task attempt " + id +
-                   " is in unexpected format:" + time1 +
-                   " in history file", isTimeValid(time1));
+        long finishTime = attempt.getFinishTime();
         assertTrue("Task FINISH_TIME is < START_TIME in history file",
-                   areTimesInOrder(time, time1));
+            startTime < finishTime);
 
         // Make sure that the Task type exists and it is valid
-        String type = attempt.get(Keys.TASK_TYPE);
+        TaskType type = attempt.getTaskType();
         assertTrue("Unknown Task type \"" + type + "\" is seen in " +
                    "history file for task attempt " + id,
-                   (type.equals("MAP") || type.equals("REDUCE") ||
-                    type.equals("SETUP") || type.equals("CLEANUP")));
+                   (type.equals(TaskType.MAP) || type.equals(TaskType.REDUCE) ||
+                    type.equals(TaskType.JOB_CLEANUP) || 
+                    type.equals(TaskType.JOB_SETUP)));
 
         // Validate task status
-        String status = attempt.get(Keys.TASK_STATUS);
+        String status = attempt.getTaskStatus();
         assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
                    " history file for task attempt " + id,
-                   (status.equals("SUCCESS") || status.equals("FAILED") ||
-                    status.equals("KILLED")));
+                   (status.equals(TaskStatus.State.SUCCEEDED.toString()) ||
+                    status.equals(TaskStatus.State.FAILED.toString()) ||
+                    status.equals(TaskStatus.State.KILLED.toString())));
 
         // Successful Reduce Task Attempts should have valid SHUFFLE_FINISHED
         // time and SORT_FINISHED time
-        if (type.equals("REDUCE") && status.equals("SUCCESS")) {
-          time1 = attempt.get(Keys.SHUFFLE_FINISHED);
-          assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
-                     " is in unexpected format:" + time1 +
-                     " in history file", isTimeValid(time1));
-          assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " +
-                     "in history file", areTimesInOrder(time, time1));
-          time = attempt.get(Keys.SORT_FINISHED);
-          assertTrue("SORT_FINISHED of task attempt " + id +
-                     " is in unexpected format:" + time +
-                     " in history file", isTimeValid(time));
-          assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
-                     " in history file", areTimesInOrder(time1, time));
+        if (type.equals(TaskType.REDUCE) && 
+            status.equals(TaskStatus.State.SUCCEEDED.toString())) {
+          long shuffleFinishTime = attempt.getShuffleFinishTime();
+          assertTrue(startTime < shuffleFinishTime);
+          
+          long sortFinishTime = attempt.getSortFinishTime();
+          assertTrue(shuffleFinishTime < sortFinishTime);
         }
-        else if (type.equals("MAP") && status.equals("SUCCESS")) {
+        else if (type.equals(TaskType.MAP) && 
+            status.equals(TaskStatus.State.SUCCEEDED.toString())) {
           // Successful MAP Task Attempts should have valid MAP_FINISHED time
-          time1 = attempt.get(Keys.MAP_FINISHED);
-          assertTrue("MAP_FINISHED time of task attempt " + id +
-                     " is in unexpected format:" + time1 +
-                     " in history file", isTimeValid(time1));
-          assertTrue("MAP_FINISHED time of map task is < START_TIME " +
-                     "in history file", areTimesInOrder(time, time1));
+         long mapFinishTime = attempt.getMapFinishTime();
+         assertTrue(startTime < mapFinishTime);
         }
 
         // check if hostname is valid
-        String hostname = attempt.get(Keys.HOSTNAME);
+        String hostname = attempt.getHostname();
         Matcher m = hostNamePattern.matcher(hostname);
         assertTrue("Unexpected Host name of task attempt " + id, m.matches());
 
         // check if trackername is valid
-        String trackerName = attempt.get(Keys.TRACKER_NAME);
+        String trackerName = attempt.getTrackerName();
         m = trackerNamePattern.matcher(trackerName);
         assertTrue("Unexpected tracker name of task attempt " + id,
                    m.matches());
 
         if (!status.equals("KILLED")) {
           // check if http port is valid
-          String httpPort = attempt.get(Keys.HTTP_PORT);
-          m = digitsPattern.matcher(httpPort);
-          assertTrue("Unexpected http port of task attempt " + id, m.matches());
+          int httpPort = attempt.getHttpPort();
+          assertTrue(httpPort > 0);
         }
         
         // check if counters are parsable
-        String counters = attempt.get(Keys.COUNTERS);
-        try {
-          Counters readCounters = Counters.fromEscapedCompactString(counters);
-          assertTrue("Counters of task attempt " + id + " are not parsable",
-                     readCounters != null);
-        } catch (ParseException pe) {
-          LOG.warn("While trying to parse counters of task attempt " + id +
-                   ", " + pe);
-        }
+        Counters counters = attempt.getCounters();
+        assertNotNull(counters);
       }
     }
   }
@@ -443,9 +241,8 @@
     String parts[] = path.getName().split("_");
     //TODO this is a hack :(
     // jobtracker-hostname_jobtracker-identifier_
-    String id = parts[2] + "_" + parts[3] + "_" + parts[4];
-    String jobUniqueString = parts[0] + "_" + parts[1] + "_" +  id;
-    return new Path(dir, jobUniqueString + "_conf.xml");
+    String id = parts[0] + "_" + parts[1] + "_" + parts[2];
+    return new Path(dir, id + "_conf.xml");
   }
 
   /**
@@ -470,12 +267,13 @@
    * @param id job id
    * @param conf job conf
    */
-  static void validateJobHistoryFileFormat(JobID id, JobConf conf,
+  public static void validateJobHistoryFileFormat(JobHistory jobHistory,
+      JobID id, JobConf conf,
                  String status, boolean splitsCanBeEmpty) throws IOException  {
 
     // Get the history file name
-    Path dir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(conf, id, dir);
+    Path dir = jobHistory.getCompletedJobHistoryLocation();
+    String logFileName = getDoneFile(jobHistory, conf, id, dir);
 
     // Framework history log file location
     Path logFile = new Path(dir, logFileName);
@@ -484,20 +282,12 @@
     // Check if the history file exists
     assertTrue("History file does not exist", fileSys.exists(logFile));
 
-
-    // check if the history file is parsable
-    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
-    		                                   logFileName).split("_");
-
-    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
-    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
-    TestListener l = new TestListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
-
+    JobHistoryParser parser = new JobHistoryParser(fileSys, 
+        logFile.toUri().getPath());
+    JobHistoryParser.JobInfo jobInfo = parser.parse();
 
     // validate format of job level key, values
-    validateJobLevelKeyValuesFormat(jobInfo.getValues(), status);
+    validateJobLevelKeyValuesFormat(jobInfo, status);
 
     // validate format of task level key, values
     validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty);
@@ -507,9 +297,10 @@
 
     // check if all the TaskAttempts, Tasks started are finished for
     // successful jobs
-    if (status.equals("SUCCESS")) {
+    if (status.equals("SUCCEEDED")) {
       // Make sure that the lists in taskIDsToAttemptIDs are empty.
-      for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
+      for(Iterator<String> it = 
+        taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
         String taskid = it.next();
         assertTrue("There are some Tasks which are not finished in history " +
                    "file.", taskEnds.contains(taskid));
@@ -530,73 +321,68 @@
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
 
-    Map<Keys, String> values = jobInfo.getValues();
-
     assertTrue("SUBMIT_TIME of job obtained from history file did not " +
                "match the expected value", jip.getStartTime() ==
-               Long.parseLong(values.get(Keys.SUBMIT_TIME)));
+               jobInfo.getSubmitTime());
 
     assertTrue("LAUNCH_TIME of job obtained from history file did not " +
                "match the expected value", jip.getLaunchTime() ==
-               Long.parseLong(values.get(Keys.LAUNCH_TIME)));
+               jobInfo.getLaunchTime());
 
     assertTrue("FINISH_TIME of job obtained from history file did not " +
                "match the expected value", jip.getFinishTime() ==
-               Long.parseLong(values.get(Keys.FINISH_TIME)));
+               jobInfo.getFinishTime());
 
     assertTrue("Job Status of job obtained from history file did not " +
                "match the expected value",
-               values.get(Keys.JOB_STATUS).equals("SUCCESS"));
+               jobInfo.getJobStatus().equals("SUCCEEDED"));
 
     assertTrue("Job Priority of job obtained from history file did not " +
                "match the expected value", jip.getPriority().toString().equals(
-               values.get(Keys.JOB_PRIORITY)));
+               jobInfo.getPriority()));
 
     assertTrue("Job Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getJobName(conf).equals(
-               values.get(Keys.JOBNAME)));
+               "match the expected value", 
+               conf.getJobName().equals(
+               jobInfo.getJobname()));
 
     assertTrue("User Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getUserName(conf).equals(
-               values.get(Keys.USER)));
+               "match the expected value", 
+               conf.getUser().equals(
+               jobInfo.getUsername()));
 
     // Validate job counters
-    Counters c = jip.getCounters();
+    Counters c = new Counters(jip.getCounters());
+    Counters jiCounters = jobInfo.getCounters();
     assertTrue("Counters of job obtained from history file did not " +
                "match the expected value",
-               c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
+               c.equals(jiCounters));
 
     // Validate number of total maps, total reduces, finished maps,
    // finished reduces, failed maps, failed reduces
-    String totalMaps = values.get(Keys.TOTAL_MAPS);
     assertTrue("Unexpected number of total maps in history file",
-               Integer.parseInt(totalMaps) == jip.desiredMaps());
+               jobInfo.getTotalMaps() == jip.desiredMaps());
 
-    String totalReduces = values.get(Keys.TOTAL_REDUCES);
     assertTrue("Unexpected number of total reduces in history file",
-               Integer.parseInt(totalReduces) == jip.desiredReduces());
+               jobInfo.getTotalReduces() == jip.desiredReduces());
 
-    String finMaps = values.get(Keys.FINISHED_MAPS);
     assertTrue("Unexpected number of finished maps in history file",
-               Integer.parseInt(finMaps) == jip.finishedMaps());
+               jobInfo.getFinishedMaps() == jip.finishedMaps());
 
-    String finReduces = values.get(Keys.FINISHED_REDUCES);
     assertTrue("Unexpected number of finished reduces in history file",
-               Integer.parseInt(finReduces) == jip.finishedReduces());
+               jobInfo.getFinishedReduces() == jip.finishedReduces());
 
-    String failedMaps = values.get(Keys.FAILED_MAPS);
     assertTrue("Unexpected number of failed maps in history file",
-               Integer.parseInt(failedMaps) == jip.failedMapTasks);
+               jobInfo.getFailedMaps() == jip.failedMapTasks);
 
-    String failedReduces = values.get(Keys.FAILED_REDUCES);
     assertTrue("Unexpected number of failed reduces in history file",
-               Integer.parseInt(failedReduces) == jip.failedReduceTasks);
+               jobInfo.getFailedReduces() == jip.failedReduceTasks);
   }
 
   // Validate Task Level Keys, Values read from history file by
   // comparing them with the actual values from JT.
   private static void validateTaskLevelKeyValues(MiniMRCluster mr,
-                      RunningJob job, JobInfo jobInfo) throws IOException  {
+      RunningJob job, JobInfo jobInfo) throws IOException  {
 
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
@@ -606,7 +392,7 @@
     TaskID mapTaskId = new TaskID(job.getID(), TaskType.MAP, 0);
     TaskID reduceTaskId = new TaskID(job.getID(), TaskType.REDUCE, 0);
 
-    TaskInProgress cleanups[] = jip.getCleanupTasks();
+    TaskInProgress cleanups[] = jip.cleanup;
     TaskID cleanupTaskId;
     if (cleanups[0].isComplete()) {
       cleanupTaskId = cleanups[0].getTIPId();
@@ -615,7 +401,7 @@
       cleanupTaskId = cleanups[1].getTIPId();
     }
 
-    TaskInProgress setups[] = jip.getSetupTasks();
+    TaskInProgress setups[] = jip.setup;
     TaskID setupTaskId;
     if (setups[0].isComplete()) {
       setupTaskId = setups[0].getTIPId();
@@ -624,53 +410,45 @@
       setupTaskId = setups[1].getTIPId();
     }
 
-    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks();
 
-    // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
-    for (JobHistory.Task task : tasks.values()) {
+    // validate info of the 4 tasks (cleanup, setup, 1st map, 1st reduce)
 
-      String tid = task.get(Keys.TASKID);
-      if (tid.equals(mapTaskId.toString()) ||
-          tid.equals(reduceTaskId.toString()) ||
-          tid.equals(cleanupTaskId.toString()) ||
-          tid.equals(setupTaskId.toString())) {
-
-        TaskID taskId = null;
-        if (tid.equals(mapTaskId.toString())) {
-          taskId = mapTaskId;
-        }
-        else if (tid.equals(reduceTaskId.toString())) {
-          taskId = reduceTaskId;
-        }
-        else if (tid.equals(cleanupTaskId.toString())) {
-          taskId = cleanupTaskId;
-        }
-        else if (tid.equals(setupTaskId.toString())) {
-          taskId = setupTaskId;
-        }
-        TaskInProgress tip = jip.getTaskInProgress(taskId);
+    for (TaskInfo task : tasks.values()) {
+      TaskID tid = task.getTaskId();
+
+      if (tid.equals(mapTaskId) ||
+          tid.equals(reduceTaskId) ||
+          tid.equals(cleanupTaskId) ||
+          tid.equals(setupTaskId)) {
+
+        TaskInProgress tip = jip.getTaskInProgress
+        (org.apache.hadoop.mapred.TaskID.downgrade(tid));
         assertTrue("START_TIME of Task " + tid + " obtained from history " +
-             "file did not match the expected value", tip.getExecStartTime() ==
-             Long.parseLong(task.get(Keys.START_TIME)));
+            "file did not match the expected value", 
+            tip.getExecStartTime() ==
+              task.getStartTime());
 
         assertTrue("FINISH_TIME of Task " + tid + " obtained from history " +
-             "file did not match the expected value", tip.getExecFinishTime() ==
-             Long.parseLong(task.get(Keys.FINISH_TIME)));
+            "file did not match the expected value",
+            tip.getExecFinishTime() ==
+              task.getFinishTime());
 
-        if (taskId == mapTaskId) {//check splits only for map task
+        if (tid.equals(mapTaskId)) { //check splits only for map task
           assertTrue("Splits of Task " + tid + " obtained from history file " +
-                     " did not match the expected value",
-                     tip.getSplitNodes().equals(task.get(Keys.SPLITS)));
+              " did not match the expected value",
+              tip.getSplitNodes().equals(task.getSplitLocations()));
         }
 
         TaskAttemptID attemptId = tip.getSuccessfulTaskid();
-        TaskStatus ts = tip.getTaskStatus(attemptId);
+        TaskStatus ts = tip.getTaskStatus(
+            org.apache.hadoop.mapred.TaskAttemptID.downgrade(attemptId));
 
         // Validate task counters
-        Counters c = ts.getCounters();
+        Counters c = new Counters(ts.getCounters());
         assertTrue("Counters of Task " + tid + " obtained from history file " +
-                   " did not match the expected value",
-                  c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS)));
+            " did not match the expected value",
+            c.equals(task.getCounters()));
       }
     }
   }
@@ -683,78 +461,83 @@
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
 
-    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
+    Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks();
 
     // For each task
-    for (JobHistory.Task task : tasks.values()) {
+    for (TaskInfo task : tasks.values()) {
       // validate info of each attempt
-      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+      for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
 
-        String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
-        TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
+        TaskAttemptID attemptId = attempt.getAttemptId();
         TaskID tid = attemptId.getTaskID();
 
-        // Validate task id
-        assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-                   tid.toString().equals(attempt.get(Keys.TASKID)));
-
-        TaskInProgress tip = jip.getTaskInProgress(tid);
-        TaskStatus ts = tip.getTaskStatus(attemptId);
+        TaskInProgress tip = jip.getTaskInProgress
+        (org.apache.hadoop.mapred.TaskID.downgrade(tid));
+        
+        TaskStatus ts = tip.getTaskStatus(
+            org.apache.hadoop.mapred.TaskAttemptID.downgrade(attemptId));
 
         // Validate task attempt start time
-        assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-            ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));
+        assertTrue("START_TIME of Task attempt " + attemptId +
+            " obtained from " +
+            "history file did not match the expected value",
+            ts.getStartTime() == attempt.getStartTime());
 
         // Validate task attempt finish time
-        assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-            ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
+        assertTrue("FINISH_TIME of Task attempt " + attemptId +
+                   " obtained from " +
+                   "history file " + ts.getFinishTime() + 
+                   " did not match the expected value, " +
+                   attempt.getFinishTime(),
+            ts.getFinishTime() == attempt.getFinishTime());
 
 
-        TaskTrackerStatus ttStatus = jt.getTaskTrackerStatus(ts.getTaskTracker());
+        TaskTrackerStatus ttStatus =
+          jt.getTaskTrackerStatus(ts.getTaskTracker());
 
         if (ttStatus != null) {
-          assertTrue("http port of task attempt " + idStr + " obtained from " +
+          assertTrue("http port of task attempt " + attemptId +
+                     " obtained from " +
                      "history file did not match the expected value",
                      ttStatus.getHttpPort() ==
-                     Integer.parseInt(attempt.get(Keys.HTTP_PORT)));
+                     attempt.getHttpPort());
 
-          if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
+          if (attempt.getTaskStatus().equals("SUCCEEDED")) {
             String ttHostname = jt.getNode(ttStatus.getHost()).toString();
 
             // check if hostname is valid
-            assertTrue("Host name of task attempt " + idStr + " obtained from" +
+            assertTrue("Host name of task attempt " + attemptId +
+                       " obtained from" +
                        " history file did not match the expected value",
-                       ttHostname.equals(attempt.get(Keys.HOSTNAME)));
+                       ttHostname.equals(attempt.getHostname()));
           }
         }
-        if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
+        if (attempt.getTaskStatus().equals("SUCCEEDED")) {
           // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
           // Reduce Task Attempts
-          if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
-            assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
+          if (attempt.getTaskType().equals(TaskType.REDUCE)) {
+            assertTrue("SHUFFLE_FINISHED time of task attempt " + attemptId +
                      " obtained from history file did not match the expected" +
                      " value", ts.getShuffleFinishTime() ==
-                     Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
-            assertTrue("SORT_FINISHED time of task attempt " + idStr +
+                     attempt.getShuffleFinishTime());
+            assertTrue("SORT_FINISHED time of task attempt " + attemptId +
                      " obtained from history file did not match the expected" +
                      " value", ts.getSortFinishTime() ==
-                     Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
+                     attempt.getSortFinishTime());
           }
 
           //Validate task counters
-          Counters c = ts.getCounters();
-          assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
+          Counters c = new Counters(ts.getCounters());
+          assertTrue("Counters of Task Attempt " + attemptId + " obtained from " +
                      "history file did not match the expected value",
-               c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
+               c.equals(attempt.getCounters()));
         }
         
         // check if tracker name is valid
-        assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
+        assertTrue("Tracker name of task attempt " + attemptId +
+                   " obtained from " +
                    "history file did not match the expected value",
-                   ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
+                   ts.getTaskTracker().equals(attempt.getTrackerName()));
       }
     }
   }
@@ -766,13 +549,15 @@
    * @param job RunningJob object of the job whose history is to be validated
    * @param conf job conf
    */
-  static void validateJobHistoryFileContent(MiniMRCluster mr,
+  public static void validateJobHistoryFileContent(MiniMRCluster mr,
                               RunningJob job, JobConf conf) throws IOException  {
 
     JobID id = job.getID();
-    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+    JobHistory jobHistory = 
+      mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+    Path doneDir = jobHistory.getCompletedJobHistoryLocation();
     // Get the history file name
-    String logFileName = getDoneFile(conf, id, doneDir);
+    String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
 
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
@@ -781,18 +566,10 @@
     // Check if the history file exists
     assertTrue("History file does not exist", fileSys.exists(logFile));
 
-
-    // check if the history file is parsable
-    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
-    		                                   logFileName).split("_");
-
-    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
-    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
-    DefaultJobHistoryParser.JobTasksParseListener l =
-                   new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
-
+    JobHistoryParser parser = new JobHistoryParser(fileSys,
+        logFile.toUri().getPath());
+    
+    JobHistoryParser.JobInfo jobInfo = parser.parse();
     // Now the history file contents are available in jobInfo. Let us compare
     // them with the actual values from JT.
     validateJobLevelKeyValues(mr, job, jobInfo, conf);
@@ -811,8 +588,8 @@
       //set the done folder location
       String doneFolder = "history_done";
       conf.set("mapred.job.tracker.history.completed.location", doneFolder);
-      
-      String logDir = 
+
+      String logDir =
         "file:///" + new File(System.getProperty("hadoop.log.dir")).
         getAbsolutePath() + File.separator + "history";
 
@@ -821,27 +598,21 @@
       //there may be some stale files, clean them
       if (logDirFs.exists(logDirPath)) {
         boolean deleted = logDirFs.delete(logDirPath, true);
-        LOG.info(logDirPath + " deleted " + deleted); 
+        LOG.info(logDirPath + " deleted " + deleted);
       }
-      
+
       logDirFs.mkdirs(logDirPath);
-      assertEquals("No of file in logDir not correct", 0, 
-          logDirFs.listStatus(logDirPath).length); 
+      assertEquals("Number of files in logDir not correct", 0,
+          logDirFs.listStatus(logDirPath).length);
       logDirFs.create(new Path(logDirPath, "f1"));
       logDirFs.create(new Path(logDirPath, "f2"));
-      assertEquals("No of file in logDir not correct", 2, 
+      assertEquals("Number of files in logDir not correct", 2,
           logDirFs.listStatus(logDirPath).length);
-
+      
       MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
           3, null, null, conf);
 
-      assertEquals("Files in logDir did not move to DONE folder",
-          0, logDirFs.listStatus(logDirPath).length);
-      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-      assertEquals("Files in DONE dir not correct",
-          2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
-
       // run the TCs
       conf = mr.createJobConf();
 
@@ -862,10 +633,13 @@
      // Run a job that succeeds and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.getName());
       JobID id = job.getID();
-      String logFileName = getDoneFile(conf, id, doneDir);
+      String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
 
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
@@ -899,7 +673,8 @@
       assertFalse("Config for completed jobs not deleted from running folder", 
                   fileSys.exists(runningJobConfFilename));
 
-      validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+      validateJobHistoryFileFormat(jobHistory,
+          job.getID(), conf, "SUCCEEDED", false);
       validateJobHistoryFileContent(mr, job, conf);
 
       // get the job conf filename
@@ -947,12 +722,13 @@
 
      // Run a job that succeeds and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      
-      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.toString());
       JobID id = job.getID();
-      String logFileName = getDoneFile(conf, id, doneDir);
+      String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
 
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
@@ -966,14 +742,13 @@
      assertTrue("Config for completed jobs doesn't exist",
                  fileSys.exists(confFile));
 
-      // check if the file exists in a done folder
+      // check if the conf file exists in a done folder
      assertTrue("Completed job config doesn't exist in the done folder",
                  doneDir.getName().equals(confFile.getParent().getName()));
 
       // check if the file exists in a done folder
      assertTrue("Completed job doesn't exist in the done folder",
                  doneDir.getName().equals(logFile.getParent().getName()));
-      
 
       // check if the job file is removed from the history location 
       Path runningJobsHistoryFolder = logFile.getParent().getParent();
@@ -986,11 +761,13 @@
       assertFalse("Config for completed jobs not deleted from running folder", 
                   fileSys.exists(runningJobConfFilename));
 
-      validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+      validateJobHistoryFileFormat(jobHistory, job.getID(), conf, 
+          "SUCCEEDED", false);
       validateJobHistoryFileContent(mr, job, conf);
 
       // get the job conf filename
-      String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID());
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      String name = jt.getLocalJobFilePath(job.getID());
       File file = new File(name);
 
       // check if the file get deleted
@@ -1008,123 +785,16 @@
 
  // Returns the file in the done folder
  // Waits for some time for the file to be moved to done
-  private static String getDoneFile(JobConf conf, JobID id, 
+  private static String getDoneFile(JobHistory jobHistory, 
+      JobConf conf, JobID id, 
       Path doneDir) throws IOException {
     String name = null;
     for (int i = 0; name == null && i < 20; i++) {
-      name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
+      name = jobHistory.getDoneJobHistoryFileName(conf, id);
       UtilsForTests.waitFor(1000);
     }
     return name;
   }
-  // Returns the output path where user history log file is written to with
-  // default configuration setting for hadoop.job.history.user.location
-  private static Path getLogLocationInOutputPath(String logFileName,
-                                                      JobConf conf) {
-    JobConf jobConf = new JobConf(true);//default JobConf
-    FileOutputFormat.setOutputPath(jobConf,
-                     FileOutputFormat.getOutputPath(conf));
-    return JobHistory.JobInfo.getJobHistoryLogLocationForUser(
-                                             logFileName, jobConf);
-  }
-
-  /**
-   * Checks if the user history file exists in the correct dir
-   * @param id job id
-   * @param conf job conf
-   */
-  static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
-          throws IOException  {
-    // Get the history file name
-    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(conf, id, doneDir);
-
-    // User history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
-                                                     logFileName, conf);
-    if(logFile == null) {
-      // get the output path where history file is written to when
-      // hadoop.job.history.user.location is not set
-      logFile = getLogLocationInOutputPath(logFileName, conf);
-    }
-    FileSystem fileSys = null;
-    fileSys = logFile.getFileSystem(conf);
-
-    // Check if the user history file exists in the correct dir
-    if (conf.get("hadoop.job.history.user.location") == null) {
-      assertTrue("User log file " + logFile + " does not exist",
-                 fileSys.exists(logFile));
-    }
-    else if ("none".equals(conf.get("hadoop.job.history.user.location"))) {
-      // history file should not exist in the output path
-      assertFalse("Unexpected. User log file exists in output dir when " +
-                 "hadoop.job.history.user.location is set to \"none\"",
-                 fileSys.exists(logFile));
-    }
-    else {
-      //hadoop.job.history.user.location is set to a specific location.
-      // User log file should exist in that location
-      assertTrue("User log file " + logFile + " does not exist",
-                 fileSys.exists(logFile));
-
-      // User log file should not exist in output path.
-
-      // get the output path where history file is written to when
-      // hadoop.job.history.user.location is not set
-      Path logFile1 = getLogLocationInOutputPath(logFileName, conf);
-      
-      if (logFile != logFile1) {
-        fileSys = logFile1.getFileSystem(conf);
-        assertFalse("Unexpected. User log file exists in output dir when " +
-              "hadoop.job.history.user.location is set to a different location",
-              fileSys.exists(logFile1));
-      }
-    }
-  }
-
-  // Validate user history file location for the given values of
-  // hadoop.job.history.user.location as
-  // (1)null(default case), (2)"none", and (3)some user specified dir.
-  public void testJobHistoryUserLogLocation() throws IOException {
-    MiniMRCluster mr = null;
-    try {
-      mr = new MiniMRCluster(2, "file:///", 3);
-
-      // run the TCs
-      JobConf conf = mr.createJobConf();
-
-      FileSystem fs = FileSystem.get(conf);
-      // clean up
-      fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
-
-      Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1");
-      Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1");
-
-      // validate for the case of null(default)
-      RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryUserLogLocation(job.getID(), conf);
-
-      inDir = new Path(TEST_ROOT_DIR + "/succeed/input2");
-      outDir = new Path(TEST_ROOT_DIR + "/succeed/output2");
-      // validate for the case of "none"
-      conf.set("hadoop.job.history.user.location", "none");
-      job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryUserLogLocation(job.getID(), conf);
- 
-      inDir = new Path(TEST_ROOT_DIR + "/succeed/input3");
-      outDir = new Path(TEST_ROOT_DIR + "/succeed/output3");
-      // validate for the case of any dir
-      conf.set("hadoop.job.history.user.location", TEST_ROOT_DIR + "/succeed");
-      job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryUserLogLocation(job.getID(), conf);
-
-    } finally {
-      if (mr != null) {
-        cleanupLocalFiles(mr);
-        mr.shutdown();
-      }
-    }
-  }
 
   private void cleanupLocalFiles(MiniMRCluster mr) 
   throws IOException {
@@ -1133,7 +803,9 @@
     Path sysDir = new Path(jt.getSystemDir());
     FileSystem fs = sysDir.getFileSystem(conf);
     fs.delete(sysDir, true);
-    Path jobHistoryDir = JobHistory.getJobHistoryLocation();
+    Path jobHistoryDir = 
+      mr.getJobTrackerRunner().getJobTracker().getJobHistory().
+      getJobHistoryLocation();
     fs = jobHistoryDir.getFileSystem(conf);
     fs.delete(jobHistoryDir, true);
   }
@@ -1143,13 +815,13 @@
    * @param id job id
    * @param conf job conf
    */
-  private static void validateJobHistoryJobStatus(JobID id, JobConf conf,
-          String status) throws IOException  {
+  private static void validateJobHistoryJobStatus(JobHistory jobHistory,
+      JobID id, JobConf conf, String status) throws IOException  {
 
     // Get the history file name
-    Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(conf, id, doneDir);
-
+    Path doneDir = jobHistory.getCompletedJobHistoryLocation();
+    String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+    
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
     FileSystem fileSys = logFile.getFileSystem(conf);
@@ -1162,19 +834,13 @@
     fileSys.getFileStatus(logFile).getPermission().equals(
        new FsPermission(JobHistory.HISTORY_FILE_PERMISSION)));
     
-    // check if the history file is parsable
-    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
-    		                                   logFileName).split("_");
-
-    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
-    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
-    DefaultJobHistoryParser.JobTasksParseListener l =
-                  new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
+    JobHistoryParser parser = new JobHistoryParser(fileSys, 
+        logFile.toUri().getPath());
+    JobHistoryParser.JobInfo jobInfo = parser.parse();
+    
 
     assertTrue("Job Status read from job history file is not the expected" +
-         " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS)));
+         " status", status.equals(jobInfo.getJobStatus()));
   }
 
  // run jobs that (1) succeed, (2) fail, and (3) are killed
@@ -1197,21 +863,23 @@
      // Run a job that succeeds and validate its job status
       // existing in history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS");
-      long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan();
-      assertTrue(historyCleanerRanAt != 0);
+      
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      validateJobHistoryJobStatus(jobHistory, job.getID(), conf, 
+          JobStatus.getJobRunState(JobStatus.SUCCEEDED));
       
      // Run a job that fails and validate its job status
       // existing in history file
       job = UtilsForTests.runJobFail(conf, inDir, outDir);
-      validateJobHistoryJobStatus(job.getID(), conf, "FAILED");
-      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
+      validateJobHistoryJobStatus(jobHistory, job.getID(), conf, 
+          JobStatus.getJobRunState(JobStatus.FAILED));
       
      // Run a job that is killed and validate its job status
       // existing in history file
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
-      validateJobHistoryJobStatus(job.getID(), conf, "KILLED");
-      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
+      validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+          JobStatus.getJobRunState(JobStatus.KILLED));
       
     } finally {
       if (mr != null) {
@@ -1220,4 +888,5 @@
       }
     }
   }
+
 }
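
For reference, the event-based JobHistoryParser that replaces the old listener/decode path above can be driven standalone, roughly as follows (a minimal sketch: the command-line wiring and class name are assumptions; only the JobHistoryParser and JobStatus.getJobRunState calls are taken from this patch):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobStatus;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    public class HistoryStatusCheck {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path logFile = new Path(args[0]);      // a completed job's history file
        FileSystem fs = logFile.getFileSystem(conf);

        // One pass over the event stream; no per-key listener is needed.
        JobHistoryParser parser =
            new JobHistoryParser(fs, logFile.toUri().getPath());
        JobInfo jobInfo = parser.parse();

        // Compare against the canonical run-state string rather than a
        // hand-written literal, as the updated test does.
        String expected = JobStatus.getJobRunState(JobStatus.SUCCEEDED);
        System.out.println(expected.equals(jobInfo.getJobStatus())
            ? "job succeeded" : "job ended as " + jobInfo.getJobStatus());
      }
    }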

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Thu Sep 17 05:04:21 2009
@@ -24,6 +24,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 
 /**
  * Test if the job retire works fine. 
@@ -81,7 +82,7 @@
     assertNotNull("Job is not in cache", jobtracker.getJobStatus(id));
     
     // get the job conf filename
-    String name = JobHistory.JobInfo.getLocalJobFilePath(id);
+    String name = jobtracker.getLocalJobFilePath(id);
     File file = new File(name);
  
     assertFalse("JobConf file not deleted", file.exists());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Thu Sep 17 05:04:21 2009
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
@@ -51,6 +52,7 @@
           jobConf.getJobName(), "", "");
       this.status.setJobPriority(JobPriority.NORMAL);
       this.status.setStartTime(startTime);
+      this.jobHistory = new FakeJobHistory();
     }
 
     @Override

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Sep 17 05:04:21 2009
@@ -26,6 +26,7 @@
 import junit.framework.TestSuite;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
@@ -85,6 +86,7 @@
       this.profile = new JobProfile(jc.getUser(), jobid, 
           jobFile.toString(), null, jc.getJobName(),
           jc.getQueueName());
+      this.jobHistory = new FakeJobHistory();
     }
 
     @Override
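
Both fake JobInProgress subclasses above (here and in TestParallelInitialization) inject a FakeJobHistory so scheduler tests never touch real history files. The pattern is the classic no-op collaborator; the sketch below is a hypothetical illustration of that shape, not the actual FakeObjectUtilities code:

    // Names here are illustrative only; the real test double is
    // org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory.
    interface HistoryLogger {
      void logEvent(String event);
    }

    class FileHistoryLogger implements HistoryLogger {
      public void logEvent(String event) {
        // production path: append the event to the job's history file
      }
    }

    class NoOpHistoryLogger implements HistoryLogger {
      public void logEvent(String event) {
        // deliberately empty: the tests exercise scheduling, not history I/O
      }
    }

    class FakeJob {
      private final HistoryLogger history;
      FakeJob(HistoryLogger history) {
        this.history = history;            // tests pass a NoOpHistoryLogger
      }
      void complete() {
        history.logEvent("JOB_FINISHED");
      }
    }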

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Thu Sep 17 05:04:21 2009
@@ -278,9 +278,11 @@
     verifyOutput(outDir.getFileSystem(conf), outDir);
 
     //TestJobHistory
-    TestJobHistory.validateJobHistoryFileFormat(jobId, conf, "SUCCESS", false);
+    TestJobHistory.validateJobHistoryFileFormat(
+        mrCluster.getJobTrackerRunner().getJobTracker().getJobHistory(),
+        jobId, conf, "SUCCEEDED", false);
+    
     TestJobHistory.validateJobHistoryFileContent(mrCluster, job, conf);
-    TestJobHistory.validateJobHistoryUserLogLocation(job.getID(), conf);
 
     // Since we keep setKeepTaskFilesPattern, these files should still be
     // present and will not be cleaned up.

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Thu Sep 17 05:04:21 2009
@@ -226,7 +226,7 @@
   /**
    * A utility that waits for specified amount of time
    */
-  static void waitFor(long duration) {
+  public static void waitFor(long duration) {
     try {
       synchronized (waitLock) {
         waitLock.wait(duration);
@@ -582,7 +582,7 @@
   }
 
   // Run a job that will be succeeded and wait until it completes
-  static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
+  public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
          throws IOException {
     conf.setJobName("test-job-succeed");
     conf.setMapperClass(IdentityMapper.class);
@@ -601,7 +601,7 @@
   }
 
   // Run a job that will be failed and wait until it completes
-  static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
+  public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
          throws IOException {
     conf.setJobName("test-job-fail");
     conf.setMapperClass(FailMapper.class);
@@ -620,7 +620,7 @@
   }
 
   // Run a job that will be killed and wait until it completes
-  static RunningJob runJobKill(JobConf conf,  Path inDir, Path outDir)
+  public static RunningJob runJobKill(JobConf conf,  Path inDir, Path outDir)
          throws IOException {
 
     conf.setJobName("test-job-kill");
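
With these helpers widened to public, tests outside org.apache.hadoop.mapred can drive all three job outcomes directly. A minimal sketch (class name and directories are assumptions; the helper signatures are as in this patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.UtilsForTests;

    public class OutcomeDrivingTest {
      // Runs one job per terminal state and waits briefly afterwards.
      public void runAllOutcomes(JobConf conf) throws IOException {
        Path inDir = new Path("/tmp/outcome-test/in");
        Path outDir = new Path("/tmp/outcome-test/out");

        RunningJob succeeded = UtilsForTests.runJobSucceed(conf, inDir, outDir);
        RunningJob failed = UtilsForTests.runJobFail(conf, inDir, outDir);
        RunningJob killed = UtilsForTests.runJobKill(conf, inDir, outDir);

        System.out.println("succeeded job successful? " + succeeded.isSuccessful()
            + ", failed job successful? " + failed.isSuccessful()
            + ", killed job successful? " + killed.isSuccessful());

        UtilsForTests.waitFor(1000);       // also public now: bounded sleep
      }
    }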

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java Thu Sep 17 05:04:21 2009
@@ -702,9 +702,9 @@
                         namenode+"/destdat"});
 
       System.out.println(execCmd(shell, "-lsr", logdir));
-      logs = fs.listStatus(new Path(namenode+"/logs"));
+      logs = fs.globStatus(new Path(namenode+"/logs/part*"));
       assertTrue("Unexpected map count, logs.length=" + logs.length,
-          logs.length == 2);
+          logs.length == 1);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }
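
The assertion change reflects what the test actually cares about: listStatus() counts every entry under the log directory, whereas globbing on part* counts only the map output files. A minimal sketch of the globbing call (the directory argument and class name are assumptions):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PartFileCount {
      // Counts only part* files, ignoring other entries in the directory.
      static int countPartFiles(FileSystem fs, Path dir) throws IOException {
        FileStatus[] parts = fs.globStatus(new Path(dir, "part*"));
        return parts == null ? 0 : parts.length;
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        System.out.println(countPartFiles(fs, new Path(args[0])));
      }
    }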

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Thu Sep 17 05:04:21 2009
@@ -35,8 +35,6 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.mapred.JobHistory;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -853,7 +851,7 @@
           if (finishTime != null) {
             jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
             if (status != null) {
-              jobBeingTraced.setOutcome(JobHistory.Values.valueOf(status));
+              jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values.valueOf(status));
             }
 
             maybeMateJobAndConf();
@@ -1025,11 +1023,11 @@
         task.setFinishTime(Long.parseLong(finishTime));
       }
 
-      JobHistory.Values typ;
-      JobHistory.Values stat;
+      Pre21JobHistoryConstants.Values typ;
+      Pre21JobHistoryConstants.Values stat;
 
       try {
-        stat = status == null ? null : JobHistory.Values.valueOf(status);
+        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.error("A task status you don't know about is \"" + status + "\".",
             e);
@@ -1039,7 +1037,7 @@
       task.setTaskStatus(stat);
 
       try {
-        typ = taskType == null ? null : JobHistory.Values.valueOf(taskType);
+        typ = taskType == null ? null : Pre21JobHistoryConstants.Values.valueOf(taskType);
       } catch (IllegalArgumentException e) {
         LOG.error("A task type you don't know about is \"" + taskType + "\".",
             e);
@@ -1048,8 +1046,8 @@
 
       task.setTaskType(typ);
 
-      List<LoggedTask> vec = typ == JobHistory.Values.MAP ? jobBeingTraced
-          .getMapTasks() : typ == JobHistory.Values.REDUCE ? jobBeingTraced
+      List<LoggedTask> vec = typ == Pre21JobHistoryConstants.Values.MAP ? jobBeingTraced
+          .getMapTasks() : typ == Pre21JobHistoryConstants.Values.REDUCE ? jobBeingTraced
           .getReduceTasks() : jobBeingTraced.getOtherTasks();
 
       if (!taskAlreadyLogged) {
@@ -1248,10 +1246,10 @@
         task.getAttempts().add(attempt);
       }
 
-      JobHistory.Values stat = null;
+      Pre21JobHistoryConstants.Values stat = null;
 
       try {
-        stat = status == null ? null : JobHistory.Values.valueOf(status);
+        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.error("A map attempt status you don't know about is \"" + status
             + "\".", e);
@@ -1301,11 +1299,11 @@
       if (attempt.getStartTime() > 0 && attempt.getFinishTime() > 0) {
         long runtime = attempt.getFinishTime() - attempt.getStartTime();
 
-        if (stat == JobHistory.Values.SUCCESS) {
+        if (stat == Pre21JobHistoryConstants.Values.SUCCESS) {
           successfulMapAttemptTimes[distance].enter(runtime);
         }
 
-        if (stat == JobHistory.Values.FAILED) {
+        if (stat == Pre21JobHistoryConstants.Values.FAILED) {
           failedMapAttemptTimes[distance].enter(runtime);
         }
       }
@@ -1399,10 +1397,10 @@
         task.getAttempts().add(attempt);
       }
 
-      JobHistory.Values stat = null;
+      Pre21JobHistoryConstants.Values stat = null;
 
       try {
-        stat = status == null ? null : JobHistory.Values.valueOf(status);
+        stat = status == null ? null : Pre21JobHistoryConstants.Values.valueOf(status);
       } catch (IllegalArgumentException e) {
         LOG.warn("A map attempt status you don't know about is \"" + status
             + "\".", e);
@@ -1432,11 +1430,11 @@
       if (attempt.getStartTime() > 0 && attempt.getFinishTime() > 0) {
         long runtime = attempt.getFinishTime() - attempt.getStartTime();
 
-        if (stat == JobHistory.Values.SUCCESS) {
+        if (stat == Pre21JobHistoryConstants.Values.SUCCESS) {
           successfulReduceAttemptTimes.enter(runtime);
         }
 
-        if (stat == JobHistory.Values.FAILED) {
+        if (stat == Pre21JobHistoryConstants.Values.FAILED) {
           failedReduceAttemptTimes.enter(runtime);
         }
       }
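
Throughout this file the analyzer keeps the same defensive pattern around the renamed enum: statuses arrive as raw strings from pre-0.21 history lines, so valueOf() stays guarded. Restated standalone (method and class names are illustrative):

    import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;

    public class StatusParse {
      // Returns null for both missing and unrecognized status strings,
      // mirroring the try/catch blocks in HadoopLogsAnalyzer above.
      static Pre21JobHistoryConstants.Values parseStatus(String status) {
        if (status == null) {
          return null;
        }
        try {
          return Pre21JobHistoryConstants.Values.valueOf(status);
        } catch (IllegalArgumentException e) {
          return null;   // unknown value; the analyzer logs and continues
        }
      }
    }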

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java Thu Sep 17 05:04:21 2009
@@ -18,10 +18,10 @@
 package org.apache.hadoop.tools.rumen;
 
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobHistory;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
 
 /**
  * {@link JobStory} represents the runtime information available for a
@@ -114,5 +114,5 @@
    * Get the outcome of the job execution.
    * @return The outcome of the job execution.
    */
-  public JobHistory.Values getOutcome();
+  public Values getOutcome();
 }

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Thu Sep 17 05:04:21 2009
@@ -26,8 +26,6 @@
 
 import org.codehaus.jackson.annotate.JsonAnySetter;
 
-import org.apache.hadoop.mapred.JobHistory;
-
 /**
 * A {@link LoggedJob} is a representation of a hadoop job, with the
  * details of this class set up to meet the requirements of the Jackson JSON
@@ -61,7 +59,7 @@
   int heapMegabytes = -1;
   int totalMaps = -1;
   int totalReduces = -1;
-  JobHistory.Values outcome = JobHistory.Values.SUCCESS;
+  Pre21JobHistoryConstants.Values outcome = Pre21JobHistoryConstants.Values.SUCCESS;
   JobType jobtype = JobType.JAVA;
   JobPriority priority = JobPriority.NORMAL;
 
@@ -216,11 +214,11 @@
     this.totalReduces = totalReduces;
   }
 
-  public JobHistory.Values getOutcome() {
+  public Pre21JobHistoryConstants.Values getOutcome() {
     return outcome;
   }
 
-  void setOutcome(JobHistory.Values outcome) {
+  void setOutcome(Pre21JobHistoryConstants.Values outcome) {
     this.outcome = outcome;
   }
 
@@ -390,7 +388,7 @@
     }
   }
 
-  private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+  private void compare1(Pre21JobHistoryConstants.Values c1, Pre21JobHistoryConstants.Values c2,
       TreePath loc, String eltname) throws DeepInequalityException {
     if (c1 != c2) {
       throw new DeepInequalityException(eltname + " miscompared", new TreePath(

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Thu Sep 17 05:04:21 2009
@@ -20,8 +20,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.mapred.JobHistory;
-
 /**
  * A {@link LoggedTask} represents a [hadoop] task that is part of a hadoop job.
 * It knows about the [possibly empty] sequence of attempts, its I/O footprint,
@@ -39,8 +37,8 @@
   String taskID;
   long startTime = -1L;
   long finishTime = -1L;
-  JobHistory.Values taskType;
-  JobHistory.Values taskStatus;
+  Pre21JobHistoryConstants.Values taskType;
+  Pre21JobHistoryConstants.Values taskStatus;
   List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
 
   ArrayList<LoggedLocation> preferredLocations = new ArrayList<LoggedLocation>();
@@ -140,19 +138,19 @@
     this.numberReduces = numberReduces;
   }
 
-  public JobHistory.Values getTaskStatus() {
+  public Pre21JobHistoryConstants.Values getTaskStatus() {
     return taskStatus;
   }
 
-  void setTaskStatus(JobHistory.Values taskStatus) {
+  void setTaskStatus(Pre21JobHistoryConstants.Values taskStatus) {
     this.taskStatus = taskStatus;
   }
 
-  public JobHistory.Values getTaskType() {
+  public Pre21JobHistoryConstants.Values getTaskType() {
     return taskType;
   }
 
-  void setTaskType(JobHistory.Values taskType) {
+  void setTaskType(Pre21JobHistoryConstants.Values taskType) {
     this.taskType = taskType;
   }
 
@@ -175,7 +173,7 @@
     }
   }
 
-  private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+  private void compare1(Pre21JobHistoryConstants.Values c1, Pre21JobHistoryConstants.Values c2,
       TreePath loc, String eltname) throws DeepInequalityException {
     if (c1 == null && c2 == null) {
       return;

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Thu Sep 17 05:04:21 2009
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.tools.rumen;
 
-import org.apache.hadoop.mapred.JobHistory;
-
 // HACK ALERT!!!  This "should" have two subclasses, which might be called
 //                LoggedMapTaskAttempt and LoggedReduceTaskAttempt, but 
 //                the Jackson implementation of JSON doesn't handle a 
@@ -36,7 +34,7 @@
 public class LoggedTaskAttempt implements DeepCompare {
 
   String attemptID;
-  JobHistory.Values result;
+  Pre21JobHistoryConstants.Values result;
   long startTime = -1L;
   long finishTime = -1L;
   String hostName;
@@ -89,11 +87,11 @@
     this.attemptID = attemptID;
   }
 
-  public JobHistory.Values getResult() {
+  public Pre21JobHistoryConstants.Values getResult() {
     return result;
   }
 
-  void setResult(JobHistory.Values result) {
+  void setResult(Pre21JobHistoryConstants.Values result) {
     this.result = result;
   }
 
@@ -261,7 +259,7 @@
     }
   }
 
-  private void compare1(JobHistory.Values c1, JobHistory.Values c2,
+  private void compare1(Pre21JobHistoryConstants.Values c1, Pre21JobHistoryConstants.Values c2,
       TreePath loc, String eltname) throws DeepInequalityException {
     if (c1 != c2) {
       throw new DeepInequalityException(eltname + " miscompared", new TreePath(

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java?rev=816052&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java Thu Sep 17 05:04:21 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Constants used in pre-0.21 job history files, reproduced here so that
+ * Rumen can parse history written by earlier Hadoop versions.
+ */
+public class Pre21JobHistoryConstants {
+  
+  /**
+   * Job history files contain key="value" pairs, where keys belong to this enum. 
+   * It acts as a global namespace for all keys. 
+   */
+  static enum Keys {
+    JOBTRACKERID,
+    START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
+    LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+    FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
+    ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
+    SHUFFLE_FINISHED, SORT_FINISHED, MAP_FINISHED, COUNTERS, SPLITS,
+    JOB_PRIORITY, HTTP_PORT, TRACKER_NAME, STATE_STRING, VERSION
+  }
+
+  /**
+   * This enum contains some of the values commonly used by history log events.
+   * Since values in history can only be strings, Values.name() is used in
+   * most places in the history file.
+   */
+  public static enum Values {
+    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
+  }
+ 
+}
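
Since Keys is package-private, callers outside rumen only see Values. A tiny sketch of the key="value" round-trip these enums describe (the demo class is hypothetical and lives in the rumen package so it can reference Keys):

    package org.apache.hadoop.tools.rumen;

    public class Pre21ConstantsDemo {
      public static void main(String[] args) {
        // Values are serialized with name() and recovered with valueOf().
        String onDisk = Pre21JobHistoryConstants.Values.SUCCESS.name();
        Pre21JobHistoryConstants.Values parsed =
            Pre21JobHistoryConstants.Values.valueOf(onDisk);

        // A pre-0.21 history line pairs a Keys name with a quoted value:
        System.out.println(Pre21JobHistoryConstants.Keys.JOB_STATUS
            + "=\"" + parsed + "\"");      // prints: JOB_STATUS="SUCCESS"
      }
    }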

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=816052&r1=816051&r2=816052&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Thu Sep 17 05:04:21 2009
@@ -26,9 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobHistory;
 import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
@@ -36,6 +34,7 @@
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
 
 /**
  * {@link ZombieJob} is a layer above {@link LoggedJob} raw JSON objects.
@@ -97,12 +96,12 @@
     this(job, cluster, System.nanoTime());
   }
 
-  private static State convertState(JobHistory.Values status) {
-    if (status == JobHistory.Values.SUCCESS) {
+  private static State convertState(Values status) {
+    if (status == Values.SUCCESS) {
       return State.SUCCEEDED;
-    } else if (status == JobHistory.Values.FAILED) {
+    } else if (status == Values.FAILED) {
       return State.FAILED;
-    } else if (status == JobHistory.Values.KILLED) {
+    } else if (status == Values.KILLED) {
       return State.KILLED;
     } else {
       throw new IllegalArgumentException("unknown status " + status);
@@ -226,7 +225,7 @@
   }
 
   @Override
-  public JobHistory.Values getOutcome() {
+  public Values getOutcome() {
     return job.getOutcome();
   }
 
@@ -378,7 +377,7 @@
           taskNumber, locality);
     } else {
       // TODO should we handle killed attempts later?
-      if (loggedAttempt.getResult()==JobHistory.Values.KILLED) {
+      if (loggedAttempt.getResult()== Values.KILLED) {
         TaskInfo taskInfo = getTaskInfo(loggedTask);
         return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
             taskNumber, locality);
@@ -576,8 +575,8 @@
     long outputRecords = -1;
     long heapMegabytes = -1;
 
-    JobHistory.Values type = loggedTask.getTaskType();
-    if ((type != JobHistory.Values.MAP) && (type != JobHistory.Values.REDUCE)) {
+    Values type = loggedTask.getTaskType();
+    if ((type != Values.MAP) && (type != Values.REDUCE)) {
       throw new IllegalArgumentException(
           "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString());
     }
@@ -586,11 +585,11 @@
       attempt = sanitizeLoggedTaskAttempt(attempt);
       // ignore bad attempts or unsuccessful attempts.
       if ((attempt == null)
-          || (attempt.getResult() != JobHistory.Values.SUCCESS)) {
+          || (attempt.getResult() != Values.SUCCESS)) {
         continue;
       }
 
-      if (type == JobHistory.Values.MAP) {
+      if (type == Values.MAP) {
         inputBytes = attempt.getHdfsBytesRead();
         inputRecords = attempt.getMapInputRecords();
         outputBytes = attempt.getMapOutputBytes();


