hadoop-common-commits mailing list archives

From d...@apache.org
Subject svn commit: r695823 [1/3] - in /hadoop/core/trunk: ./ conf/ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/
Date Tue, 16 Sep 2008 12:05:19 GMT
Author: ddas
Date: Tue Sep 16 05:05:18 2008
New Revision: 695823

URL: http://svn.apache.org/viewvc?rev=695823&view=rev
Log:
HADOOP-3245. Adds the feature for supporting JobTracker restart. Running jobs can be recovered from the history file. The history file format has been modified to support recovery. The task attempt ID now has the JobTracker start time to distinguish attempts of the same TIP across restarts. Contributed by Amar Ramesh Kamat.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/webapps/job/jobhistory.jsp
    hadoop/core/trunk/src/webapps/job/jobtracker.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 16 05:05:18 2008
@@ -56,6 +56,12 @@
     output retention during the shuffle and reduce relative to maximum heap
     usage. (cdouglas)
 
+    HADOOP-3245. Adds the feature for supporting JobTracker restart. Running
+    jobs can be recovered from the history file. The history file format has
+    been modified to support recovery. The task attempt ID now has the 
+    JobTracker start time to distinguish attempts of the same TIP across 
+    restarts. (Amar Ramesh Kamat via ddas)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Tue Sep 16 05:05:18 2008
@@ -750,6 +750,32 @@
 </property>
 
 <property>
+  <name>mapred.jobtracker.restart.recover</name>
+  <value>false</value>
+  <description>"true" to enable (job) recovery upon restart,
+               "false" to start afresh
+  </description>
+</property>
+
+<property>
+  <name>mapred.jobtracker.job.history.block.size</name>
+  <value>0</value>
+  <description>The block size of the job history file. Since the job recovery
+               uses job history, it's important to dump job history to disk as 
+               soon as possible.
+  </description>
+</property>
+
+<property>
+  <name>mapred.jobtracker.job.history.buffer.size</name>
+  <value>4096</value>
+  <description>The buffer size for the job history file. Since the job 
+               recovery uses job history, it's important to frequently flush the 
+               job history to disk. This will minimize the loss in recovery.
+  </description>
+</property>
+
+<property>
   <name>mapred.jobtracker.taskScheduler</name>
   <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
   <description>The class responsible for scheduling the tasks.</description>
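
The three new properties above are read when the JobTracker initializes job
history, so in practice they belong in the cluster configuration rather than
in a per-job conf. A minimal sketch of setting them programmatically; the
values are illustrative, not recommendations:

    // Illustrative only: these keys are read on the JobTracker side.
    JobConf conf = new JobConf();
    // opt in to recovering running jobs after a JobTracker restart
    conf.setBoolean("mapred.jobtracker.restart.recover", true);
    // small history blocks push records to disk sooner (0 = filesystem default)
    conf.setLong("mapred.jobtracker.job.history.block.size", 1024 * 1024);
    // a small buffer bounds how much history a crash can lose
    conf.setInt("mapred.jobtracker.job.history.buffer.size", 4096);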

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/StringUtils.java Tue Sep 16 05:05:18 2008
@@ -226,9 +226,19 @@
    * @param startTime start time
    */
   public static String formatTimeDiff(long finishTime, long startTime){
-    StringBuffer buf = new StringBuffer();
-    
     long timeDiff = finishTime - startTime; 
+    return formatTime(timeDiff); 
+  }
+  
+  /**
+   * 
+   * Given the time in long milliseconds, returns a 
+   * String in the format Xhrs, Ymins, Z sec. 
+   * 
+   * @param timeDiff The time difference to format
+   */
+  public static String formatTime(long timeDiff){
+    StringBuffer buf = new StringBuffer();
     long hours = timeDiff / (60*60*1000);
     long rem = (timeDiff % (60*60*1000));
     long minutes =  rem / (60*1000);
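
A quick sketch of the extracted helper: formatTimeDiff(finish, start) now just
delegates to formatTime(finish - start), so elapsed durations can be formatted
directly.

    // Format an elapsed duration without a finish/start pair.
    long elapsed = (2 * 60 * 60 + 3 * 60 + 5) * 1000L;  // 2h 3m 5s in millis
    System.out.println(StringUtils.formatTime(elapsed));
    // prints along the lines of: 2hrs, 3mins, 5sec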

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java Tue Sep 16 05:05:18 2008
@@ -30,7 +30,6 @@
 import java.util.Map;
 import java.util.MissingResourceException;
 import java.util.ResourceBundle;
-import java.util.Set;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.io.IntWritable;
@@ -159,6 +158,13 @@
       return buf.toString();
     }
     
+    // Checks for (content) equality of two (basic) counters
+    synchronized boolean contentEquals(Counter c) {
+      return name.equals(c.getName())
+             && displayName.equals(c.getDisplayName())
+             && value == c.getCounter();
+    }
+    
     /**
      * What is the current value of this counter?
      * @return the current value
@@ -263,11 +269,26 @@
       return buf.toString();
     }
         
-    /**
-     * Returns the names of the counters within
+    /** 
+     * Checks for (content) equality of Groups
      */
-    public synchronized Set<String> getCounterNames() {
-      return subcounters.keySet();
+    synchronized boolean contentEquals(Group g) {
+      boolean isEqual = false;
+      if (g != null) {
+        if (size() == g.size()) {
+          isEqual = true;
+          for (Map.Entry<String, Counter> entry : subcounters.entrySet()) {
+            String key = entry.getKey();
+            Counter c1 = entry.getValue();
+            Counter c2 = g.getCounterForName(key);
+            if (!c1.contentEquals(c2)) {
+              isEqual = false;
+              break;
+            }
+          }
+        }
+      }
+      return isEqual;
     }
     
     /**
@@ -708,6 +729,25 @@
                                       charsToEscape);
   }
   
+  synchronized boolean contentEquals(Counters counters) {
+    boolean isEqual = false;
+    if (counters != null) {
+      if (size() == counters.size()) {
+        isEqual = true;
+        for (Map.Entry<String, Group> entry : this.counters.entrySet()) {
+          String key = entry.getKey();
+          Group sourceGroup = entry.getValue();
+          Group targetGroup = counters.getGroup(key);
+          if (!sourceGroup.contentEquals(targetGroup)) {
+            isEqual = false;
+            break;
+          }
+        }
+      }
+    }
+    return isEqual;
+  }
+  
   public static class Application {
     //special counters which are written by the application and are 
     //used by the framework.
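
The contentEquals() methods are package-private, so they are reachable only
from org.apache.hadoop.mapred (the updated TestCounters, for example). A
hypothetical in-package sketch:

    // Hypothetical in-package use: equality is by counter name, display name
    // and value, not by object identity.
    Counters a = new Counters();
    Counters b = new Counters();
    a.incrCounter("stats", "records", 10);
    b.incrCounter("stats", "records", 10);
    boolean same = a.contentEquals(b);       // true: same groups and values
    b.incrCounter("stats", "records", 1);
    boolean stillSame = a.contentEquals(b);  // false: values now differ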

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java Tue Sep 16 05:05:18 2008
@@ -21,6 +21,8 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +39,7 @@
   short responseId;
   int heartbeatInterval;
   TaskTrackerAction[] actions;
+  Map<JobID, Integer> lastKnownIndexMap = null;
 
   HeartbeatResponse() {}
   
@@ -54,6 +57,14 @@
     return responseId;
   }
   
+  public void setLastKnownIndices(Map<JobID, Integer> lastKnownIndexMap) {
+    this.lastKnownIndexMap = lastKnownIndexMap; 
+  }
+  
+  public Map<JobID, Integer> getLastKnownIndex() {
+    return lastKnownIndexMap;
+  }
+  
   public void setActions(TaskTrackerAction[] actions) {
     this.actions = actions;
   }
@@ -90,6 +101,16 @@
         action.write(out);
       }
     }
+    // Write the last map event index for the jobs
+    if (lastKnownIndexMap != null) {
+      out.writeInt(lastKnownIndexMap.size());
+      for (Map.Entry<JobID, Integer> entry : lastKnownIndexMap.entrySet()) {
+        entry.getKey().write(out);
+        out.writeInt(entry.getValue());
+      }
+    } else {
+      out.writeInt(0);
+    }
     //ObjectWritable.writeObject(out, actions, actions.getClass(), conf);
   }
   
@@ -108,6 +129,16 @@
     } else {
       actions = null;
     }
+    // Read the last map events index of the jobs
+    int size = in.readInt();
+    if (size != 0) {
+      lastKnownIndexMap = new HashMap<JobID, Integer>(size);
+      for (int i = 0; i < size; ++i) {
+        JobID id = JobID.read(in);
+        int count = in.readInt();
+        lastKnownIndexMap.put(id, count);
+      }
+    }
     //actions = (TaskTrackerAction[]) ObjectWritable.readObject(in, conf);
   }
 }
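
On the wire the new field is a count followed by (job-id, index) pairs, with a
count of 0 doubling as "absent"; on the read side a zero count leaves
lastKnownIndexMap null. A rough round-trip sketch using DataOutputBuffer and
DataInputBuffer from org.apache.hadoop.io; it must live in
org.apache.hadoop.mapred because the constructor is package-private, and the
job id below is made up:

    // Package-local round-trip sketch; the job id is fabricated.
    HeartbeatResponse resp = new HeartbeatResponse();
    resp.setActions(new TaskTrackerAction[0]);
    Map<JobID, Integer> indices = new HashMap<JobID, Integer>();
    indices.put(JobID.forName("job_200809160505_0001"), 42);
    resp.setLastKnownIndices(indices);

    DataOutputBuffer out = new DataOutputBuffer();
    resp.write(out);                  // appends <size, (JobID, int)*>

    HeartbeatResponse copy = new HeartbeatResponse();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);              // a size of 0 would leave the map null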

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Tue Sep 16 05:05:18 2008
@@ -96,9 +96,10 @@
       LOG.info("Task " + taskid + " has problem " + trace);
     }
     
-    public TaskCompletionEvent[] getMapCompletionEvents(JobID jobId, 
-                                                        int fromEventId, int maxLocs) throws IOException {
-      return TaskCompletionEvent.EMPTY_ARRAY;
+    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
+        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
+      return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY, 
+                                               false);
     }
 
     public void reportNextRecordRange(TaskAttemptID taskid, 
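
Callers of getMapCompletionEvents() now receive a MapTaskCompletionEventsUpdate
instead of a bare event array; judging from the constructor used above, it
pairs the events with a boolean that tells a fetching reduce task whether to
discard what it has already seen (as after a JobTracker restart). A
hypothetical caller-side sketch; the accessor names are assumptions, not taken
from this patch:

    // Hypothetical reduce-side fetch step; accessor names are assumed.
    MapTaskCompletionEventsUpdate update =
        umbilical.getMapCompletionEvents(jobId, fromEventId, maxLocs, reduceId);
    if (update.shouldReset()) {
      fromEventId = 0;  // events fetched before the restart are stale
    }
    TaskCompletionEvent[] events = update.getMapTaskCompletionEvents();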

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Sep 16 05:05:18 2008
@@ -47,6 +47,7 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -813,6 +814,33 @@
     }
   }
 
+  /** 
+   * Checks if the job directory is clean and has all the required components 
+   * for (re)starting the job
+   */
+  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) 
+  throws IOException {
+    FileStatus[] contents = fs.listStatus(jobDirPath);
+    int matchCount = 0;
+    if (contents != null && contents.length >=3) {
+      for (FileStatus status : contents) {
+        if ("job.xml".equals(status.getPath().getName())) {
+          ++matchCount;
+        }
+        if ("job.jar".equals(status.getPath().getName())) {
+          ++matchCount;
+        }
+        if ("job.split".equals(status.getPath().getName())) {
+          ++matchCount;
+        }
+      }
+      if (matchCount == 3) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   static class RawSplit implements Writable {
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
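
A short sketch of the new helper; the system-directory path below is only an
example:

    // Verify a job's directory still has everything needed to restart the job.
    JobConf conf = new JobConf();
    Path jobDir = new Path("/mapred/system/job_200809160505_0001");  // example
    FileSystem fs = jobDir.getFileSystem(conf);
    if (JobClient.isJobDirValid(jobDir, fs)) {
      // job.xml, job.jar and job.split are all present; safe to recover
    }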

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Tue Sep 16 05:05:18 2008
@@ -39,8 +39,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -64,6 +67,7 @@
   
   public static final Log LOG = LogFactory.getLog(JobHistory.class);
   private static final String DELIMITER = " ";
+  private static final String LINE_DELIMITER = ".";
   private static final String KEY = "(\\w+)";
   private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
   
@@ -75,6 +79,10 @@
   private static Map<String, ArrayList<PrintWriter>> openJobs = 
                      new HashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false; 
+  private static final String SECONDARY_FILE_SUFFIX = ".recover";
+  private static long jobHistoryBlockSize = 0;
+  private static int jobHistoryBufferSize = 0;
+  private static String jobtrackerHostname;
   /**
    * Record types are identifiers for each line of log in history files. 
    * A record type appears as the first token in a single line of log. 
@@ -93,7 +101,8 @@
     LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
-    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS
+    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
+    TRACKER_NAME, STATE_STRING
   }
 
   /**
@@ -102,7 +111,7 @@
    * most places in history file. 
    */
   public static enum Values {
-    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP
+    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING
   }
 
  // temp buffer for parsed data
@@ -125,6 +134,7 @@
         + File.separator + "history");
       JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
                                     String.valueOf(jobTrackerStartTime) + "_";
+      jobtrackerHostname = hostname;
       Path logDir = new Path(LOG_DIR);
       FileSystem fs = logDir.getFileSystem(conf);
       if (!fs.exists(logDir)){
@@ -134,6 +144,14 @@
       }
       conf.set("hadoop.job.history.location", LOG_DIR);
       disableHistory = false;
+      // set the job history block size
+      jobHistoryBlockSize = 
+        conf.getLong("mapred.jobtracker.job.history.block.size", 
+                     fs.getDefaultBlockSize());
+      // set the job history buffer size
+      jobHistoryBufferSize = 
+        Integer.parseInt(conf.get("mapred.jobtracker.job.history.buffer.size", 
+                                  "4096"));
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -159,7 +177,7 @@
       StringBuffer buf = new StringBuffer(); 
       while ((line = reader.readLine())!= null){
         buf.append(line); 
-        if (!line.trim().endsWith("\"")){
+        if (!line.trim().endsWith("\"" + DELIMITER + LINE_DELIMITER)){
           continue; 
         }
         parseLine(buf.toString(), l);
@@ -206,7 +224,8 @@
   
   static void log(PrintWriter out, RecordTypes recordType, Keys key, 
                   String value){
-    out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""); 
+    out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""
+                + DELIMITER + LINE_DELIMITER); 
   }
   
   /**
@@ -228,6 +247,7 @@
       buf.append("\"");
       buf.append(DELIMITER); 
     }
+    buf.append(LINE_DELIMITER);
     
     for (PrintWriter out : writers) {
       out.println(buf.toString());
@@ -420,6 +440,237 @@
     }
     
     /**
+     * Get the job name from the job conf
+     */
+    static String getJobName(JobConf jobConf) {
+      String jobName = jobConf.getJobName();
+      if (jobName == null || jobName.length() == 0) {
+        jobName = "NA";
+      }
+      return jobName;
+    }
+    
+    /**
+     * Get the user name from the job conf
+     */
+    public static String getUserName(JobConf jobConf) {
+      String user = jobConf.getUser();
+      if (user == null || user.length() == 0) {
+        user = "NA";
+      }
+      return user;
+    }
+    
+    /**
+     * Get the job history file path given the history filename
+     */
+    public static Path getJobHistoryLogLocation(String logFileName)
+    {
+      return LOG_DIR == null ? null : new Path(LOG_DIR, logFileName);
+    }
+
+    /**
+     * Get the user job history file path
+     */
+    public static Path getJobHistoryLogLocationForUser(String logFileName, 
+                                                       JobConf jobConf) {
+      // find user log directory 
+      Path userLogFile = null;
+      Path outputPath = FileOutputFormat.getOutputPath(jobConf);
+      String userLogDir = jobConf.get("hadoop.job.history.user.location",
+                                      outputPath == null 
+                                      ? null 
+                                      : outputPath.toString());
+      if ("none".equals(userLogDir)) {
+        userLogDir = null;
+      }
+      if (userLogDir != null) {
+        userLogDir = userLogDir + Path.SEPARATOR + "_logs" + Path.SEPARATOR 
+                     + "history";
+        userLogFile = new Path(userLogDir, logFileName);
+      }
+      return userLogFile;
+    }
+
+    /**
+     * Generates the job history filename for a new job
+     */
+    private static String getNewJobHistoryFileName(JobConf jobConf, JobID id) {
+      return JOBTRACKER_UNIQUE_STRING
+             + id.toString() + "_" + getUserName(jobConf) + "_" 
+             + trimJobName(getJobName(jobConf));
+    }
+    
+    /**
+     * Trims the job-name if required
+     */
+    private static String trimJobName(String jobName) {
+      if (jobName.length() > JOB_NAME_TRIM_LENGTH) {
+        jobName = jobName.substring(0, JOB_NAME_TRIM_LENGTH);
+      }
+      return jobName;
+    }
+    
+    /**
+     * Recover the job history filename from the history folder. 
+     * Uses the following pattern
+     *    $jt-hostname_[0-9]*_$job-id_$user-$job-name*
+     * @param jobConf the job conf
+     * @param id job id
+     */
+    public static synchronized String getJobHistoryFileName(JobConf jobConf, 
+                                                            JobID id) 
+    throws IOException {
+      String user = getUserName(jobConf);
+      String jobName = trimJobName(getJobName(jobConf));
+      
+      FileSystem fs = new Path(LOG_DIR).getFileSystem(jobConf);
+      if (LOG_DIR == null) {
+        return null;
+      }
+      
+      // Make the pattern matching the job's history file
+      final Pattern historyFilePattern = 
+        Pattern.compile(jobtrackerHostname + "_" + "[0-9]+" + "_" 
+                        + id.toString() + "_" + user + "_" + jobName + "+");
+      // a path filter that matches 4 parts of the filenames namely
+      //  - jt-hostname
+      //  - job-id
+      //  - username
+      //  - jobname
+      PathFilter filter = new PathFilter() {
+        public boolean accept(Path path) {
+          String fileName = path.getName();
+          try {
+            fileName = decodeJobHistoryFileName(fileName);
+          } catch (IOException ioe) {
+            LOG.info("Error while decoding history file " + fileName + "."
+                     + " Ignoring file.", ioe);
+            return false;
+          }
+          return historyFilePattern.matcher(fileName).find();
+        }
+      };
+      
+      FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
+      String filename;
+      if (statuses.length == 0) {
+        filename = 
+          encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, id));
+      } else {
+        // return filename, considering the fact that the name can be a 
+        // secondary filename like filename.recover
+        filename = decodeJobHistoryFileName(statuses[0].getPath().getName());
+        // Remove the '.recover' suffix if it exists
+        if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
+          int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
+          filename = filename.substring(0, newLength);
+        }
+        filename = encodeJobHistoryFileName(filename);
+      }
+      return filename;
+    }
+    
+    /** Since there was a restart, there should be a master file and 
+     * a recovery file. Once the recovery is complete, the master should be 
+     * deleted as an indication that the recovery file should be treated as the 
+     * master upon completion or next restart.
+     * @param fileName the history filename that needs checkpointing
+     * @param conf Job conf
+     * @throws IOException
+     */
+    static synchronized void checkpointRecovery(String fileName, JobConf conf) 
+    throws IOException {
+      Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
+      if (logPath != null) {
+        FileSystem fs = logPath.getFileSystem(conf);
+        fs.delete(logPath, false);
+      }
+      // do the same for the user file too
+      logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, 
+                                                                   conf);
+      if (logPath != null) {
+        FileSystem fs = logPath.getFileSystem(conf);
+        fs.delete(logPath, false);
+      }
+    }
+    
+    static String getSecondaryJobHistoryFile(String filename) 
+    throws IOException {
+      return encodeJobHistoryFileName(
+          decodeJobHistoryFileName(filename) + SECONDARY_FILE_SUFFIX);
+    }
+    
+    /** Selects one of the two files generated as a part of recovery. 
+     * The rule of thumb is to always select the oldest file. 
+     * This call makes sure that only one file is left in the end. 
+     * @param conf job conf
+     * @param logFilePath Path of the log file
+     * @throws IOException 
+     */
+    public synchronized static Path recoverJobHistoryFile(JobConf conf, 
+                                                          Path logFilePath) 
+    throws IOException {
+      FileSystem fs = logFilePath.getFileSystem(conf);
+      String tmpFilename = getSecondaryJobHistoryFile(logFilePath.getName());
+      Path logDir = logFilePath.getParent();
+      Path tmpFilePath = new Path(logDir, tmpFilename);
+      if (fs.exists(logFilePath)) {
+        if (fs.exists(tmpFilePath)) {
+          fs.delete(tmpFilePath, false);
+        }
+        return tmpFilePath;
+      } else {
+        if (fs.exists(tmpFilePath)) {
+          fs.rename(tmpFilePath, logFilePath);
+          return tmpFilePath;
+        } else {
+          return logFilePath;
+        }
+      }
+    }
+
+    /** Finalize the recovery and make one file in the end. 
+     * This involves renaming the recover file to the master file.
+     * @param id Job id  
+     * @param conf the job conf
+     * @throws IOException
+     */
+    static synchronized void finalizeRecovery(JobID id, JobConf conf) 
+    throws IOException {
+      String masterLogFileName = 
+        JobHistory.JobInfo.getJobHistoryFileName(conf, id);
+      Path masterLogPath = 
+        JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
+      String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
+      Path tmpLogPath = 
+        JobHistory.JobInfo.getJobHistoryLogLocation(tmpLogFileName);
+      if (masterLogPath != null) {
+        FileSystem fs = masterLogPath.getFileSystem(conf);
+
+        // rename the tmp file to the master file. Note that this should be 
+        // done only when the file is closed and handles are released.
+        if(fs.exists(tmpLogPath)) {
+          fs.rename(tmpLogPath, masterLogPath);
+        }
+      }
+      
+      // do the same for the user file too
+      masterLogPath = 
+        JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName,
+                                                           conf);
+      tmpLogPath = 
+        JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName, 
+                                                           conf);
+      if (masterLogPath != null) {
+        FileSystem fs = masterLogPath.getFileSystem(conf);
+        if (fs.exists(tmpLogPath)) {
+          fs.rename(tmpLogPath, masterLogPath);
+        }
+      }
+    }
+
+    /**
      * Log job submitted event to history. Creates a new file in history 
      * for the job. if history file creation fails, it disables history 
      * for all other events. 
@@ -438,46 +689,21 @@
 
       if (!disableHistory){
         // Get the username and job name to be used in the actual log filename;
-        // sanity check them too
-        String jobName = jobConf.getJobName();
-        if (jobName == null || jobName.length() == 0) {
-          jobName = "NA";
-        }
+        // sanity check them too        
+        String jobName = getJobName(jobConf);
 
-        String user = jobConf.getUser();
-        if (user == null || user.length() == 0) {
-          user = "NA";
-        }
+        String user = getUserName(jobConf);
         
-        // setup the history log file for this job
-        String trimmedJobName = jobName;
-        if (jobName.length() > JOB_NAME_TRIM_LENGTH) {
-          trimmedJobName = jobName.substring(0, JOB_NAME_TRIM_LENGTH);
-        }
+        // get the history filename
         String logFileName = 
-            encodeJobHistoryFileName(jobUniqueString +  "_" + user + "_" + 
-                                     trimmedJobName);
+          getJobHistoryFileName(jobConf, jobId);
 
-        // find user log directory 
-        Path outputPath = FileOutputFormat.getOutputPath(jobConf);
-        userLogDir = jobConf.get("hadoop.job.history.user.location",
-        		outputPath == null ? null : outputPath.toString());
-        if ("none".equals(userLogDir)) {
-          userLogDir = null;
-        }
-        if (userLogDir != null) {
-          userLogDir = userLogDir + Path.SEPARATOR + "_logs" + 
-                       Path.SEPARATOR + "history";
-        }
-
-        Path logFile = null;
-        Path userLogFile = null;
-        if (LOG_DIR != null ) {
-          logFile = new Path(LOG_DIR, logFileName);
-        }
-        if (userLogDir != null ) {
-          userLogFile = new Path(userLogDir, logFileName);
-        }
+        // setup the history log file for this job
+        Path logFile = getJobHistoryLogLocation(logFileName);
+        
+        // find user log directory
+        Path userLogFile = 
+          getJobHistoryLogLocationForUser(logFileName, jobConf);
 
         try{
           ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
@@ -487,14 +713,24 @@
           if (LOG_DIR != null) {
             // create output stream for logging in hadoop.job.history.location
             fs = new Path(LOG_DIR).getFileSystem(jobConf);
-            out = fs.create(logFile, true, 4096);
+            
+            logFile = recoverJobHistoryFile(jobConf, logFile);
+            
+            out = fs.create(logFile, FsPermission.getDefault(), true, 
+                            jobHistoryBufferSize, 
+                            fs.getDefaultReplication(), 
+                            jobHistoryBlockSize, null);
             writer = new PrintWriter(out);
             writers.add(writer);
           }
-          if (userLogDir != null) {
+          if (userLogFile != null) {
+            userLogDir = userLogFile.getParent().toString();
             // create output stream for logging 
             // in hadoop.job.history.user.location
-            fs = new Path(userLogDir).getFileSystem(jobConf);
+            fs = userLogFile.getFileSystem(jobConf);
+ 
+            userLogFile = recoverJobHistoryFile(jobConf, userLogFile);
+            
             out = fs.create(userLogFile, true, 4096);
             writer = new PrintWriter(out);
             writers.add(writer);
@@ -595,8 +831,12 @@
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
-                         new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
-                         new String[] {jobId.toString(),  String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)}); 
+              new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, 
+                          Keys.TOTAL_REDUCES, Keys.JOB_STATUS},
+              new String[] {jobId.toString(), String.valueOf(startTime), 
+                            String.valueOf(totalMaps), 
+                            String.valueOf(totalReduces), 
+                            Values.RUNNING.name()}); 
         }
       }
     }
@@ -632,7 +872,7 @@
                                        String.valueOf(finishedReduces),
                                        String.valueOf(failedMaps), 
                                        String.valueOf(failedReduces),
-                                       counters.makeCompactString()});
+                                       counters.makeEscapedCompactString()});
           for (PrintWriter out : writer) {
             out.close();
           }
@@ -666,6 +906,46 @@
         }
       }
     }
+    /**
+     * Log job's priority. 
+     * @param jobid job id
+     * @param priority Jobs priority 
+     */
+    public static void logJobPriority(JobID jobid, JobPriority priority){
+      if (!disableHistory){
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
+        if (null != writer){
+          JobHistory.log(writer, RecordTypes.Job,
+                         new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY},
+                         new String[] {jobid.toString(), priority.toString()});
+        }
+      }
+    }
+
+    /**
+     * Log job's submit-time/launch-time 
+     * @param jobid job id
+     * @param submitTime job's submit time
+     * @param launchTime job's launch time
+     */
+    public static void logJobSubmitTime(JobID jobid, long submitTime, 
+                                        long launchTime){
+      if (!disableHistory){
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
+        if (null != writer){
+          JobHistory.log(writer, RecordTypes.Job,
+                         new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME, 
+                                     Keys.LAUNCH_TIME},
+                         new String[] {jobid.toString(), 
+                                       String.valueOf(submitTime), 
+                                       String.valueOf(launchTime)});
+        }
+      }
+    }
   }
   /**
    * Helper class for logging or reading back events related to Task's start, finish or failure. 
@@ -716,7 +996,7 @@
                                     Keys.COUNTERS}, 
                          new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(), 
                                        String.valueOf(finishTime),
-                                       counters.makeCompactString()});
+                                       counters.makeEscapedCompactString()});
         }
       }
     }
@@ -728,15 +1008,31 @@
      * @param error error message for failure. 
      */
     public static void logFailed(TaskID taskId, String taskType, long time, String error){
+      logFailed(taskId, taskType, time, error, null);
+    }
+    
+    /**
+     * @param failedDueToAttempt The attempt that caused the failure, if any
+     */
+    public static void logFailed(TaskID taskId, String taskType, long time,
+                                 String error, 
+                                 TaskAttemptID failedDueToAttempt){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                      + taskId.getJobID()); 
 
         if (null != writer){
+          String failedAttempt = failedDueToAttempt == null
+                                 ? ""
+                                 : failedDueToAttempt.toString();
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
-                                    Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR}, 
-                         new String[]{ taskId.toString(),  taskType, Values.FAILED.name(), String.valueOf(time) , error});
+                                    Keys.TASK_STATUS, Keys.FINISH_TIME, 
+                                    Keys.ERROR, Keys.TASK_ATTEMPT_ID}, 
+                         new String[]{ taskId.toString(),  taskType, 
+                                      Values.FAILED.name(), 
+                                      String.valueOf(time) , error, 
+                                      failedAttempt});
         }
       }
     }
@@ -764,11 +1060,12 @@
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param hostName host name of the task attempt. 
      * @deprecated Use 
-     *             {@link #logStarted(TaskAttemptID, long, String, boolean)}
+     *             {@link #logStarted(TaskAttemptID, long, String, int, 
+     *                                boolean)}
      */
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, false);
     }
     
     /**
@@ -776,11 +1073,13 @@
      *  
      * @param taskAttemptId task attempt id
      * @param startTime start time of task attempt as reported by task tracker. 
-     * @param hostName host name of the task attempt.
+     * @param trackerName name of the tracker executing the task attempt.
+     * @param httpPort http port of the task tracker executing the task attempt
      * @param isCleanup Whether the attempt is cleanup or not 
      */
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
-                                  String hostName, boolean isCleanup){
+                                  String trackerName, int httpPort, 
+                                  boolean isCleanup){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -789,12 +1088,13 @@
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
-                                     Keys.HOSTNAME},
+                                     Keys.TRACKER_NAME, Keys.HTTP_PORT},
                          new String[]{isCleanup ? Values.CLEANUP.name() : 
                                                   Values.MAP.name(),
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
-                                      String.valueOf(startTime), hostName}); 
+                                      String.valueOf(startTime), trackerName,
+                                      String.valueOf(httpPort)}); 
         }
       }
     }
@@ -805,12 +1105,14 @@
      * @param finishTime finish time
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, String, boolean)}
+     * {@link #logFinished(TaskAttemptID, long, String, boolean, String, 
+     *                     Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
-      logFinished(taskAttemptId, finishTime, hostName, false);
+      logFinished(taskAttemptId, finishTime, hostName, false, "", 
+                  new Counters());
     }
 
     /**
@@ -820,11 +1122,14 @@
      * @param finishTime finish time
      * @param hostName host name 
      * @param isCleanup Whether the attempt is cleanup or not 
+     * @param stateString state string of the task attempt
+     * @param counter counters of the task attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long finishTime, 
                                    String hostName,
-                                   boolean isCleanup) {
+                                   boolean isCleanup, String stateString, 
+                                   Counters counter) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -833,13 +1138,16 @@
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                     Keys.FINISH_TIME, Keys.HOSTNAME},
+                                     Keys.FINISH_TIME, Keys.HOSTNAME, 
+                                     Keys.STATE_STRING, Keys.COUNTERS},
                          new String[]{isCleanup ? Values.CLEANUP.name() : 
                                                   Values.MAP.name(), 
                                       taskAttemptId.getTaskID().toString(),
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(),  
-                                      String.valueOf(finishTime), hostName}); 
+                                      String.valueOf(finishTime), hostName, 
+                                      stateString, 
+                                      counter.makeEscapedCompactString()}); 
         }
       }
     }
@@ -951,12 +1259,12 @@
      * @param startTime start time
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logStarted(TaskAttemptID, long, String, boolean)}
+     * {@link #logStarted(TaskAttemptID, long, String, int, boolean)}
      */
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, false);
     }
     
     /**
@@ -964,12 +1272,13 @@
      * 
      * @param taskAttemptId task attempt id
      * @param startTime start time
-     * @param hostName host name 
+     * @param trackerName tracker name 
+     * @param httpPort the http port of the tracker executing the task attempt
      * @param isCleanup Whether the attempt is cleanup or not 
      */
     public static void logStarted(TaskAttemptID taskAttemptId, 
-                                  long startTime, String hostName, 
-                                  boolean isCleanup) {
+                                  long startTime, String trackerName, 
+                                  int httpPort, boolean isCleanup) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -978,12 +1287,13 @@
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
-                                      Keys.HOSTNAME},
+                                      Keys.TRACKER_NAME, Keys.HTTP_PORT},
                          new String[]{isCleanup ? Values.CLEANUP.name() : 
                                                   Values.REDUCE.name(),
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
-                                      String.valueOf(startTime), hostName}); 
+                                      String.valueOf(startTime), trackerName,
+                                      String.valueOf(httpPort)}); 
         }
       }
     }
@@ -996,14 +1306,15 @@
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, long, long, String, boolean)}
+     * {@link #logFinished(TaskAttemptID, long, long, long, String, boolean, 
+     *                     String, Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
                                    long sortFinished, long finishTime, 
                                    String hostName){
       logFinished(taskAttemptId, shuffleFinished, sortFinished, 
-                  finishTime, hostName, false);
+                  finishTime, hostName, false, "", new Counters());
     }
     
     /**
@@ -1015,11 +1326,14 @@
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      * @param isCleanup Whether the attempt is cleanup or not 
+     * @param stateString the state string of the attempt
+     * @param counter counters of the attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long shuffleFinished, 
                                    long sortFinished, long finishTime, 
-                                   String hostName, boolean isCleanup) {
+                                   String hostName, boolean isCleanup,
+                                   String stateString, Counters counter) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1029,7 +1343,8 @@
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
-                                     Keys.FINISH_TIME, Keys.HOSTNAME},
+                                     Keys.FINISH_TIME, Keys.HOSTNAME, 
+                                     Keys.STATE_STRING, Keys.COUNTERS},
                          new String[]{isCleanup ? Values.CLEANUP.name() : 
                                                   Values.REDUCE.name(),
                                       taskAttemptId.getTaskID().toString(), 
@@ -1037,7 +1352,9 @@
                                       Values.SUCCESS.name(), 
                                       String.valueOf(shuffleFinished), 
                                       String.valueOf(sortFinished),
-                                      String.valueOf(finishTime), hostName}); 
+                                      String.valueOf(finishTime), hostName,
+                                      stateString, 
+                                      counter.makeEscapedCompactString()}); 
         }
       }
     }
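
Taken together, the recovery helpers keep at most two files per job, the
master history file and a "<name>.recover" sibling, and always converge back
to a single master. A package-local sketch of the sequence around a restart
(conf, logFileName and jobId are stand-ins):

    // Package-local sketch; conf, logFileName and jobId are stand-ins.
    Path master = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
    // While the old master exists this returns a fresh ".recover" sibling to
    // write into; otherwise it returns the master path itself.
    Path toWrite = JobHistory.JobInfo.recoverJobHistoryFile(conf, master);
    // ... replay the recovered job's events into toWrite ...
    // Deleting the master marks the .recover copy as authoritative.
    JobHistory.JobInfo.checkpointRecovery(logFileName, conf);
    // On completion (or the next restart) the survivor is renamed back.
    JobHistory.JobInfo.finalizeRecovery(jobId, conf);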

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobID.java Tue Sep 16 05:05:18 2008
@@ -152,6 +152,23 @@
         + " is not properly formed");
   }
   
+  /** Check if the given string represents a job-id or not 
+   */
+  public static boolean isJobNameValid(String str) {
+    if(str == null) {
+      return false;
+    }
+    String[] parts = str.split("_");
+    if(parts.length == 3) {
+      if(parts[0].equals(JOB)) {
+          // other 2 parts should be parseable
+          return JobTracker.validateIdentifier(parts[1])
+                 && JobTracker.validateJobNumber(parts[2]);
+      }
+    }
+    return false;
+  }
+  
   /** 
    * Returns a regex pattern which matches task IDs. Arguments can 
    * be given null, in which case that part of the regex will be generic.  
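
A few hypothetical inputs for the new check; it requires exactly three
underscore-separated parts with a "job" prefix and defers to the JobTracker's
identifier and job-number validation for the rest:

    // Hypothetical inputs; the middle part is the JobTracker start time.
    JobID.isJobNameValid("job_200809160505_0001");            // true
    JobID.isJobNameValid("job_0001");                         // false: two parts
    JobID.isJobNameValid("task_200809160505_0001_m_000000");  // false: six parts
    JobID.isJobNameValid(null);                               // false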

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Sep 16 05:05:18 2008
@@ -56,6 +56,7 @@
     
   JobProfile profile;
   JobStatus status;
+  Path jobFile = null;
   Path localJobFile = null;
   Path localJarFile = null;
 
@@ -130,6 +131,7 @@
   private ResourceEstimator resourceEstimator; 
   
   long startTime;
+  long launchTime;
   long finishTime;
 
   private JobConf conf;
@@ -195,7 +197,7 @@
                                                       +"/"+ jobid + ".jar");
     Path sysDir = new Path(this.jobtracker.getSystemDir());
     FileSystem fs = sysDir.getFileSystem(default_conf);
-    Path jobFile = new Path(sysDir, jobid + "/job.xml");
+    jobFile = new Path(sysDir, jobid + "/job.xml");
     fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
@@ -216,9 +218,6 @@
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
-    JobHistory.JobInfo.logSubmitted(jobid, conf, jobFile.toString(), 
-                                    System.currentTimeMillis()); 
-        
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
     this.jobMetrics.setTag("user", conf.getUser());
@@ -332,6 +331,12 @@
       return;
     }
 
+    // log job info
+    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
+                                    this.startTime);
+    // log the job priority
+    setPriority(this.priority);
+    
     //
     // read input splits and create a map per a split
     //
@@ -361,20 +366,23 @@
       nonRunningMapCache = createCache(splits, maxLevel);
     }
         
+    // set the launch time
+    this.launchTime = System.currentTimeMillis();
+
     // if no split is returned, job is considered completed and successful
     if (numMapTasks == 0) {
      // The finish time needs to be set here to prevent this job from being
      // retired from the job tracker's job list at the next retire iteration.
-      this.finishTime = System.currentTimeMillis();
+      this.finishTime = this.launchTime;
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
       status.setCleanupProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
       tasksInited.set(true);
       JobHistory.JobInfo.logStarted(profile.getJobID(), 
-                                    System.currentTimeMillis(), 0, 0);
+                                    this.launchTime, 0, 0);
       JobHistory.JobInfo.logFinished(profile.getJobID(), 
-                                     System.currentTimeMillis(), 0, 0, 0, 0,
+                                     this.finishTime, 0, 0, 0, 0,
                                      getCounters());
       // Special case because the Job is not queued
       JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
@@ -409,7 +417,8 @@
     this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, JobStatus.RUNNING);
     tasksInited.set(true);
         
-    JobHistory.JobInfo.logStarted(profile.getJobID(), System.currentTimeMillis(), numMapTasks, numReduceTasks);
+    JobHistory.JobInfo.logStarted(profile.getJobID(), this.launchTime, 
+                                  numMapTasks, numReduceTasks);
   }
 
   /////////////////////////////////////////////////////
@@ -421,6 +430,9 @@
   public JobStatus getStatus() {
     return status;
   }
+  public synchronized long getLaunchTime() {
+    return launchTime;
+  }
   public long getStartTime() {
     return startTime;
   }
@@ -462,6 +474,8 @@
     } else {
       this.priority = priority;
     }
+    // log and change to the job's priority
+    JobHistory.JobInfo.logJobPriority(jobId, priority);
   }
 
   // Accessors for resources.
@@ -469,6 +483,14 @@
     return maxVirtualMemoryForTask;
   }
   
+  // Update the job start/launch time (upon restart) and log to history
+  synchronized void updateJobTime(long startTime, long launchTime) {
+    // log and change to the job's start/launch time
+    this.startTime = startTime;
+    this.launchTime = launchTime;
+    JobHistory.JobInfo.logJobSubmitTime(jobId, startTime, launchTime);
+  }
+
   long getInputLength() {
     return inputLength;
   }
@@ -776,16 +798,7 @@
     
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
-      runningMapTasks += 1;
-      if (maps[target].getActiveTasks().size() > 1)
-        speculativeMapTasks++;
-      if (maps[target].isFirstAttempt(result.getTaskID())) {
-        JobHistory.Task.logStarted(maps[target].getTIPId(), Values.MAP.name(),
-                                   System.currentTimeMillis(),
-                                   maps[target].getSplitNodes());
-      }
-
-      jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_MAPS, 1);
+      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
     }
 
     return result;
@@ -888,21 +901,118 @@
     
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
-      runningReduceTasks += 1;
-      if (reduces[target].getActiveTasks().size() > 1)
-        speculativeReduceTasks++;
-      if (reduces[target].isFirstAttempt(result.getTaskID())) {
-        JobHistory.Task.logStarted(reduces[target].getTIPId(), Values.REDUCE.name(),
-                                   System.currentTimeMillis(), "");
-      }
-
-      jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_REDUCES, 1);
+      addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
     }
 
     return result;
   }
+  
+  // returns the (cache)level at which the nodes matches
+  private int getMatchingLevelForNodes(Node n1, Node n2) {
+    int count = 0;
+    do {
+      if (n1.equals(n2)) {
+        return count;
+      }
+      ++count;
+      n1 = n1.getParent();
+      n2 = n2.getParent();
+    } while (n1 != null);
+    return this.maxLevel;
+  }
+
+  /**
+   * Populate the data structures as a task is scheduled.
+   * @param tip The tip for which the task is added
+   * @param id The attempt-id for the task
+   * @param tts task-tracker status
+   * @param isScheduled Whether this task is scheduled from the JT or has 
+   *        joined back upon restart
+   */
+  synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id, 
+                                        TaskTrackerStatus tts, 
+                                        boolean isScheduled) {
+    // keeping the earlier ordering intact
+    String name;
+    Enum counter;
+    if (tip.isMapTask()) {
+      ++runningMapTasks;
+      name = Values.MAP.name();
+      counter = Counter.TOTAL_LAUNCHED_MAPS;
+      if (tip.getActiveTasks().size() > 1)
+        speculativeMapTasks++;
+    } else {
+      ++runningReduceTasks;
+      name = Values.REDUCE.name();
+      counter = Counter.TOTAL_LAUNCHED_REDUCES;
+      if (tip.getActiveTasks().size() > 1)
+        speculativeReduceTasks++;
+    }
+    // Note that the logs are for the scheduled tasks only. Tasks that join on 
+    // restart already have their logs in place.
+    if (tip.isFirstAttempt(id)) {
+      JobHistory.Task.logStarted(tip.getTIPId(), name,
+                                 tip.getExecStartTime(), "");
+    }
+    jobCounters.incrCounter(counter, 1);
+    
+    // Make an entry in the tip if the attempt is not scheduled i.e externally
+    // added
+    if (!isScheduled) {
+      tip.addRunningTask(id, tts.getTrackerName());
+    }
+
+    //TODO The only problem with these counters would be on restart.
+    // The jobtracker updates the counter only when the task that is scheduled
+    // is from a non-running tip and is local (data, rack ...). But upon restart
+    // as the reports come from the task tracker, there is no good way to infer
+    // when exactly to increment the locality counters. The only solution is to 
+    // increment the counters for all the tasks irrespective of 
+    //    - whether the tip is running or not
+    //    - whether its a speculative task or not
+    //
+    // So to simplify, increment the data locality counter whenever there is 
+    // data locality.
+    if (tip.isMapTask()) {
+      // increment the data locality counter for maps
+      Node tracker = jobtracker.getNode(tts.getHost());
+      int level = this.maxLevel;
+      // find the right level across split locations
+      for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
+        Node datanode = jobtracker.getNode(local);
+        int newLevel = this.maxLevel;
+        if (tracker != null && datanode != null) {
+          newLevel = getMatchingLevelForNodes(tracker, datanode);
+        }
+        if (newLevel < level) {
+          level = newLevel;
+          // an optimization
+          if (level == 0) {
+            break;
+          }
+        }
+      }
+      switch (level) {
+      case 0 :
+        LOG.info("Choosing data-local task " + tip.getTIPId());
+        jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+        break;
+      case 1:
+        LOG.info("Choosing rack-local task " + tip.getTIPId());
+        jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+        break;
+      default :
+        // check if there is any locality
+        if (level != this.maxLevel) {
+          LOG.info("Choosing cached task at level " + level + tip.getTIPId());
+          jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+        }
+        break;
+      }
+    }
+  }
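
Net effect of the locality block above: take the lowest matching level between the tracker and any of the map's split locations, then bucket it as data-local (0), rack-local (1), or other-local (anything below maxLevel). A hedged restatement, where the levels array stands in for the per-split results of getMatchingLevelForNodes:

// Hedged restatement of the bucketing above; 'levels' stands in for the
// per-split-location results of getMatchingLevelForNodes.
class LocalityBucketDemo {
  static String bucket(int[] levels, int maxLevel) {
    int best = maxLevel;
    for (int level : levels) {
      if (level < best) {
        best = level;
        if (best == 0) {
          break;                    // same short-circuit as the patch
        }
      }
    }
    switch (best) {
    case 0:
      return "DATA_LOCAL_MAPS";
    case 1:
      return "RACK_LOCAL_MAPS";
    default:
      return best != maxLevel ? "OTHER_LOCAL_MAPS" : "no locality counter";
    }
  }

  public static void main(String[] args) {
    System.out.println(bucket(new int[] {2, 0, 1}, 3)); // DATA_LOCAL_MAPS
    System.out.println(bucket(new int[] {2, 1},    3)); // RACK_LOCAL_MAPS
    System.out.println(bucket(new int[] {3, 3},    3)); // no locality counter
  }
}

As the TODO above concedes, this now counts locality for every scheduled attempt, speculative or not; that simplification is what makes the counters recoverable after a restart.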
     
-  private String convertTrackerNameToHostName(String trackerName) {
+  static String convertTrackerNameToHostName(String trackerName) {
     // Ugly!
    // Convert the trackerName to its host name
     int indexOfColon = trackerName.indexOf(":");
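
The hunk cuts the method off after the indexOf call. For orientation only, here is a hedged sketch of the conversion it presumably performs, assuming tracker names follow the "tracker_<hostname>:<rest>" convention; this is an inference from the naming convention, not the verbatim method body:

// Hedged sketch, not the verbatim method: assumes tracker names follow
// the "tracker_<hostname>:<rest>" convention.
class TrackerNameDemo {
  static String toHostName(String trackerName) {
    int indexOfColon = trackerName.indexOf(":");
    String head = (indexOfColon == -1)
        ? trackerName
        : trackerName.substring(0, indexOfColon);
    // drop the "tracker_" prefix if present
    return head.startsWith("tracker_")
        ? head.substring("tracker_".length())
        : head;
  }

  public static void main(String[] args) {
    // prints host1.example.com
    System.out.println(toHostName("tracker_host1.example.com:50060"));
  }
}

The change from private to static presumably lets the restart/recovery code call the conversion without a JobInProgress instance; the callers are outside this hunk, so that too is an inference.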
@@ -1294,18 +1404,6 @@
               nonRunningMapCache.remove(key);
             }
 
-            if (level == 0) {
-              LOG.info("Choosing data-local task " + tip.getTIPId());
-              jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
-            } else if (level == 1){
-              LOG.info("Choosing rack-local task " + tip.getTIPId());
-              jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
-            } else {
-              LOG.info("Choosing cached task at level " + level 
-                       + tip.getTIPId());
-              jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
-            }
-
             return tip.getIdWithinJob();
           }
         }
@@ -1377,16 +1475,6 @@
               if (cacheForLevel.size() == 0) {
                 runningMapCache.remove(key);
               }
-              if (level == 0) {
-                LOG.info("Choosing a data-local task " + tip.getTIPId() 
-                         + " for speculation");
-              } else if (level == 1){
-                LOG.info("Choosing a rack-local task " + tip.getTIPId() 
-                         + " for speculation");
-              } else {
-                LOG.info("Choosing a cached task at level " + level
-                         + tip.getTIPId() + " for speculation");
-              }
               return tip.getIdWithinJob();
             }
           }
@@ -1536,29 +1624,36 @@
     resourceEstimator.updateWithCompletedTask(status, tip);
 
     // Update jobhistory 
-    String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
-                               status.getTaskTracker()).getHost()).toString();
+    TaskTrackerStatus ttStatus = 
+      this.jobtracker.getTaskTracker(status.getTaskTracker());
+    String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-                                       taskTrackerName, tip.isCleanupTask()); 
+                                       status.getTaskTracker(), 
+                                       ttStatus.getHttpPort(), 
+                                       tip.isCleanupTask()); 
       JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
-                                        taskTrackerName, tip.isCleanupTask()); 
+                                        trackerHostname, tip.isCleanupTask(),
+                                        status.getStateString(), 
+                                        status.getCounters()); 
       JobHistory.Task.logFinished(tip.getTIPId(), 
                                   tip.isCleanupTask() ? Values.CLEANUP.name() :
                                   Values.MAP.name(), 
-                                  status.getFinishTime(),
+                                  tip.getExecFinishTime(),
                                   status.getCounters()); 
     }else{
       JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
-                                          taskTrackerName, 
+                                          status.getTaskTracker(),
+                                          ttStatus.getHttpPort(), 
                                           tip.isCleanupTask()); 
       JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
                                            status.getSortFinishTime(), status.getFinishTime(), 
-                                           taskTrackerName,
-                                           tip.isCleanupTask()); 
+                                           trackerHostname, tip.isCleanupTask(), 
+                                           status.getStateString(), 
+                                           status.getCounters()); 
       JobHistory.Task.logFinished(tip.getTIPId(), 
                                   tip.isCleanupTask() ? Values.CLEANUP.name() :
-                                  Values.REDUCE.name(), status.getFinishTime(),
+                                  Values.REDUCE.name(), tip.getExecFinishTime(),
                                   status.getCounters()); 
     }
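
The substitution of tip.getExecFinishTime() for status.getFinishTime() in the Task-level log is restart-motivated: the attempt status carries one attempt's finish time, while the TIP-level execution finish time is what the revised history format records and recovery replays. An illustration of the two clocks, using illustrative types rather than Hadoop's:

// Illustrative only: attempt-level vs TIP-level finish times.
class FinishTimeDemo {
  static class AttemptStatus { long finishTime; }   // one attempt's clock

  static class Tip {
    long execFinishTime;                            // spans attempts and restarts
    void attemptFinished(AttemptStatus s) {
      // the TIP remembers when the task as a whole finished executing
      execFinishTime = Math.max(execFinishTime, s.finishTime);
    }
  }

  public static void main(String[] args) {
    Tip tip = new Tip();
    AttemptStatus failed = new AttemptStatus();
    failed.finishTime = 1000L;                      // first attempt fails
    AttemptStatus succeeded = new AttemptStatus();
    succeeded.finishTime = 2000L;                   // second attempt succeeds
    tip.attemptFinished(failed);
    tip.attemptFinished(succeeded);
    System.out.println(tip.execFinishTime);         // 2000 : TIP-level time
  }
}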
         
@@ -1631,13 +1726,16 @@
       this.status.setRunState(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
       this.finishTime = System.currentTimeMillis();
-      garbageCollect();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks, failedMapTasks, 
                                      failedReduceTasks, getCounters());
+      // Note that garbageCollect() finalizes and closes the job-history
+      // handles, so the job's completion must be logged before calling it.
+      garbageCollect();
+      
       metrics.completeJob();
     }
   }
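
The reorder above, logFinished before garbageCollect, exists because garbage collection finalizes and closes the job-history handles; a history write issued afterwards would hit closed files. The invariant, as a hedged sketch with hypothetical names:

// Hedged sketch of the ordering invariant; JobHistoryHandle is a
// hypothetical stand-in, not a real Hadoop class.
class JobHistoryHandle {
  private boolean closed = false;

  void logFinished(String jobId) {
    if (closed) {
      throw new IllegalStateException("history handle already closed");
    }
    System.out.println("logged completion of " + jobId);
  }

  void close() { closed = true; }   // what garbageCollect() effectively does
}

class CompletionOrderDemo {
  public static void main(String[] args) {
    JobHistoryHandle history = new JobHistoryHandle();
    history.logFinished("job_0001");  // must come first
    history.close();                  // only then release the handles
  }
}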
@@ -1741,28 +1839,30 @@
     }
         
     // update job history
-    String taskTrackerName = jobtracker.getNode(
-                               taskTrackerStatus.getHost()).toString();
+    String taskTrackerName = taskTrackerStatus.getHost();
+    long finishTime = status.getFinishTime();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-                taskTrackerName, tip.isCleanupTask());
+          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
+          tip.isCleanupTask());
       if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.MapAttempt.logFailed(status.getTaskID(), System.currentTimeMillis(),
+        JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), tip.isCleanupTask());
       } else {
-        JobHistory.MapAttempt.logKilled(status.getTaskID(), System.currentTimeMillis(),
+        JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(),
                 tip.isCleanupTask());
       }
     } else {
       JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-                taskTrackerName, tip.isCleanupTask());
+          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
+          tip.isCleanupTask());
       if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.ReduceAttempt.logFailed(status.getTaskID(), System.currentTimeMillis(),
+        JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
                 tip.isCleanupTask());
       } else {
-        JobHistory.ReduceAttempt.logKilled(status.getTaskID(), System.currentTimeMillis(),
+        JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
                 tip.isCleanupTask());
       }
@@ -1813,7 +1913,7 @@
                                   tip.isMapTask() ? 
                                           Values.MAP.name() : 
                                           Values.REDUCE.name(),  
-                                  System.currentTimeMillis(), 
+                                          status.getFinishTime(), 
                                   status.getDiagnosticInfo());
         if (tip.isCleanupTask()) {
           // kill the other tip
@@ -1859,11 +1959,12 @@
                                                     reason,
                                                     trackerName, phase,
                                                     null);
+    status.setFinishTime(System.currentTimeMillis());
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
                               tip.isCleanupTask() ? Values.CLEANUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
-                              System.currentTimeMillis(), reason); 
+                              tip.getExecFinishTime(), reason, taskid); 
   }
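
Stamping the finish time onto the synthesized status before updateTaskStatus, instead of reading System.currentTimeMillis() separately inside each history call, makes the in-memory status and the history record agree on a single timestamp, which matters when history is later replayed for recovery. A minimal sketch of the stamp-once pattern, with a stand-in Status type:

// Minimal sketch of "stamp once, read everywhere"; Status is a
// hypothetical stand-in for the task-status object.
class StampOnceDemo {
  static class Status {
    private long finishTime;
    void setFinishTime(long t) { finishTime = t; }
    long getFinishTime()       { return finishTime; }
  }

  public static void main(String[] args) {
    Status status = new Status();
    status.setFinishTime(System.currentTimeMillis()); // stamp once
    long forStatusUpdate = status.getFinishTime();    // read by the status update
    long forHistoryLog   = status.getFinishTime();    // read by the history log
    System.out.println(forStatusUpdate == forHistoryLog); // true
  }
}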
        
                            
@@ -1953,6 +2054,10 @@
     }
     return null;
   }
+  
+  synchronized int getNumTaskCompletionEvents() {
+    return taskCompletionEvents.size();
+  }
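
The new package-private accessor exposes the size of the completion-event list. Presumably, and this is an inference since the restart tests are outside this excerpt, tests compare the count across a restart to check that recovery from the history file loses no events. A hypothetical check in that spirit:

// Hypothetical test-style check; not taken from the actual restart tests,
// which are outside this excerpt.
class EventCountCheck {
  static void assertNoEventsLost(int beforeRestart, int afterRecovery) {
    if (afterRecovery < beforeRestart) {
      throw new AssertionError("recovery lost "
          + (beforeRestart - afterRecovery) + " task completion events");
    }
  }

  public static void main(String[] args) {
    assertNoEventsLost(42, 42);   // ok: all events recovered
    assertNoEventsLost(42, 45);   // ok: new events arrived after recovery
  }
}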
     
   synchronized public TaskCompletionEvent[] getTaskCompletionEvents(
                                                                     int fromEventId, int maxEvents) {


