hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r747318 - /hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java
Date Tue, 24 Feb 2009 09:45:30 GMT
Author: ddas
Date: Tue Feb 24 09:45:30 2009
New Revision: 747318

URL: http://svn.apache.org/viewvc?rev=747318&view=rev
Log:
HADOOP-4191. Replacing the TestJobHistory.java file. I had committed a wrong version of the
file in my earlier commit.

Modified:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=747318&r1=747317&r2=747318&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java Tue Feb 24 09:45:30 2009
@@ -988,993 +988,3 @@
     }
   }
 }
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Iterator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobHistory.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Tests the JobHistory files - to catch any changes to JobHistory that can
- * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer.
- *
- * testJobHistoryFile
- * Run a job that will be succeeded and validate its history file format and
- * content.
- *
- * testJobHistoryUserLogLocation
- * Run jobs with the given values of hadoop.job.history.user.location as
- *   (1)null(default case), (2)"none", and (3)some dir like "/tmp".
- *   Validate user history file location in each case.
- *
- * testJobHistoryJobStatus
- * Run jobs that will be (1) succeeded (2) failed (3) killed.
- *   Validate job status read from history file in each case.
- *
- * Future changes to job history are to be reflected here in this file.
- */
-public class TestJobHistory extends TestCase {
-   private static final Log LOG = LogFactory.getLog(TestJobHistory.class);
- 
-  private static String TEST_ROOT_DIR = new File(System.getProperty(
-      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
-
-  private static final Pattern digitsPattern =
-                                     Pattern.compile(JobHistory.DIGITS);
-
-  // hostname like   /default-rack/host1.foo.com OR host1.foo.com
-  private static final Pattern hostNamePattern = Pattern.compile(
-                                       "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)");
-
-  private static final String IP_ADDR =
-                       "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?";
-
-  // hostname like   /default-rack/host1.foo.com OR host1.foo.com
-  private static final Pattern trackerNamePattern = Pattern.compile(
-                         "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
-                         IP_ADDR + ":" + JobHistory.DIGITS);
-
-  private static final Pattern splitsPattern = Pattern.compile(
-                              hostNamePattern + "(," + hostNamePattern + ")*");
-
-  private static Map<String, List<String>> taskIDsToAttemptIDs =
-                                     new HashMap<String, List<String>>();
-
-  //Each Task End seen from history file is added here
-  private static List<String> taskEnds = new ArrayList<String>();
-
-  // List of tasks that appear in history file after JT reatart. This is to
-  // allow START_TIME=0 for these tasks.
-  private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();
-
-  // List of potential tasks whose start time can be 0 because of JT restart
-  private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
-
-  /**
-   * Listener for history log file, it populates JobHistory.JobInfo
-   * object with data from log file and validates the data.
-   */
-  static class TestListener
-                    extends DefaultJobHistoryParser.JobTasksParseListener {
-    int lineNum;//line number of history log file
-    boolean isJobLaunched;
-    boolean isJTRestarted;
-
-    TestListener(JobInfo job) {
-      super(job);
-      lineNum = 0;
-      isJobLaunched = false;
-      isJTRestarted = false;
-    }
-
-    // TestListener implementation
-    public void handle(RecordTypes recType, Map<Keys, String> values)
-    throws IOException {
-
-      lineNum++;
-
-      // Check if the record is of type Meta
-      if (recType == JobHistory.RecordTypes.Meta) {
-        long version = Long.parseLong(values.get(Keys.VERSION));
-        assertTrue("Unexpected job history version ",
-                   (version >= 0 && version <= JobHistory.VERSION));
-      }
-      else if (recType.equals(RecordTypes.Job)) {
-        String jobid = values.get(Keys.JOBID);
-        assertTrue("record type 'Job' is seen without JOBID key" +
-        		" in history file at line " + lineNum, jobid != null);
-        JobID id = JobID.forName(jobid);
-        assertTrue("JobID in history file is in unexpected format " +
-                  "at line " + lineNum, id != null);
-        String time = values.get(Keys.LAUNCH_TIME);
-        if (time != null) {
-          if (values.get(Keys.RESTART_COUNT) == null) {// restart is fine
-            assertFalse ("Duplicate Job LAUNCH_TIME is seen in history " +
-                         "file at line " + lineNum, isJobLaunched);
-          }
-          else {// JT restart
-            isJTRestarted = true;
-          }
-          isJobLaunched = true;
-        }
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          assertTrue ("Job FINISH_TIME is seen in history file at line " +
-                      lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
-        }
-      }
-      else if (recType.equals(RecordTypes.Task)) {
-        String taskid = values.get(Keys.TASKID);
-        assertTrue("record type 'Task' is seen without TASKID key" +
-        		" in history file at line " + lineNum, taskid != null);
-        TaskID id = TaskID.forName(taskid);
-        assertTrue("TaskID in history file is in unexpected format " +
-                  "at line " + lineNum, id != null);
-        
-        String time = values.get(Keys.START_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue("Duplicate START_TIME seen for task " + taskid +
-                     " in history file at line " + lineNum, attemptIDs == null);
-          attemptIDs = new ArrayList<String>();
-          taskIDsToAttemptIDs.put(taskid, attemptIDs);
-
-          if (isJTRestarted) {
-            // This maintains a potential ignoreStartTimeTasks list
-            tempIgnoreStartTimeOfTasks.add(taskid);
-          }
-        }
-
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          String s = values.get(Keys.TASK_STATUS);
-          if (s != null) {
-            List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-            assertTrue ("Task FINISH_TIME is seen in history file at line " +
-                    lineNum + " before START_TIME is seen", attemptIDs != null);
-
-            // Check if all the attemptIDs of this task are finished
-            assertTrue("TaskId " + taskid + " is finished at line " +
-                       lineNum + " but its attemptID is not finished.",
-                       (attemptIDs.size() <= 1));
-
-            // Check if at least 1 attempt of this task is seen
-            assertTrue("TaskId " + taskid + " is finished at line " +
-                       lineNum + " but no attemptID is seen before this.",
-                       attemptIDs.size() == 1);
-
-            if (s.equals("KILLED") || s.equals("FAILED")) {
-              // Task End with KILLED/FAILED status in history file is
-              // considered as TaskEnd, TaskStart. This is useful in checking
-              // the order of history lines.
-              attemptIDs = new ArrayList<String>();
-              taskIDsToAttemptIDs.put(taskid, attemptIDs);
-            }
-            else {
-              taskEnds.add(taskid);
-            }
-          }
-          else {
-            // This line of history file could be just an update to finish time
-          }
-        }
-      }
-      else if (recType.equals(RecordTypes.MapAttempt) ||
-                 recType.equals(RecordTypes.ReduceAttempt)) {
-        String taskid =  values.get(Keys.TASKID);
-        assertTrue("record type " + recType + " is seen without TASKID key" +
-        		" in history file at line " + lineNum, taskid != null);
-        
-        String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-        TaskAttemptID id = TaskAttemptID.forName(attemptId);
-        assertTrue("AttemptID in history file is in unexpected format " +
-                   "at line " + lineNum, id != null);
-        
-        String time = values.get(Keys.START_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
-                      " before Task is seen", attemptIDs != null);
-          assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
-                      "file at line " + lineNum, attemptIDs.remove(attemptId));
-
-          if (attemptIDs.isEmpty()) {
-            //just a boolean whether any attempt is seen or not
-            attemptIDs.add("firstAttemptIsSeen");
-          }
-          attemptIDs.add(attemptId);
-
-          if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
-              (id.getId() < 1000)) {
-            // If Task line of this attempt is seen in history file after
-            // JT restart and if this attempt is < 1000(i.e. attempt is noti
-            // started after JT restart) - assuming single JT restart happened
-            ignoreStartTimeOfTasks.add(taskid);
-          }
-        }
-
-        time = values.get(Keys.FINISH_TIME);
-        if (time != null) {
-          List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
-                      + lineNum + " before Task is seen", attemptIDs != null);
-
-          assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
-                      + lineNum + " before TaskAttempt START_TIME is seen",
-                      attemptIDs.remove(attemptId));
-        }
-      }
-      super.handle(recType, values);
-    }
-  }
-
-  // Check if the time is in the expected format
-  private static boolean isTimeValid(String time) {
-    Matcher m = digitsPattern.matcher(time);
-    return m.matches() && (Long.parseLong(time) > 0);
-  }
-
-  private static boolean areTimesInOrder(String time1, String time2) {
-    return (Long.parseLong(time1) <= Long.parseLong(time2));
-  }
-
-  // Validate Format of Job Level Keys, Values read from history file
-  private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values,
-                                                      String status) {
-    String time = values.get(Keys.SUBMIT_TIME);
-    assertTrue("Job SUBMIT_TIME is in unexpected format:" + time +
-               " in history file", isTimeValid(time));
-
-    time = values.get(Keys.LAUNCH_TIME);
-    assertTrue("Job LAUNCH_TIME is in unexpected format:" + time +
-               " in history file", isTimeValid(time));
-
-    String time1 = values.get(Keys.FINISH_TIME);
-    assertTrue("Job FINISH_TIME is in unexpected format:" + time1 +
-               " in history file", isTimeValid(time1));
-    assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file",
-               areTimesInOrder(time, time1));
-
-    String stat = values.get(Keys.JOB_STATUS);
-    assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" +
-               " history file", (status.equals(stat)));
-
-    String priority = values.get(Keys.JOB_PRIORITY);
-    assertTrue("Unknown priority for the job in history file",
-               (priority.equals("HIGH") ||
-                priority.equals("LOW")  || priority.equals("NORMAL") ||
-                priority.equals("VERY_HIGH") || priority.equals("VERY_LOW")));
-  }
-
-  // Validate Format of Task Level Keys, Values read from history file
-  private static void validateTaskLevelKeyValuesFormat(JobInfo job,
-                                  boolean splitsCanBeEmpty) {
-    Map<String, JobHistory.Task> tasks = job.getAllTasks();
-
-    // validate info of each task
-    for (JobHistory.Task task : tasks.values()) {
-
-      String tid = task.get(Keys.TASKID);
-      String time = task.get(Keys.START_TIME);
-      // We allow START_TIME=0 for tasks seen in history after JT restart
-      if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) {
-        assertTrue("Task START_TIME of " + tid + " is in unexpected format:" +
-                 time + " in history file", isTimeValid(time));
-      }
-
-      String time1 = task.get(Keys.FINISH_TIME);
-      assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" +
-                 time1 + " in history file", isTimeValid(time1));
-      assertTrue("Task FINISH_TIME is < START_TIME in history file",
-                 areTimesInOrder(time, time1));
-
-      // Make sure that the Task type exists and it is valid
-      String type = task.get(Keys.TASK_TYPE);
-      assertTrue("Unknown Task type \"" + type + "\" is seen in " +
-                 "history file for task " + tid,
-                 (type.equals("MAP") || type.equals("REDUCE") ||
-                  type.equals("SETUP") || type.equals("CLEANUP")));
-
-      if (type.equals("MAP")) {
-        String splits = task.get(Keys.SPLITS);
-        //order in the condition OR check is important here
-        if (!splitsCanBeEmpty || splits.length() != 0) {
-          Matcher m = splitsPattern.matcher(splits);
-          assertTrue("Unexpected format of SPLITS \"" + splits + "\" is seen" +
-                     " in history file for task " + tid, m.matches());
-        }
-      }
-
-      // Validate task status
-      String status = task.get(Keys.TASK_STATUS);
-      assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
-                 " history file for task " + tid, (status.equals("SUCCESS") ||
-                 status.equals("FAILED") || status.equals("KILLED")));
-    }
-  }
-
-  // Validate foramt of Task Attempt Level Keys, Values read from history file
-  private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
-    Map<String, JobHistory.Task> tasks = job.getAllTasks();
-
-    // For each task
-    for (JobHistory.Task task : tasks.values()) {
-      // validate info of each attempt
-      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
-
-        String id = attempt.get(Keys.TASK_ATTEMPT_ID);
-        String time = attempt.get(Keys.START_TIME);
-        assertTrue("START_TIME of task attempt " + id +
-                   " is in unexpected format:" + time +
-                   " in history file", isTimeValid(time));
-
-        String time1 = attempt.get(Keys.FINISH_TIME);
-        assertTrue("FINISH_TIME of task attempt " + id +
-                   " is in unexpected format:" + time1 +
-                   " in history file", isTimeValid(time1));
-        assertTrue("Task FINISH_TIME is < START_TIME in history file",
-                   areTimesInOrder(time, time1));
-
-        // Make sure that the Task type exists and it is valid
-        String type = attempt.get(Keys.TASK_TYPE);
-        assertTrue("Unknown Task type \"" + type + "\" is seen in " +
-                   "history file for task attempt " + id,
-                   (type.equals("MAP") || type.equals("REDUCE") ||
-                    type.equals("SETUP") || type.equals("CLEANUP")));
-
-        // Validate task status
-        String status = attempt.get(Keys.TASK_STATUS);
-        assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
-                   " history file for task attempt " + id,
-                   (status.equals("SUCCESS") || status.equals("FAILED") ||
-                    status.equals("KILLED")));
-
-        // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
-        // SORT_FINISHED time
-        if (type.equals("REDUCE") && status.equals("SUCCESS")) {
-          time1 = attempt.get(Keys.SHUFFLE_FINISHED);
-          assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
-                     " is in unexpected format:" + time1 +
-                     " in history file", isTimeValid(time1));
-          assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " +
-                     "in history file", areTimesInOrder(time, time1));
-          time = attempt.get(Keys.SORT_FINISHED);
-          assertTrue("SORT_FINISHED of task attempt " + id +
-                     " is in unexpected format:" + time +
-                     " in history file", isTimeValid(time));
-          assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
-                     " in history file", areTimesInOrder(time1, time));
-        }
-
-        // check if hostname is valid
-        String hostname = attempt.get(Keys.HOSTNAME);
-        Matcher m = hostNamePattern.matcher(hostname);
-        assertTrue("Unexpected Host name of task attempt " + id, m.matches());
-
-        // check if trackername is valid
-        String trackerName = attempt.get(Keys.TRACKER_NAME);
-        m = trackerNamePattern.matcher(trackerName);
-        assertTrue("Unexpected tracker name of task attempt " + id,
-                   m.matches());
-
-        if (!status.equals("KILLED")) {
-          // check if http port is valid
-          String httpPort = attempt.get(Keys.HTTP_PORT);
-          m = digitsPattern.matcher(httpPort);
-          assertTrue("Unexpected http port of task attempt " + id, m.matches());
-        }
-        
-        // check if counters are parsable
-        String counters = attempt.get(Keys.COUNTERS);
-        try {
-          Counters readCounters = Counters.fromEscapedCompactString(counters);
-          assertTrue("Counters of task attempt " + id + " are not parsable",
-                     readCounters != null);
-        } catch (ParseException pe) {
-          LOG.warn("While trying to parse counters of task attempt " + id +
-                   ", " + pe);
-        }
-      }
-    }
-  }
-
-  /**
-   *  Validates the format of contents of history file
-   *  (1) history file exists and in correct location
-   *  (2) Verify if the history file is parsable
-   *  (3) Validate the contents of history file
-   *     (a) Format of all TIMEs are checked against a regex
-   *     (b) validate legality/format of job level key, values
-   *     (c) validate legality/format of task level key, values
-   *     (d) validate legality/format of attempt level key, values
-   *     (e) check if all the TaskAttempts, Tasks started are finished.
-   *         Check finish of each TaskAttemptID against its start to make sure
-   *         that all TaskAttempts, Tasks started are indeed finished and the
-   *         history log lines are in the proper order.
-   *         We want to catch ordering of history lines like
-   *            Task START
-   *            Attempt START
-   *            Task FINISH
-   *            Attempt FINISH
-   *         (speculative execution is turned off for this).
-   * @param id job id
-   * @param conf job conf
-   */
-  static void validateJobHistoryFileFormat(JobID id, JobConf conf,
-                 String status, boolean splitsCanBeEmpty) throws IOException  {
-
-    // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
-
-    // Framework history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-    FileSystem fileSys = logFile.getFileSystem(conf);
- 
-    // Check if the history file exists
-    assertTrue("History file does not exist", fileSys.exists(logFile));
-
-
-    // check if the history file is parsable
-    String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
-    		                                   logFileName).split("_");
-
-    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
-    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
-    TestListener l = new TestListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
-
-
-    // validate format of job level key, values
-    validateJobLevelKeyValuesFormat(jobInfo.getValues(), status);
-
-    // validate format of task level key, values
-    validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty);
-
-    // validate format of attempt level key, values
-    validateTaskAttemptLevelKeyValuesFormat(jobInfo);
-
-    // check if all the TaskAttempts, Tasks started are finished for
-    // successful jobs
-    if (status.equals("SUCCESS")) {
-      // Make sure that the lists in taskIDsToAttemptIDs are empty.
-      for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
-        String taskid = it.next();
-        assertTrue("There are some Tasks which are not finished in history " +
-                   "file.", taskEnds.contains(taskid));
-        List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
-        if(attemptIDs != null) {
-          assertTrue("Unexpected. TaskID " + taskid + " has task attempt(s)" +
-                     " that are not finished.", (attemptIDs.size() == 1));
-        }
-      }
-    }
-  }
-
-  // Validate Job Level Keys, Values read from history file by
-  // comparing them with the actual values from JT.
-  private static void validateJobLevelKeyValues(MiniMRCluster mr,
-          RunningJob job, JobInfo jobInfo, JobConf conf) throws IOException  {
-
-    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
-    JobInProgress jip = jt.getJob(job.getID());
-
-    Map<Keys, String> values = jobInfo.getValues();
-
-    assertTrue("SUBMIT_TIME of job obtained from history file did not " +
-               "match the expected value", jip.getStartTime() ==
-               Long.parseLong(values.get(Keys.SUBMIT_TIME)));
-
-    assertTrue("LAUNCH_TIME of job obtained from history file did not " +
-               "match the expected value", jip.getLaunchTime() ==
-               Long.parseLong(values.get(Keys.LAUNCH_TIME)));
-
-    assertTrue("FINISH_TIME of job obtained from history file did not " +
-               "match the expected value", jip.getFinishTime() ==
-               Long.parseLong(values.get(Keys.FINISH_TIME)));
-
-    assertTrue("Job Status of job obtained from history file did not " +
-               "match the expected value",
-               values.get(Keys.JOB_STATUS).equals("SUCCESS"));
-
-    assertTrue("Job Priority of job obtained from history file did not " +
-               "match the expected value", jip.getPriority().toString().equals(
-               values.get(Keys.JOB_PRIORITY)));
-
-    assertTrue("Job Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getJobName(conf).equals(
-               values.get(Keys.JOBNAME)));
-
-    assertTrue("User Name of job obtained from history file did not " +
-               "match the expected value", JobInfo.getUserName(conf).equals(
-               values.get(Keys.USER)));
-
-    // Validate job counters
-    Counters c = jip.getCounters();
-    assertTrue("Counters of job obtained from history file did not " +
-               "match the expected value",
-               c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
-
-    // Validate number of total maps, total reduces, finished maps,
-    // finished reduces, failed maps, failed recudes
-    String totalMaps = values.get(Keys.TOTAL_MAPS);
-    assertTrue("Unexpected number of total maps in history file",
-               Integer.parseInt(totalMaps) == jip.desiredMaps());
-
-    String totalReduces = values.get(Keys.TOTAL_REDUCES);
-    assertTrue("Unexpected number of total reduces in history file",
-               Integer.parseInt(totalReduces) == jip.desiredReduces());
-
-    String finMaps = values.get(Keys.FINISHED_MAPS);
-    assertTrue("Unexpected number of finished maps in history file",
-               Integer.parseInt(finMaps) == jip.finishedMaps());
-
-    String finReduces = values.get(Keys.FINISHED_REDUCES);
-    assertTrue("Unexpected number of finished reduces in history file",
-               Integer.parseInt(finReduces) == jip.finishedReduces());
-
-    String failedMaps = values.get(Keys.FAILED_MAPS);
-    assertTrue("Unexpected number of failed maps in history file",
-               Integer.parseInt(failedMaps) == jip.failedMapTasks);
-
-    String failedReduces = values.get(Keys.FAILED_REDUCES);
-    assertTrue("Unexpected number of failed reduces in history file",
-               Integer.parseInt(failedReduces) == jip.failedReduceTasks);
-  }
-
-  // Validate Task Level Keys, Values read from history file by
-  // comparing them with the actual values from JT.
-  private static void validateTaskLevelKeyValues(MiniMRCluster mr,
-                      RunningJob job, JobInfo jobInfo) throws IOException  {
-
-    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
-    JobInProgress jip = jt.getJob(job.getID());
-
-    // Get the 1st map, 1st reduce, cleanup & setup taskIDs and
-    // validate their history info
-    TaskID mapTaskId = new TaskID(job.getID(), true, 0);
-    TaskID reduceTaskId = new TaskID(job.getID(), false, 0);
-
-    TaskInProgress cleanups[] = jip.getCleanupTasks();
-    TaskID cleanupTaskId;
-    if (cleanups[0].isComplete()) {
-      cleanupTaskId = cleanups[0].getTIPId();
-    }
-    else {
-      cleanupTaskId = cleanups[1].getTIPId();
-    }
-
-    TaskInProgress setups[] = jip.getSetupTasks();
-    TaskID setupTaskId;
-    if (setups[0].isComplete()) {
-      setupTaskId = setups[0].getTIPId();
-    }
-    else {
-      setupTaskId = setups[1].getTIPId();
-    }
-
-    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
-
-    // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
-    for (JobHistory.Task task : tasks.values()) {
-
-      String tid = task.get(Keys.TASKID);
-      if (tid.equals(mapTaskId.toString()) ||
-          tid.equals(reduceTaskId.toString()) ||
-          tid.equals(cleanupTaskId.toString()) ||
-          tid.equals(setupTaskId.toString())) {
-
-        TaskID taskId = null;
-        if (tid.equals(mapTaskId.toString())) {
-          taskId = mapTaskId;
-        }
-        else if (tid.equals(reduceTaskId.toString())) {
-          taskId = reduceTaskId;
-        }
-        else if (tid.equals(cleanupTaskId.toString())) {
-          taskId = cleanupTaskId;
-        }
-        else if (tid.equals(setupTaskId.toString())) {
-          taskId = setupTaskId;
-        }
-        TaskInProgress tip = jip.getTaskInProgress(taskId);
-        assertTrue("START_TIME of Task " + tid + " obtained from history " +
-             "file did not match the expected value", tip.getExecStartTime() ==
-             Long.parseLong(task.get(Keys.START_TIME)));
-
-        assertTrue("FINISH_TIME of Task " + tid + " obtained from history " +
-             "file did not match the expected value", tip.getExecFinishTime() ==
-             Long.parseLong(task.get(Keys.FINISH_TIME)));
-
-        if (taskId == mapTaskId) {//check splits only for map task
-          assertTrue("Splits of Task " + tid + " obtained from history file " +
-                     " did not match the expected value",
-                     tip.getSplitNodes().equals(task.get(Keys.SPLITS)));
-        }
-
-        TaskAttemptID attemptId = tip.getSuccessfulTaskid();
-        TaskStatus ts = tip.getTaskStatus(attemptId);
-
-        // Validate task counters
-        Counters c = ts.getCounters();
-        assertTrue("Counters of Task " + tid + " obtained from history file " +
-                   " did not match the expected value",
-                  c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS)));
-      }
-    }
-  }
-
-  // Validate Task Attempt Level Keys, Values read from history file by
-  // comparing them with the actual values from JT.
-  private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
-                      RunningJob job, JobInfo jobInfo) throws IOException  {
-
-    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
-    JobInProgress jip = jt.getJob(job.getID());
-
-    Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
-
-    // For each task
-    for (JobHistory.Task task : tasks.values()) {
-      // validate info of each attempt
-      for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
-
-        String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
-        TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
-        TaskID tid = attemptId.getTaskID();
-
-        // Validate task id
-        assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-                   tid.toString().equals(attempt.get(Keys.TASKID)));
-
-        TaskInProgress tip = jip.getTaskInProgress(tid);
-        TaskStatus ts = tip.getTaskStatus(attemptId);
-
-        // Validate task attempt start time
-        assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-            ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));
-
-        // Validate task attempt finish time
-        assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-            ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
-
-
-        TaskTrackerStatus ttStatus = jt.getTaskTracker(ts.getTaskTracker());
-
-        if (ttStatus != null) {
-          assertTrue("http port of task attempt " + idStr + " obtained from " +
-                     "history file did not match the expected value",
-                     ttStatus.getHttpPort() ==
-                     Integer.parseInt(attempt.get(Keys.HTTP_PORT)));
-
-          if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
-            String ttHostname = jt.getNode(ttStatus.getHost()).toString();
-
-            // check if hostname is valid
-            assertTrue("Host name of task attempt " + idStr + " obtained from" +
-                       " history file did not match the expected value",
-                       ttHostname.equals(attempt.get(Keys.HOSTNAME)));
-          }
-        }
-        if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
-          // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
-          // Reduce Task Attempts
-          if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
-            assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
-                     " obtained from history file did not match the expected" +
-                     " value", ts.getShuffleFinishTime() ==
-                     Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
-            assertTrue("SORT_FINISHED time of task attempt " + idStr +
-                     " obtained from history file did not match the expected" +
-                     " value", ts.getSortFinishTime() ==
-                     Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
-          }
-
-          //Validate task counters
-          Counters c = ts.getCounters();
-          assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
-                     "history file did not match the expected value",
-               c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
-        }
-        
-        // check if tracker name is valid
-        assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
-                   "history file did not match the expected value",
-                   ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
-      }
-    }
-  }
-
-  /**
-   * Validates the content of a job's framework history file against the
-   * values passed to the per-level validators.
-   * The history file is located, asserted to exist, parsed into a
-   * JobHistory.JobInfo, and then checked by the job-level, task-level and
-   * task-attempt-level validators, in that order.
-   * @param mr cluster handed through to the per-level validators
-   * @param job RunningJob object of the job whose history is to be validated
-   * @param conf job conf
-   * @throws IOException propagated from the file system and parser calls
-   */
-  static void validateJobHistoryFileContent(MiniMRCluster mr,
-                              RunningJob job, JobConf conf) throws IOException  {
-    // Resolve the history file name and its framework log location
-    String historyFileName =
-        JobHistory.JobInfo.getJobHistoryFileName(conf, job.getID());
-    Path historyPath =
-        JobHistory.JobInfo.getJobHistoryLogLocation(historyFileName);
-    FileSystem historyFs = historyPath.getFileSystem(conf);
-
-    // The file has to be present before it can be parsed
-    boolean historyFileExists = historyFs.exists(historyPath);
-    assertTrue("History file does not exist", historyFileExists);
-
-    // Rebuild the job id from fields 2..4 of the decoded file name
-    String decodedName =
-        JobHistory.JobInfo.decodeJobHistoryFileName(historyFileName);
-    String[] nameParts = decodedName.split("_");
-    String parsedJobId = nameParts[2] + "_" + nameParts[3] + "_" + nameParts[4];
-
-    // Parse the file; the listener fills parsedInfo as records are read
-    JobHistory.JobInfo parsedInfo = new JobHistory.JobInfo(parsedJobId);
-    DefaultJobHistoryParser.JobTasksParseListener listener =
-        new DefaultJobHistoryParser.JobTasksParseListener(parsedInfo);
-    // The first five characters of the path string are dropped
-    // (presumably the "file:" scheme prefix -- TODO confirm)
-    String pathForParser = historyPath.toString().substring(5);
-    JobHistory.parseHistoryFromFS(pathForParser, listener, historyFs);
-
-    // The history file contents are now available in parsedInfo; compare
-    // them level by level
-    validateJobLevelKeyValues(mr, job, parsedInfo, conf);
-    validateTaskLevelKeyValues(mr, job, parsedInfo);
-    validateTaskAttemptLevelKeyValues(mr, job, parsedInfo);
-  }
-
-  /**
-   * Runs a job that is expected to succeed and validates both the format
-   * and the content of its history file.
-   */
-  public void testJobHistoryFile() throws IOException {
-    MiniMRCluster cluster = null;
-    try {
-      cluster = new MiniMRCluster(2, "file:///", 3);
-      JobConf jobConf = cluster.createJobConf();
-
-      // Start from a clean working directory for this test
-      final String workDir = TEST_ROOT_DIR + "/succeed";
-      FileSystem localFs = FileSystem.get(jobConf);
-      localFs.delete(new Path(workDir), true);
-
-      Path input = new Path(workDir + "/input");
-      Path output = new Path(workDir + "/output");
-
-      // No speculative execution for this run
-      jobConf.setSpeculativeExecution(false);
-
-      // Keep the completed job in memory until the history file content
-      // has been validated
-      jobConf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
-
-      // Run the job, then check the history file format and content
-      RunningJob succeeded = UtilsForTests.runJobSucceed(jobConf, input, output);
-      validateJobHistoryFileFormat(succeeded.getID(), jobConf, "SUCCESS", false);
-      validateJobHistoryFileContent(cluster, succeeded, jobConf);
-    } finally {
-      // Always bring the cluster down, even if an assertion failed
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  // Computes the user history log file location using a default JobConf
-  // that carries only the output path copied from the given conf, i.e. the
-  // location used with the default configuration setting for
-  // hadoop.job.history.user.location
-  private static Path getLogLocationInOutputPath(String logFileName,
-                                                      JobConf conf) {
-    JobConf defaults = new JobConf(true); // default JobConf
-    Path jobOutputPath = FileOutputFormat.getOutputPath(conf);
-    FileOutputFormat.setOutputPath(defaults, jobOutputPath);
-    Path userLogLocation =
-        JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, defaults);
-    return userLogLocation;
-  }
-
-  /**
-   * Checks if the user history file exists in the correct dir, depending on
-   * the value of hadoop.job.history.user.location:
-   * (1) not set: the file must exist at the resolved user log location;
-   * (2) "none": the file must not exist in the job output path;
-   * (3) any other value: the file must exist at that location and must not
-   *     exist in the job output path, unless both resolve to the same path.
-   * @param id job id
-   * @param conf job conf
-   * @throws IOException propagated from the file system calls
-   */
-  private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf)
-          throws IOException  {
-    // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
-
-    // User history log file location
-    Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
-                                                     logFileName, conf);
-    if (logFile == null) {
-      // get the output path where history file is written to when
-      // hadoop.job.history.user.location is not set
-      logFile = getLogLocationInOutputPath(logFileName, conf);
-    }
-    FileSystem fileSys = logFile.getFileSystem(conf);
-
-    // Read the setting once; every branch below depends on it
-    String userLocation = conf.get("hadoop.job.history.user.location");
-
-    // Check if the user history file exists in the correct dir
-    if (userLocation == null) {
-      assertTrue("User log file " + logFile + " does not exist",
-                 fileSys.exists(logFile));
-    }
-    else if ("none".equals(userLocation)) {
-      // Compare string content, not references: == on String only matches
-      // when both sides happen to be the same interned object.
-      // history file should not exist in the output path
-      assertFalse("Unexpected. User log file exists in output dir when " +
-                 "hadoop.job.history.user.location is set to \"none\"",
-                 fileSys.exists(logFile));
-    }
-    else {
-      //hadoop.job.history.user.location is set to a specific location.
-      // User log file should exist in that location
-      assertTrue("User log file " + logFile + " does not exist",
-                 fileSys.exists(logFile));
-
-      // User log file should not exist in output path.
-
-      // get the output path where history file is written to when
-      // hadoop.job.history.user.location is not set
-      Path logFile1 = getLogLocationInOutputPath(logFileName, conf);
-
-      // Compare the paths by value; a reference comparison is true for any
-      // two distinct Path objects, even when they name the same location.
-      if (!logFile.equals(logFile1)) {
-        fileSys = logFile1.getFileSystem(conf);
-        assertFalse("Unexpected. User log file exists in output dir when " +
-              "hadoop.job.history.user.location is set to a different location",
-              fileSys.exists(logFile1));
-      }
-    }
-  }
-
-  // Validates the user history file location for three settings of
-  // hadoop.job.history.user.location:
-  // (1) null (default case), (2) "none", and (3) some dir, "/tmp"
-  public void testJobHistoryUserLogLocation() throws IOException {
-    MiniMRCluster cluster = null;
-    try {
-      cluster = new MiniMRCluster(2, "file:///", 3);
-      JobConf jobConf = cluster.createJobConf();
-
-      // Start from a clean working directory for this test
-      final String workDir = TEST_ROOT_DIR + "/succeed";
-      FileSystem localFs = FileSystem.get(jobConf);
-      localFs.delete(new Path(workDir), true);
-
-      // Case 1: property left at its default (null)
-      Path input1 = new Path(workDir + "/input1");
-      Path output1 = new Path(workDir + "/output1");
-      RunningJob finished = UtilsForTests.runJobSucceed(jobConf, input1, output1);
-      validateJobHistoryUserLogLocation(finished.getID(), jobConf);
-
-      // Case 2: property set to "none"
-      Path input2 = new Path(workDir + "/input2");
-      Path output2 = new Path(workDir + "/output2");
-      jobConf.set("hadoop.job.history.user.location", "none");
-      finished = UtilsForTests.runJobSucceed(jobConf, input2, output2);
-      validateJobHistoryUserLogLocation(finished.getID(), jobConf);
-
-      // Case 3: property set to an explicit directory
-      Path input3 = new Path(workDir + "/input3");
-      Path output3 = new Path(workDir + "/output3");
-      jobConf.set("hadoop.job.history.user.location", "/tmp");
-      finished = UtilsForTests.runJobSucceed(jobConf, input3, output3);
-      validateJobHistoryUserLogLocation(finished.getID(), jobConf);
-    } finally {
-      // Always bring the cluster down, even if an assertion failed
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  /**
-   * Parses the framework history file of a job and asserts that the
-   * JOB_STATUS recorded in it equals the expected status.
-   * @param id job id
-   * @param conf job conf
-   * @param status expected job status string
-   * @throws IOException propagated from the file system and parser calls
-   */
-  private static void validateJobHistoryJobStatus(JobID id, JobConf conf,
-          String status) throws IOException  {
-    // Resolve the history file name and its framework log location
-    String historyFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
-    Path historyPath =
-        JobHistory.JobInfo.getJobHistoryLogLocation(historyFileName);
-    FileSystem historyFs = historyPath.getFileSystem(conf);
-
-    // The file has to be present before it can be parsed
-    boolean historyFileExists = historyFs.exists(historyPath);
-    assertTrue("History file does not exist", historyFileExists);
-
-    // Rebuild the job id from fields 2..4 of the decoded file name
-    String decodedName =
-        JobHistory.JobInfo.decodeJobHistoryFileName(historyFileName);
-    String[] nameParts = decodedName.split("_");
-    String parsedJobId = nameParts[2] + "_" + nameParts[3] + "_" + nameParts[4];
-
-    // Parse the file; the listener fills parsedInfo as records are read
-    JobHistory.JobInfo parsedInfo = new JobHistory.JobInfo(parsedJobId);
-    DefaultJobHistoryParser.JobTasksParseListener listener =
-        new DefaultJobHistoryParser.JobTasksParseListener(parsedInfo);
-    // The first five characters of the path string are dropped
-    // (presumably the "file:" scheme prefix -- TODO confirm)
-    String pathForParser = historyPath.toString().substring(5);
-    JobHistory.parseHistoryFromFS(pathForParser, listener, historyFs);
-
-    // Compare the expected status with the one read from the file
-    boolean statusMatches =
-        status.equals(parsedInfo.getValues().get(Keys.JOB_STATUS));
-    assertTrue("Job Status read from job history file is not the expected" +
-         " status", statusMatches);
-  }
-
-  // Runs three jobs -- one that succeeds, one that fails and one that is
-  // killed -- and validates the job status read from the history file
-  // in each case
-  public void testJobHistoryJobStatus() throws IOException {
-    MiniMRCluster cluster = null;
-    try {
-      cluster = new MiniMRCluster(2, "file:///", 3);
-      JobConf jobConf = cluster.createJobConf();
-
-      // Start from a clean working directory for this test
-      final String workDir = TEST_ROOT_DIR + "/succeedfailkilljob";
-      FileSystem localFs = FileSystem.get(jobConf);
-      localFs.delete(new Path(workDir), true);
-
-      // All three jobs use the same input and output directories
-      Path input = new Path(workDir + "/input");
-      Path output = new Path(workDir + "/output");
-
-      // (1) job that succeeds
-      RunningJob finished = UtilsForTests.runJobSucceed(jobConf, input, output);
-      validateJobHistoryJobStatus(finished.getID(), jobConf, "SUCCESS");
-
-      // (2) job that fails
-      finished = UtilsForTests.runJobFail(jobConf, input, output);
-      validateJobHistoryJobStatus(finished.getID(), jobConf, "FAILED");
-
-      // (3) job that is killed
-      finished = UtilsForTests.runJobKill(jobConf, input, output);
-      validateJobHistoryJobStatus(finished.getID(), jobConf, "KILLED");
-    } finally {
-      // Always bring the cluster down, even if an assertion failed
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-}



Mime
View raw message