hadoop-mapreduce-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r809881 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Date: Tue, 01 Sep 2009 08:26:56 GMT
Author: sharad
Date: Tue Sep  1 08:26:55 2009
New Revision: 809881

URL: http://svn.apache.org/viewvc?rev=809881&view=rev
Log:
MAPREDUCE-873. Simplify job recovery. Incomplete jobs are resubmitted on jobtracker restart. Contributed by Sharad Agarwal.

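In effect: on restart the JobTracker no longer replays task-level history events; it simply resubmits each incomplete job found in the system directory. A minimal sketch of the new RecoveryManager.recover() flow, condensed from the JobTracker.java hunks below (logging and timing trimmed):

    // Simplified recovery: resubmit incomplete jobs instead of replaying
    // their job-history event streams.
    public void recover() {
      if (!shouldRecover()) {
        return;                 // nothing incomplete was left by the last run
      }
      for (JobID jobId : jobsToRecover) {
        try {
          submitJob(jobId, restartCount);  // new private overload, see below
          recovered++;
        } catch (Exception e) {
          LOG.warn("Could not recover job " + jobId, e);
        }
      }
    }
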
Removed:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
    hadoop/mapreduce/trunk/src/webapps/job/jobhistory.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Sep  1 08:26:55 2009
@@ -280,6 +280,9 @@
     MAPREDUCE-875. Make DBRecordReader execute queries lazily. (Aaron Kimball 
     via enis)
 
+    MAPREDUCE-873. Simplify job recovery. Incomplete jobs are resubmitted on 
+    jobtracker restart. (sharad)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java Tue Sep  1 08:26:55 2009
@@ -21,10 +21,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -41,7 +37,6 @@
   short responseId;
   int heartbeatInterval;
   TaskTrackerAction[] actions;
-  Set<JobID> recoveredJobs = new HashSet<JobID>();
 
   HeartbeatResponse() {}
   
@@ -58,15 +53,7 @@
   public short getResponseId() {
     return responseId;
   }
-  
-  public void setRecoveredJobs(Set<JobID> ids) {
-    recoveredJobs = ids; 
-  }
-  
-  public Set<JobID> getRecoveredJobs() {
-    return recoveredJobs;
-  }
-  
+
   public void setActions(TaskTrackerAction[] actions) {
     this.actions = actions;
   }
@@ -103,11 +90,6 @@
         action.write(out);
       }
     }
-    // Write the job ids of the jobs that were recovered
-    out.writeInt(recoveredJobs.size());
-    for (JobID id : recoveredJobs) {
-      id.write(out);
-    }
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -125,12 +107,5 @@
     } else {
       actions = null;
     }
-    // Read the job ids of the jobs that were recovered
-    int size = in.readInt();
-    for (int i = 0; i < size; ++i) {
-      JobID id = new JobID();
-      id.readFields(in);
-      recoveredJobs.add(id);
-    }
   }
 }

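Context for the removals above: HeartbeatResponse is a Hadoop Writable, so its wire format is defined purely by the order of calls in write() and mirrored in readFields(). Dropping recoveredJobs therefore has to touch the field, its serialization, and its deserialization together, as these hunks do. A minimal sketch of that contract (ExampleResponse is illustrative, not part of the patch):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    class ExampleResponse implements Writable {
      short responseId;
      int heartbeatInterval;

      public void write(DataOutput out) throws IOException {
        out.writeShort(responseId);        // field 1
        out.writeInt(heartbeatInterval);   // field 2
      }

      public void readFields(DataInput in) throws IOException {
        responseId = in.readShort();       // field 1, same order as write()
        heartbeatInterval = in.readInt();  // field 2
      }
    }
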
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Tue Sep  1 08:26:55 2009
@@ -95,11 +95,11 @@
   
   static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
   
+  static final String OLD_SUFFIX = ".old";
   public static final int JOB_NAME_TRIM_LENGTH = 50;
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
   private static boolean disableHistory = true; 
-  private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;
   private static String jobtrackerHostname;
   private static JobHistoryFilesManager fileManager = null;
@@ -187,6 +187,20 @@
       fileCache.remove(id);
     }
 
+    void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
+      if (disableHistory) {
+        return;
+      }
+      //check if path exists, in case of retries it may not exist
+      if (LOGDIR_FS.exists(fromPath)) {
+        LOG.info("Moving " + fromPath.toString() + " to " + 
+            toPath.toString()); 
+        DONEDIR_FS.moveFromLocalFile(fromPath, toPath);
+        DONEDIR_FS.setPermission(toPath, 
+            new FsPermission(HISTORY_FILE_PERMISSION));
+      }
+    }
+
     void moveToDone(final JobID id) {
       if (disableHistory) {
         return;
@@ -212,14 +226,7 @@
           //move the files to DONE folder
           try {
             for (Path path : paths) {
-              //check if path exists, in case of retries it may not exist
-              if (LOGDIR_FS.exists(path)) {
-                LOG.info("Moving " + path.toString() + " to " + 
-                    DONE.toString()); 
-                DONEDIR_FS.moveFromLocalFile(path, DONE);
-                DONEDIR_FS.setPermission(new Path(DONE, path.getName()), 
-                    new FsPermission(HISTORY_FILE_PERMISSION));
-              }
+              moveToDoneNow(path, new Path(DONE, path.getName()));
             }
           } catch (Throwable e) {
             LOG.error("Unable to move history file to DONE folder.", e);
@@ -356,6 +363,21 @@
       }
 
       fileManager.start();
+      //move the log files remaining from last run to the DONE folder
+      //suffix the file name based on Jobtracker identifier so that history
+      //files with same job id don't get over written in case of recovery.
+      FileStatus[] files = LOGDIR_FS.listStatus(new Path(LOG_DIR));
+      String jtIdentifier = fileManager.jobTracker.getTrackerIdentifier();
+      String fileSuffix = "." + jtIdentifier + OLD_SUFFIX;
+      for (FileStatus fileStatus : files) {
+        Path fromPath = fileStatus.getPath();
+        if (fromPath.equals(DONE)) { //DONE can be a subfolder of log dir
+          continue;
+        }
+        LOG.info("Moving log file from last run: " + fromPath);
+        Path toPath = new Path(DONE, fromPath.getName() + fileSuffix);
+        fileManager.moveToDoneNow(fromPath, toPath);
+      }
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -826,22 +848,6 @@
       return "\\Q"+string.replaceAll("\\\\E", "\\\\E\\\\\\\\E\\\\Q")+"\\E";
     }
 
-    /**
-     * Recover the job history filename from the history folder. 
-     * Uses the following pattern
-     *    $jt-hostname_[0-9]*_$job-id_$user-$job-name*
-     * @param jobConf the job conf
-     * @param id job id
-     */
-    public static synchronized String getJobHistoryFileName(JobConf jobConf, 
-                                                            JobID id) 
-    throws IOException {
-      if (LOG_DIR == null) {
-        return null;
-      }
-      return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
-    }
-
     static synchronized String getDoneJobHistoryFileName(JobConf jobConf, 
         JobID id) throws IOException {
       if (DONE == null) {
@@ -888,8 +894,7 @@
       if (statuses.length == 0) {
         LOG.info("Nothing to recover for job " + id);
       } else {
-        // return filename considering that fact the name can be a 
-        // secondary filename like filename.recover
+        // return filename
         filename = getPrimaryFilename(statuses[0].getPath().getName(), jobName);
         LOG.info("Recovered job history filename for job " + id + " is " 
                  + filename);
@@ -902,150 +907,8 @@
     private static String getPrimaryFilename(String filename, String jobName) 
     throws IOException{
       filename = decodeJobHistoryFileName(filename);
-      // Remove the '.recover' suffix if it exists
-      if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
-        int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
-        filename = filename.substring(0, newLength);
-      }
       return encodeJobHistoryFileName(filename);
     }
-    
-    /** Since there was a restart, there should be a master file and 
-     * a recovery file. Once the recovery is complete, the master should be 
-     * deleted as an indication that the recovery file should be treated as the 
-     * master upon completion or next restart.
-     * @param fileName the history filename that needs checkpointing
-     * @param conf Job conf
-     * @throws IOException
-     */
-    static synchronized void checkpointRecovery(String fileName, JobConf conf) 
-    throws IOException {
-      Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
-      if (logPath != null) {
-        LOG.info("Deleting job history file " + logPath.getName());
-        LOGDIR_FS.delete(logPath, false);
-      }
-      // do the same for the user file too
-      logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, 
-                                                                   conf);
-      if (logPath != null) {
-        FileSystem fs = logPath.getFileSystem(conf);
-        fs.delete(logPath, false);
-      }
-    }
-    
-    static String getSecondaryJobHistoryFile(String filename) 
-    throws IOException {
-      return encodeJobHistoryFileName(
-          decodeJobHistoryFileName(filename) + SECONDARY_FILE_SUFFIX);
-    }
-    
-    /** Selects one of the two files generated as a part of recovery. 
-     * The thumb rule is that always select the oldest file. 
-     * This call makes sure that only one file is left in the end. 
-     * @param conf job conf
-     * @param logFilePath Path of the log file
-     * @throws IOException 
-     */
-    public synchronized static Path recoverJobHistoryFile(JobConf conf, 
-                                                          Path logFilePath) 
-    throws IOException {
-      Path ret;
-      String logFileName = logFilePath.getName();
-      String tmpFilename = getSecondaryJobHistoryFile(logFileName);
-      Path logDir = logFilePath.getParent();
-      Path tmpFilePath = new Path(logDir, tmpFilename);
-      if (LOGDIR_FS.exists(logFilePath)) {
-        LOG.info(logFileName + " exists!");
-        if (LOGDIR_FS.exists(tmpFilePath)) {
-          LOG.info("Deleting " + tmpFilename 
-                   + "  and using " + logFileName + " for recovery.");
-          LOGDIR_FS.delete(tmpFilePath, false);
-        }
-        ret = tmpFilePath;
-      } else {
-        LOG.info(logFileName + " doesnt exist! Using " 
-                 + tmpFilename + " for recovery.");
-        if (LOGDIR_FS.exists(tmpFilePath)) {
-          LOG.info("Renaming " + tmpFilename + " to " + logFileName);
-          LOGDIR_FS.rename(tmpFilePath, logFilePath);
-          ret = tmpFilePath;
-        } else {
-          ret = logFilePath;
-        }
-      }
-
-      // do the same for the user files too
-      logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
-      if (logFilePath != null) {
-        FileSystem fs = logFilePath.getFileSystem(conf);
-        logDir = logFilePath.getParent();
-        tmpFilePath = new Path(logDir, tmpFilename);
-        if (fs.exists(logFilePath)) {
-          LOG.info(logFileName + " exists!");
-          if (fs.exists(tmpFilePath)) {
-            LOG.info("Deleting " + tmpFilename + "  and making " + logFileName 
-                     + " as the master history file for user.");
-            fs.delete(tmpFilePath, false);
-          }
-        } else {
-          LOG.info(logFileName + " doesnt exist! Using " 
-                   + tmpFilename + " as the master history file for user.");
-          if (fs.exists(tmpFilePath)) {
-            LOG.info("Renaming " + tmpFilename + " to " + logFileName 
-                     + " in user directory");
-            fs.rename(tmpFilePath, logFilePath);
-          }
-        }
-      }
-      
-      return ret;
-    }
-
-    /** Finalize the recovery and make one file in the end. 
-     * This invloves renaming the recover file to the master file.
-     * Note that this api should be invoked only if recovery is involved.
-     * @param id Job id  
-     * @param conf the job conf
-     * @throws IOException
-     */
-    static synchronized void finalizeRecovery(JobID id, JobConf conf) 
-    throws IOException {
-      Path tmpLogPath = fileManager.getHistoryFile(id);
-      if (tmpLogPath == null) {
-        LOG.debug("No file for job with " + id + " found in cache!");
-        return;
-      }
-      String tmpLogFileName = tmpLogPath.getName();
-      
-      // get the primary filename from the cached filename
-      String masterLogFileName = 
-        getPrimaryFilename(tmpLogFileName, getJobName(conf));
-      Path masterLogPath = new Path(tmpLogPath.getParent(), masterLogFileName);
-      
-      // rename the tmp file to the master file. Note that this should be 
-      // done only when the file is closed and handles are released.
-      LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
-      LOGDIR_FS.rename(tmpLogPath, masterLogPath);
-      // update the cache
-      fileManager.setHistoryFile(id, masterLogPath);
-      
-      // do the same for the user file too
-      masterLogPath = 
-        JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName,
-                                                           conf);
-      tmpLogPath = 
-        JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName, 
-                                                           conf);
-      if (masterLogPath != null) {
-        FileSystem fs = masterLogPath.getFileSystem(conf);
-        if (fs.exists(tmpLogPath)) {
-          LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
-                   + " in user directory");
-          fs.rename(tmpLogPath, masterLogPath);
-        }
-      }
-    }
 
     /**
      * Deletes job data from the local disk.
@@ -1079,20 +942,10 @@
      * @param jobConfPath path to job conf xml file in HDFS.
      * @param submitTime time when job tracker received the job
      * @throws IOException
-     * @deprecated Use 
-     *     {@link #logSubmitted(JobID, JobConf, String, long, boolean)} instead.
      */
-     @Deprecated
      public static void logSubmitted(JobID jobId, JobConf jobConf, 
                                     String jobConfPath, long submitTime) 
     throws IOException {
-      logSubmitted(jobId, jobConf, jobConfPath, submitTime, true);
-    }
-    
-    public static void logSubmitted(JobID jobId, JobConf jobConf, 
-                                    String jobConfPath, long submitTime, 
-                                    boolean restarted) 
-    throws IOException {
       FileSystem fs = null;
       String userLogDir = null;
       String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
@@ -1106,23 +959,9 @@
         
         // get the history filename
         String logFileName = null;
-        if (restarted) {
-          logFileName = getJobHistoryFileName(jobConf, jobId);
-          if (logFileName == null) {
-            logFileName =
-              encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
-          } else {
-            String parts[] = logFileName.split("_");
-            //TODO this is a hack :(
-            // jobtracker-hostname_jobtracker-identifier_
-            String jtUniqueString = parts[0] + "_" + parts[1] + "_";
-            jobUniqueString = jtUniqueString + jobId.toString();
-          }
-        } else {
-          logFileName = 
-            encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
-        }
-
+        logFileName = 
+          encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+        
         // setup the history log file for this job
         Path logFile = getJobHistoryLogLocation(logFileName);
         
@@ -1135,11 +974,6 @@
           PrintWriter writer = null;
 
           if (LOG_DIR != null) {           
-          // create output stream for logging in hadoop.job.history.location 
-            if (restarted) {
-              logFile = recoverJobHistoryFile(jobConf, logFile);
-              logFileName = logFile.getName();
-            }
             
             int defaultBufferSize = 
               LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
@@ -1156,8 +990,6 @@
             fileManager.setHistoryFile(jobId, logFile);
           }
           if (userLogFile != null) {
-            // Get the actual filename as recoverJobHistoryFile() might return
-            // a different filename
             userLogDir = userLogFile.getParent().toString();
             userLogFile = new Path(userLogDir, logFileName);
             
@@ -1429,15 +1261,7 @@
      * @param jobid job id
      * @param submitTime job's submit time
      * @param launchTime job's launch time
-     * @param restartCount number of times the job got restarted
-     * @deprecated Use {@link #logJobInfo(JobID, long, long)} instead.
      */
-    @Deprecated
-    public static void logJobInfo(JobID jobid, long submitTime, long launchTime,
-                                  int restartCount){
-      logJobInfo(jobid, submitTime, launchTime);
-    }
-
     public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
     {
       if (!disableHistory){

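The init-time sweep added above is what lets the '.recover' secondary-file machinery (checkpointRecovery, recoverJobHistoryFile, finalizeRecovery) go away: any history file left behind by the previous run is renamed with the new jobtracker identifier plus OLD_SUFFIX and moved straight to DONE, so a resubmitted job's fresh history file can never collide with a stale one. The sweep in isolation (logDirFs, logDir, doneDirFs and done stand in for the LOGDIR_FS/LOG_DIR/DONEDIR_FS/DONE statics):

    // Park history files left over from the last run in DONE, suffixed with
    // the new jobtracker's identifier so a recovered job with the same job id
    // cannot overwrite them. OLD_SUFFIX is ".old".
    String fileSuffix = "." + jtIdentifier + OLD_SUFFIX;
    for (FileStatus fileStatus : logDirFs.listStatus(logDir)) {
      Path fromPath = fileStatus.getPath();
      if (fromPath.equals(done)) {   // DONE can be a subfolder of the log dir
        continue;
      }
      Path toPath = new Path(done, fromPath.getName() + fileSuffix);
      doneDirFs.moveFromLocalFile(fromPath, toPath);  // as in moveToDoneNow()
    }
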
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Sep  1 08:26:55 2009
@@ -321,11 +321,6 @@
    * to the tracker.
    */
   public JobInProgress(JobID jobid, JobTracker jobtracker, 
-                       JobConf default_conf) throws IOException {
-    this(jobid, jobtracker, default_conf, 0);
-  }
-  
-  public JobInProgress(JobID jobid, JobTracker jobtracker, 
                        JobConf default_conf, int rCount) throws IOException {
     this.restartCount = rCount;
     this.jobId = jobid;
@@ -379,10 +374,8 @@
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid.toString());
-    if (!hasRestarted()) { //This is temporary until we fix the restart model
-      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-    }
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
     this.anyCacheLevel = this.maxLevel+1;
     this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -491,10 +484,6 @@
   public boolean inited() {
     return tasksInited.get();
   }
-  
-  boolean hasRestarted() {
-    return restartCount > 0;
-  }
 
   /**
    * Get the number of slots required to run a single map task-attempt.
@@ -626,7 +615,7 @@
   void logToJobHistory() throws IOException {
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
-        this.startTime, hasRestarted());
+        this.startTime);
   }
 
   JobClient.RawSplit[] createSplits() throws IOException {

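With the three-argument convenience constructor gone, every JobInProgress caller now states the restart count explicitly; the submitJob() changes later in this commit pass 0 for a fresh submission and the persisted count for a recovered one. An illustrative call under those assumptions:

    // rCount == 0 -> fresh submission; rCount > 0 -> job resubmitted after a
    // JobTracker restart (the count comes from the jobtracker.info file).
    JobInProgress job =
        new JobInProgress(jobId, jobTracker, defaultConf, /* rCount */ 0);
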
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Sep  1 08:26:55 2009
@@ -64,17 +64,14 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
-import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
-import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -96,9 +93,6 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-
 /*******************************************************
  * JobTracker is the central location for submitting and 
  * tracking MR jobs in a network environment.
@@ -928,172 +922,11 @@
   // Used to recover the jobs upon restart
   ///////////////////////////////////////////////////////
   class RecoveryManager {
-    Set<JobID> jobsToRecover; // set of jobs to be recovered
-    
-    private int totalEventsRecovered = 0;
+    private Set<JobID> jobsToRecover; // set of jobs to be recovered
+    private int recovered;
     private int restartCount = 0;
     private boolean shouldRecover = false;
 
-    Set<String> recoveredTrackers = 
-      Collections.synchronizedSet(new HashSet<String>());
-    
-    /** A custom listener that replays the events in the order in which the 
-     * events (task attempts) occurred. 
-     */
-    class JobRecoveryListener implements Listener {
-      // The owner job
-      private JobInProgress jip;
-      
-      private JobHistory.JobInfo job; // current job's info object
-      
-      // Maintain the count of the (attempt) events recovered
-      private int numEventsRecovered = 0;
-      
-      // Maintains open transactions
-      private Map<String, String> hangingAttempts = 
-        new HashMap<String, String>();
-      
-      // Whether there are any updates for this job
-      private boolean hasUpdates = false;
-      
-      public JobRecoveryListener(JobInProgress jip) {
-        this.jip = jip;
-        this.job = new JobHistory.JobInfo(jip.getJobID().toString());
-      }
-
-      /**
-       * Process a task. Note that a task might commit a previously pending 
-       * transaction.
-       */
-      private void processTask(String taskId, JobHistory.Task task) {
-        // Any TASK info commits the previous transaction
-        boolean hasHanging = hangingAttempts.remove(taskId) != null;
-        if (hasHanging) {
-          numEventsRecovered += 2;
-        }
-        
-        TaskID id = TaskID.forName(taskId);
-        TaskInProgress tip = getTip(id);
-
-        updateTip(tip, task);
-      }
-
-      /**
-       * Adds a task-attempt in the listener
-       */
-      private void processTaskAttempt(String taskAttemptId, 
-                                      JobHistory.TaskAttempt attempt) {
-        TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
-        
-        // Check if the transaction for this attempt can be committed
-        String taskStatus = attempt.get(Keys.TASK_STATUS);
-
-        if (taskStatus.length() > 0) {
-          // This means this is an update event
-          if (taskStatus.equals(Values.SUCCESS.name())) {
-            // Mark this attempt as hanging
-            hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
-            addSuccessfulAttempt(jip, id, attempt);
-          } else {
-            addUnsuccessfulAttempt(jip, id, attempt);
-            numEventsRecovered += 2;
-          }
-        } else {
-          createTaskAttempt(jip, id, attempt);
-        }
-      }
-
-      public void handle(JobHistory.RecordTypes recType, Map<Keys, 
-                         String> values) throws IOException {
-        if (recType == JobHistory.RecordTypes.Job) {
-          // Update the meta-level job information
-          job.handle(values);
-          
-          // Forcefully init the job as we have some updates for it
-          checkAndInit();
-        } else if (recType.equals(JobHistory.RecordTypes.Task)) {
-          String taskId = values.get(Keys.TASKID);
-          
-          // Create a task
-          JobHistory.Task task = new JobHistory.Task();
-          task.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(task)) {
-            return;
-          }
-            
-          // Process the task i.e update the tip state
-          processTask(taskId, task);
-        } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the attempt state via job
-          processTaskAttempt(attemptId, attempt);
-        } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the job state via job
-          processTaskAttempt(attemptId, attempt);
-        }
-      }
-
-      // Check if the task is of type CLEANUP
-      private boolean isCleanup(JobHistory.Task task) {
-        String taskType = task.get(Keys.TASK_TYPE);
-        return Values.CLEANUP.name().equals(taskType);
-      }
-      
-      // Init the job if its ready for init. Also make sure that the scheduler
-      // is updated
-      private void checkAndInit() throws IOException {
-        String jobStatus = this.job.get(Keys.JOB_STATUS);
-        if (Values.PREP.name().equals(jobStatus)) {
-          hasUpdates = true;
-          LOG.info("Calling init from RM for job " + jip.getJobID().toString());
-          initJob(jip);
-          if (!jip.inited()) {
-            throw new IOException("Failed to initialize job " + jip.getJobID());
-          }
-        }
-      }
-      
-      void close() {
-        if (hasUpdates) {
-          // Apply the final (job-level) updates
-          JobStatusChangeEvent event = updateJob(jip, job);
-          
-          synchronized (JobTracker.this) {
-            // Update the job listeners
-            updateJobInProgressListeners(event);
-          }
-        }
-      }
-      
-      public int getNumEventsRecovered() {
-        return numEventsRecovered;
-      }
-
-    }
-    
     public RecoveryManager() {
       jobsToRecover = new TreeSet<JobID>();
     }
@@ -1102,6 +935,10 @@
       return jobsToRecover.contains(id);
     }
 
+    int getRecovered() {
+      return recovered;
+    }
+
     void addJobForRecovery(JobID id) {
       jobsToRecover.add(id);
     }
@@ -1110,18 +947,6 @@
       return shouldRecover;
     }
 
-    public boolean shouldSchedule() {
-      return recoveredTrackers.isEmpty();
-    }
-
-    private void markTracker(String trackerName) {
-      recoveredTrackers.add(trackerName);
-    }
-
-    void unMarkTracker(String trackerName) {
-      recoveredTrackers.remove(trackerName);
-    }
-
     Set<JobID> getJobsToRecover() {
       return jobsToRecover;
     }
@@ -1157,229 +982,8 @@
         }
       }
     }
-    
-    private JobStatusChangeEvent updateJob(JobInProgress jip, 
-                                           JobHistory.JobInfo job) {
-      // Change the job priority
-      String jobpriority = job.get(Keys.JOB_PRIORITY);
-      JobPriority priority = JobPriority.valueOf(jobpriority);
-      // It's important to update this via the jobtracker's api as it will 
-      // take care of updating the event listeners too
-      setJobPriority(jip.getJobID(), priority);
-
-      // Save the previous job status
-      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-      
-      // Set the start/launch time only if there are recovered tasks
-      // Increment the job's restart count
-      jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME), 
-                        job.getLong(JobHistory.Keys.LAUNCH_TIME));
 
-      // Save the new job status
-      JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-      
-      return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus, 
-                                      newStatus);
-    }
-    
-    private void updateTip(TaskInProgress tip, JobHistory.Task task) {
-      long startTime = task.getLong(Keys.START_TIME);
-      if (startTime != 0) {
-        tip.setExecStartTime(startTime);
-      }
-      
-      long finishTime = task.getLong(Keys.FINISH_TIME);
-      // For failed tasks finish-time will be missing
-      if (finishTime != 0) {
-        tip.setExecFinishTime(finishTime);
-      }
-      
-      String cause = task.get(Keys.TASK_ATTEMPT_ID);
-      if (cause.length() > 0) {
-        // This means that the this is a FAILED events
-        TaskAttemptID id = TaskAttemptID.forName(cause);
-        TaskStatus status = tip.getTaskStatus(id);
-        synchronized (JobTracker.this) {
-          // This will add the tip failed event in the new log
-          tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
-                                  status.getPhase(), status.getRunState(), 
-                                  status.getTaskTracker());
-        }
-      }
-    }
-    
-    private void createTaskAttempt(JobInProgress job, 
-                                   TaskAttemptID attemptId, 
-                                   JobHistory.TaskAttempt attempt) {
-      TaskID id = attemptId.getTaskID();
-      String type = attempt.get(Keys.TASK_TYPE);
-      TaskInProgress tip = job.getTaskInProgress(id);
-      
-      //    I. Get the required info
-      TaskStatus taskStatus = null;
-      String trackerName = attempt.get(Keys.TRACKER_NAME);
-      String trackerHostName = 
-        JobInProgress.convertTrackerNameToHostName(trackerName);
-      // recover the port information.
-      int port = 0; // default to 0
-      String hport = attempt.get(Keys.HTTP_PORT);
-      if (hport != null && hport.length() > 0) {
-        port = attempt.getInt(Keys.HTTP_PORT);
-      }
-      
-      long attemptStartTime = attempt.getLong(Keys.START_TIME);
-
-      // II. Create the (appropriate) task status
-      if (type.equals(Values.MAP.name())) {
-        taskStatus = 
-          new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
-                            TaskStatus.State.RUNNING, "", "", trackerName, 
-                            TaskStatus.Phase.MAP, new Counters());
-      } else {
-        taskStatus = 
-          new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE), 
-                               TaskStatus.State.RUNNING, "", "", trackerName, 
-                               TaskStatus.Phase.REDUCE, new Counters());
-      }
-
-      // Set the start time
-      taskStatus.setStartTime(attemptStartTime);
-
-      List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
-      ttStatusList.add(taskStatus);
-      
-      // III. Create the dummy tasktracker status
-      TaskTrackerStatus ttStatus = 
-        new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
-                              0 , 0, 0);
-      ttStatus.setLastSeen(clock.getTime());
-
-      synchronized (JobTracker.this) {
-        synchronized (taskTrackers) {
-          synchronized (trackerExpiryQueue) {
-            // IV. Register a new tracker
-            TaskTracker taskTracker = getTaskTracker(trackerName);
-            boolean isTrackerRegistered =  (taskTracker != null);
-            if (!isTrackerRegistered) {
-              markTracker(trackerName); // add the tracker to recovery-manager
-              taskTracker = new TaskTracker(trackerName);
-              taskTracker.setStatus(ttStatus);
-              addNewTracker(taskTracker);
-            }
-      
-            // V. Update the tracker status
-            // This will update the meta info of the jobtracker and also add the
-            // tracker status if missing i.e register it
-            updateTaskTrackerStatus(trackerName, ttStatus);
-          }
-        }
-        // Register the attempt with job and tip, under JobTracker lock. 
-        // Since, as of today they are atomic through heartbeat.
-        // VI. Register the attempt
-        //   a) In the job
-        job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
-        //   b) In the tip
-        tip.updateStatus(taskStatus);
-      }
-      
-      // VII. Make an entry in the launched tasks
-      expireLaunchingTasks.addNewTask(attemptId);
-    }
-    
-    private void addSuccessfulAttempt(JobInProgress job, 
-                                      TaskAttemptID attemptId, 
-                                      JobHistory.TaskAttempt attempt) {
-      // I. Get the required info
-      TaskID taskId = attemptId.getTaskID();
-      String type = attempt.get(Keys.TASK_TYPE);
-
-      TaskInProgress tip = job.getTaskInProgress(taskId);
-      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
-      // Get the task status and the tracker name and make a copy of it
-      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
-      taskStatus.setFinishTime(attemptFinishTime);
-
-      String stateString = attempt.get(Keys.STATE_STRING);
-
-      // Update the basic values
-      taskStatus.setStateString(stateString);
-      taskStatus.setProgress(1.0f);
-      taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-
-      // Set the shuffle/sort finished times
-      if (type.equals(Values.REDUCE.name())) {
-        long shuffleTime = 
-          Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
-        long sortTime = 
-          Long.parseLong(attempt.get(Keys.SORT_FINISHED));
-        taskStatus.setShuffleFinishTime(shuffleTime);
-        taskStatus.setSortFinishTime(sortTime);
-      }
-      else if (type.equals(Values.MAP.name())) {
-        taskStatus.setMapFinishTime(
-            Long.parseLong(attempt.get(Keys.MAP_FINISHED)));
-      }
-
-      // Add the counters
-      String counterString = attempt.get(Keys.COUNTERS);
-      Counters counter = null;
-      //TODO Check if an exception should be thrown
-      try {
-        counter = Counters.fromEscapedCompactString(counterString);
-      } catch (ParseException pe) { 
-        counter = new Counters(); // Set it to empty counter
-      }
-      taskStatus.setCounters(counter);
-      
-      synchronized (JobTracker.this) {
-        // II. Replay the status
-        job.updateTaskStatus(tip, taskStatus);
-      }
-      
-      // III. Prevent the task from expiry
-      expireLaunchingTasks.removeTask(attemptId);
-    }
-    
-    private void addUnsuccessfulAttempt(JobInProgress job,
-                                        TaskAttemptID attemptId,
-                                        JobHistory.TaskAttempt attempt) {
-      // I. Get the required info
-      TaskID taskId = attemptId.getTaskID();
-      TaskInProgress tip = job.getTaskInProgress(taskId);
-      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
-      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
-      taskStatus.setFinishTime(attemptFinishTime);
-
-      // Reset the progress
-      taskStatus.setProgress(0.0f);
-      
-      String stateString = attempt.get(Keys.STATE_STRING);
-      taskStatus.setStateString(stateString);
-
-      boolean hasFailed = 
-        attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
-      // Set the state failed/killed
-      if (hasFailed) {
-        taskStatus.setRunState(TaskStatus.State.FAILED);
-      } else {
-        taskStatus.setRunState(TaskStatus.State.KILLED);
-      }
-
-      // Get/Set the error msg
-      String diagInfo = attempt.get(Keys.ERROR);
-      taskStatus.setDiagnosticInfo(diagInfo); // diag info
-
-      synchronized (JobTracker.this) {
-        // II. Update the task status
-        job.updateTaskStatus(tip, taskStatus);
-      }
-
-     // III. Prevent the task from expiry
-     expireLaunchingTasks.removeTask(attemptId);
-    }
-  
+   
     Path getRestartCountFile() {
       return new Path(getSystemDir(), "jobtracker.info");
     }
@@ -1478,166 +1082,25 @@
         return;
       }
 
-      LOG.info("Starting the recovery process with restart count : " 
-               + restartCount);
-
-      // I. Init the jobs and cache the recovered job history filenames
-      Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
-      Iterator<JobID> idIter = jobsToRecover.iterator();
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        LOG.info("Trying to recover details of job " + id);
+      LOG.info("Starting the recovery process for " + jobsToRecover.size() +
+          " jobs ...");
+      for (JobID jobId : jobsToRecover) {
+        LOG.info("Submitting job "+ jobId);
         try {
-          // 1. Create the job object
-          JobInProgress job = 
-            new JobInProgress(id, JobTracker.this, conf, restartCount);
-
-          // 2. Check if the user has appropriate access
-          // Get the user group info for the job's owner
-          UserGroupInformation ugi =
-            UserGroupInformation.readFrom(job.getJobConf());
-          LOG.info("Submitting job " + id + " on behalf of user "
-                   + ugi.getUserName() + " in groups : "
-                   + StringUtils.arrayToString(ugi.getGroupNames()));
-
-          // check the access
-          try {
-            checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
-          } catch (Throwable t) {
-            LOG.warn("Access denied for user " + ugi.getUserName() 
-                     + " in groups : [" 
-                     + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
-            throw t;
-          }
-
-          // 3. Get the log file and the file path
-          String logFileName = 
-            JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-          if (logFileName != null) {
-            Path jobHistoryFilePath = 
-              JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
-            // 4. Recover the history file. This involved
-            //     - deleting file.recover if file exists
-            //     - renaming file.recover to file if file doesnt exist
-            // This makes sure that the (master) file exists
-            JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
-                                                     jobHistoryFilePath);
-          
-            // 5. Cache the history file name as it costs one dfs access
-            jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
-          } else {
-            LOG.info("No history file found for job " + id);
-            idIter.remove(); // remove from recovery list
-          }
-
-          // 6. Sumbit the job to the jobtracker
-          addJob(id, job);
-        } catch (Throwable t) {
-          LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
-          idIter.remove();
-          continue;
-        }
-      }
-      long now = clock.getTime();
-      LOG.info("Took a total of " 
-               + StringUtils.formatTime(now 
-                                        - recoveryProcessStartTime) 
-               + " for recovering filenames of all the jobs from history.");
-
-
-      // II. Recover each job
-      idIter = jobsToRecover.iterator();
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        JobInProgress pJob = getJob(id);
-
-        // 1. Get the required info
-        // Get the recovered history file
-        Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
-        String logFileName = jobHistoryFilePath.getName();
-
-        FileSystem fs;
-        try {
-          fs = jobHistoryFilePath.getFileSystem(conf);
-        } catch (IOException ioe) {
-          LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
-                   ioe);
-          continue;
-        }
-
-        // 2. Parse the history file
-        // Note that this also involves job update
-        JobRecoveryListener listener = new JobRecoveryListener(pJob);
-        try {
-          JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
-                                        listener, fs);
-        } catch (Throwable t) {
-          LOG.info("Error reading history file of job " + pJob.getJobID() 
-                   + ". Ignoring the error and continuing.", t);
-        }
-
-        // 3. Close the listener
-        listener.close();
-        
-        // 4. Update the recovery metric
-        totalEventsRecovered += listener.getNumEventsRecovered();
-
-        // 5. Cleanup history
-        // Delete the master log file as an indication that the new file
-        // should be used in future
-        try {
-          synchronized (pJob) {
-            JobHistory.JobInfo.checkpointRecovery(logFileName, 
-                                                  pJob.getJobConf());
-          }
-        } catch (Throwable t) {
-          LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
-                   + id + ". Continuing.", t);
-        }
-
-        if (pJob.isComplete()) {
-          idIter.remove(); // no need to keep this job info as its successful
+          submitJob(jobId, restartCount);
+          recovered++;
+        } catch (Exception e) {
+          LOG.warn("Could not recover job " + jobId, e);
         }
       }
-
-      long recoveryProcessEndTime = clock.getTime();
-      LOG.info("Took a total of " 
-               + StringUtils.formatTime(recoveryProcessEndTime
-                                        - now) 
-               + " for parsing and recovering all the jobs from history.");
-
-      recoveryDuration = recoveryProcessEndTime - recoveryProcessStartTime;
-      LOG.info("Took a total of " + StringUtils.formatTime(recoveryDuration) 
-               + " for the whole recovery process.");
+      recoveryDuration = clock.getTime() - recoveryProcessStartTime;
       hasRecovered = true;
 
-      // III. Finalize the recovery
-      synchronized (trackerExpiryQueue) {
-        // Make sure that the tracker statuses in the expiry-tracker queue
-        // are updated
-        int size = trackerExpiryQueue.size();
-        for (int i = 0; i < size ; ++i) {
-          // Get the first tasktracker
-          TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
-
-          // Remove it
-          trackerExpiryQueue.remove(taskTracker);
-
-          // Set the new time
-          taskTracker.setLastSeen(recoveryProcessEndTime);
-
-          // Add back to get the sorted list
-          trackerExpiryQueue.add(taskTracker);
-        }
-      }
-
-      LOG.info("Restoration done. Recovery complete!");
-    }
-    
-    int totalEventsRecovered() {
-      return totalEventsRecovered;
+      LOG.info("Recovery done! Recoverd " + recovered +" of "+ 
+          jobsToRecover.size() + " jobs.");
+      LOG.info("Recovery Duration (ms):" + recoveryDuration);
     }
+
   }
 
   private final JobTrackerInstrumentation myInstrumentation;
@@ -1655,7 +1118,6 @@
   private HostsFileReader hostsReader;
   
   // JobTracker recovery variables
-  private volatile boolean hasRestarted = false;
   private volatile boolean hasRecovered = false;
   private volatile long recoveryDuration;
 
@@ -1943,7 +1405,6 @@
         // Check if the history is enabled .. as we can't have persistence with 
         // history disabled
         if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
-            && !JobHistory.isDisableHistory()
             && systemDirData != null) {
           for (FileStatus status : systemDirData) {
             try {
@@ -1955,8 +1416,7 @@
           }
           
           // Check if there are jobs to be recovered
-          hasRestarted = recoveryManager.shouldRecover();
-          if (hasRestarted) {
+          if (recoveryManager.shouldRecover()) {
             break; // if there is something to recover else clean the sys dir
           }
         }
@@ -2034,13 +1494,6 @@
   }
 
   /**
-   * Whether the JT has restarted
-   */
-  public boolean hasRestarted() {
-    return hasRestarted;
-  }
-
-  /**
    * Whether the JT has recovered upon restart
    */
   public boolean hasRecovered() {
@@ -2051,9 +1504,7 @@
    * How long the jobtracker took to recover from restart.
    */
   public long getRecoveryDuration() {
-    return hasRestarted() 
-           ? recoveryDuration
-           : 0;
+    return recoveryDuration;
   }
 
   /**
@@ -2128,12 +1579,8 @@
 
     taskScheduler.start();
     
-    //  Start the recovery after starting the scheduler
-    try {
-      recoveryManager.recover();
-    } catch (Throwable t) {
-      LOG.warn("Recovery manager crashed! Ignoring.", t);
-    }
+    recoveryManager.recover();
+    
     // refresh the node list as the recovery manager might have added 
     // disallowed trackers
     refreshHosts();
@@ -2380,13 +1827,6 @@
 
     // start the merge of log files
     JobID id = job.getStatus().getJobID();
-    if (job.hasRestarted()) {
-      try {
-        JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
-      } catch (IOException ioe) {
-        LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
-      }
-    }
 
     // mark the job as completed
     try {
@@ -2776,7 +2216,6 @@
     
     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
-    boolean addRestartInfo = false;
 
     if (initialContact != true) {
       // If this isn't the 'initial contact' from the tasktracker,
@@ -2786,20 +2225,15 @@
       if (prevHeartbeatResponse == null) {
         // This is the first heartbeat from the old tracker to the newly 
         // started JobTracker
-        if (hasRestarted()) {
-          addRestartInfo = true;
-          // inform the recovery manager about this tracker joining back
-          recoveryManager.unMarkTracker(trackerName);
-        } else {
-          // Jobtracker might have restarted but no recovery is needed
-          // otherwise this code should not be reached
-          LOG.warn("Serious problem, cannot find record of 'previous' " +
-                   "heartbeat for '" + trackerName + 
-                   "'; reinitializing the tasktracker");
-          return new HeartbeatResponse(responseId, 
-              new TaskTrackerAction[] {new ReinitTrackerAction()});
-        }
-
+        
+        // Jobtracker might have restarted but no recovery is needed
+        // otherwise this code should not be reached
+        LOG.warn("Serious problem, cannot find record of 'previous' " +
+                 "heartbeat for '" + trackerName + 
+                 "'; reinitializing the tasktracker");
+        return new HeartbeatResponse(responseId, 
+            new TaskTrackerAction[] {new ReinitTrackerAction()});
+      
       } else {
                 
         // It is completely safe to not process a 'duplicate' heartbeat from a 
@@ -2831,7 +2265,7 @@
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
     isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
     // Check for new tasks to be executed on the tasktracker
-    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
+    if (acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -2874,11 +2308,6 @@
     response.setActions(
                         actions.toArray(new TaskTrackerAction[actions.size()]));
     
-    // check if the restart info is req
-    if (addRestartInfo) {
-      response.setRecoveredJobs(recoveryManager.getJobsToRecover());
-    }
-        
     // Update the trackerToHeartbeatResponseMap
     trackerToHeartbeatResponseMap.put(trackerName, response);
 
@@ -3308,12 +2737,20 @@
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+    return submitJob(jobId, 0);
+  }
+
+  /**
+   * Submits either a new job or a job from an earlier run.
+   */
+  private synchronized JobStatus submitJob(JobID jobId, 
+      int restartCount) throws IOException {
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }
     
-    JobInProgress job = new JobInProgress(jobId, this, this.conf);
+    JobInProgress job = new JobInProgress(jobId, this, this.conf, restartCount);
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
@@ -3326,9 +2763,11 @@
       new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));      
       throw new IOException("Queue \"" + queue + "\" is not running");
     }
-    // check for access
     try {
-      checkAccess(job, Queue.QueueOperation.SUBMIT_JOB);
+      // check for access
+      UserGroupInformation ugi =
+        UserGroupInformation.readFrom(job.getJobConf());
+      checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
     } catch (IOException ioe) {
        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                 + ". Ignoring job " + jobId, ioe);
@@ -3918,14 +3357,8 @@
       }
 
       TaskInProgress tip = taskidToTIPMap.get(taskId);
-      // Check if the tip is known to the jobtracker. In case of a restarted
-      // jt, some tasks might join in later
-      if (tip != null || hasRestarted()) {
-        if (tip == null) {
-          tip = job.getTaskInProgress(taskId.getTaskID());
-          job.addRunningTaskToTIP(tip, taskId, status, false);
-        }
-        
+      
+      if (tip != null) {
         // Update the job and inform the listeners if necessary
         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
         // Clone TaskStatus object here, because JobInProgress
@@ -3986,9 +3419,6 @@
       trackerToTasksToCleanup.remove(trackerName);
     }
     
-    // Inform the recovery manager
-    recoveryManager.unMarkTracker(trackerName);
-    
     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);
 

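The submitJob() split above is what ties recovery into the normal submission path: the public submitJob(JobID) is now just submitJob(jobId, 0), while RecoveryManager.recover() calls the private overload with the restart count read from jobtracker.info. A condensed sketch of the two entry points (queue and ACL checks elided; see the hunk above):

    // Normal client submission: restart count is always 0.
    public synchronized JobStatus submitJob(JobID jobId) throws IOException {
      return submitJob(jobId, 0);
    }

    // Shared path, also reached from RecoveryManager.recover() on restart.
    private synchronized JobStatus submitJob(JobID jobId, int restartCount)
        throws IOException {
      if (jobs.containsKey(jobId)) {
        return jobs.get(jobId).getStatus();  // already running; don't start twice
      }
      JobInProgress job =
          new JobInProgress(jobId, this, this.conf, restartCount);
      // ... queue membership and SUBMIT_JOB access checks as above ...
      return addJob(jobId, job);
    }
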
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Sep  1 08:26:55 2009
@@ -1366,39 +1366,6 @@
         // next heartbeat   
         lastHeartbeat = System.currentTimeMillis();
         
-        
-        // Check if the map-event list needs purging
-        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
-        if (jobs.size() > 0) {
-          synchronized (this) {
-            // purge the local map events list
-            for (JobID job : jobs) {
-              RunningJob rjob;
-              synchronized (runningJobs) {
-                rjob = runningJobs.get(job);          
-                if (rjob != null) {
-                  synchronized (rjob) {
-                    FetchStatus f = rjob.getFetchStatus();
-                    if (f != null) {
-                      f.reset();
-                    }
-                  }
-                }
-              }
-            }
-
-            // Mark the reducers in shuffle for rollback
-            synchronized (shouldReset) {
-              for (Map.Entry<TaskAttemptID, TaskInProgress> entry 
-                   : runningTasks.entrySet()) {
-                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
-                  this.shouldReset.add(entry.getKey());
-                }
-              }
-            }
-          }
-        }
-        
         TaskTrackerAction[] actions = heartbeatResponse.getActions();
         if(LOG.isDebugEnabled()) {
           LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Tue Sep  1 08:26:55 2009
@@ -564,13 +564,7 @@
   public JobConf getJobTrackerConf() {
     return this.conf;
   }
-  
-  /**
-   * Get num events recovered
-   */
-  public int getNumEventsRecovered() {
-    return jobTracker.getJobTracker().recoveryManager.totalEventsRecovered();
-  }
+
 
   public int getFaultCount(String hostName) {
     return jobTracker.getJobTracker().getFaultCount(hostName);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Tue Sep  1 08:26:55 2009
@@ -811,11 +811,37 @@
       //set the done folder location
       String doneFolder = "history_done";
       conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+      
+      String logDir = 
+        "file:///" + new File(System.getProperty("hadoop.log.dir")).
+        getAbsolutePath() + File.separator + "history";
+
+      Path logDirPath = new Path(logDir);
+      FileSystem logDirFs = logDirPath.getFileSystem(conf);
+      // there may be some stale files; clean them up
+      if (logDirFs.exists(logDirPath)) {
+        boolean deleted = logDirFs.delete(logDirPath, true);
+        LOG.info(logDirPath + " deleted " + deleted); 
+      }
+      
+      logDirFs.mkdirs(logDirPath);
+      assertEquals("No of file in logDir not correct", 0, 
+          logDirFs.listStatus(logDirPath).length); 
+      logDirFs.create(new Path(logDirPath, "f1"));
+      logDirFs.create(new Path(logDirPath, "f2"));
+      assertEquals("No of file in logDir not correct", 2, 
+          logDirFs.listStatus(logDirPath).length);
 
       MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
           3, null, null, conf);
 
+      assertEquals("Files in logDir did not move to DONE folder",
+          0, logDirFs.listStatus(logDirPath).length);
+      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      assertEquals("Files in DONE dir not correct",
+          2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
+
       // run the TCs
       conf = mr.createJobConf();
 
@@ -836,7 +862,6 @@
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
-      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.getName());
       JobID id = job.getID();

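The new assertions seed the jobtracker's local history directory with two files before startup and expect the directory to be drained into the DONE location once the MiniMRCluster comes up. A minimal standalone sketch of the seed-and-count step, assuming a placeholder path (starting the cluster is what actually triggers the move):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HistoryDirSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path logDir = new Path("file:///tmp/history");  // placeholder
        FileSystem fs = logDir.getFileSystem(conf);
        fs.mkdirs(logDir);
        fs.create(new Path(logDir, "f1")).close();
        fs.create(new Path(logDir, "f2")).close();
        // before jobtracker startup: 2 entries; afterwards the test
        // expects 0 here and 2 under the DONE directory
        System.out.println("entries: " + fs.listStatus(logDir).length);
      }
    }
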
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Tue Sep  1 08:26:55 2009
@@ -435,8 +435,8 @@
     UtilsForTests.signalTasks(dfs, dfs.getFileSystem(), 
         signalFilename.toString(), signalFilename.toString(), 1);
 
-    assertTrue("Decommissioning of tracker has no effect restarted job", 
-        jt.getJob(job.getID()).failedMapTasks > 0);
+    assertTrue("Decommissioning of tracker has effect on restarted job", 
+        jt.getJob(job.getID()).failedMapTasks == 0);
     
     // check the cluster status and tracker size
     assertEquals("Tracker is not lost upon host decommissioning", 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Sep  1 08:26:55 2009
@@ -30,8 +30,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
-import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
-import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -104,7 +102,8 @@
     String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
     mr.stopJobTracker();
     
-    // delete the job.xml of job #1 causing the job to fail in constructor
+    // delete the job.xml of job #1, causing the job to fail in submitJob
+    // during recovery itself
     Path jobFile = 
       new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
     LOG.info("Deleting job.xml file : " + jobFile.toString());
@@ -138,7 +137,12 @@
     // check if the jobtracker came up or not
     assertEquals("JobTracker crashed!", 
                  JobTracker.State.RUNNING, status.getJobTrackerState());
-    
+
+    // assert the number of recovered jobs
+    assertEquals("Number of recovered jobs not correct",
+                 1, mr.getJobTrackerRunner().getJobTracker().
+                 recoveryManager.getRecovered());
+
     mr.shutdown();
   }
   
@@ -220,6 +224,8 @@
     LOG.info("Submitted job " + rJob3.getID() + " with different user");
     
     jip = jobtracker.getJob(rJob3.getID());
+    assertEquals("Restart count is not correct",
+        0, jip.getNumRestarts());
 
     while (!jip.inited()) {
       LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
@@ -251,6 +257,11 @@
     assertEquals("Recovery manager failed to tolerate job failures",
                  2, jobtracker.getAllJobs().length);
     
+    // assert the number of recovered jobs
+    assertEquals("Number of recovered jobs not correct",
+                 2, jobtracker.recoveryManager.getRecovered());
+    assertEquals("Restart count is not correct",
+        1, jobtracker.getJob(rJob2.getID()).getNumRestarts());
     // check if the job#1 has failed
     JobStatus status = jobtracker.getJobStatus(rJob1.getID());
     assertEquals("Faulty job not failed", 
@@ -264,141 +275,6 @@
     
     mr.shutdown();
   }
-  
-  /**
-   * Test if restart count of the jobtracker is correctly managed.
-   * Steps are as follows :
-   *   - start the jobtracker and check if the info file gets created.
-   *   - stops the jobtracker, deletes the jobtracker.info file and checks if
-   *     upon restart the recovery is 'off'
-   *   - submit a job to the jobtracker.
-   *   - restart the jobtracker k times and check if the restart count on ith 
-   *     iteration is i.
-   *   - submit a new job and check if its restart count is 0.
-   *   - garble the jobtracker.info file and restart he jobtracker, the 
-   *     jobtracker should crash.
-   */
-  public void testRestartCount() throws Exception {
-    LOG.info("Testing restart-count");
-    String signalFile = new Path(TEST_DIR, "signal").toString();
-    
-    // clean up
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true);
-    
-    JobConf conf = new JobConf();
-    conf.set("mapred.jobtracker.job.history.block.size", "1024");
-    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-    conf.setBoolean("mapred.jobtracker.restart.recover", true);
-    // since there is no need for initing
-    conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
-                  TaskScheduler.class);
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-    JobClient jc = new JobClient(mr.createJobConf());
-
-    // check if the jobtracker info file exists
-    Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
-    assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));
-
-    // check if garbling the system files disables the recovery process
-    LOG.info("Stopping jobtracker for testing with system files deleted");
-    mr.stopJobTracker();
-    
-    // delete the info file
-    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
-    fs.delete(rFile,false);
-    
-    // start the jobtracker
-    LOG.info("Starting jobtracker with system files deleted");
-    mr.startJobTracker();
-    
-    UtilsForTests.waitForJobTracker(jc);
-    jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
-    // check if the recovey is disabled
-    assertFalse("Recovery is not disabled upon missing system files", 
-                jobtracker.recoveryManager.shouldRecover());
-
-    // check if the system dir is sane
-    assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
-    Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
-    assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-
-    // submit a job
-    JobConf job = mr.createJobConf();
-    
-    UtilsForTests.configureWaitingJobConf(job, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, 
-        "test-recovery-manager", signalFile, signalFile);
-    
-    // submit the faulty job
-    RunningJob rJob = jc.submitJob(job);
-    LOG.info("Submitted first job " + rJob.getID());
-
-    // kill the jobtracker multiple times and check if the count is correct
-    for (int i = 1; i <= 5; ++i) {
-      LOG.info("Stopping jobtracker for " + i + " time");
-      mr.stopJobTracker();
-      
-      // start the jobtracker
-      LOG.info("Starting jobtracker for " + i + " time");
-      mr.startJobTracker();
-      
-      UtilsForTests.waitForJobTracker(jc);
-      
-      // check if the system dir is sane
-      assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
-      assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-      
-      jobtracker = mr.getJobTrackerRunner().getJobTracker();
-      JobInProgress jip = jobtracker.getJob(rJob.getID());
-      
-      // assert if restart count is correct
-      assertEquals("Recovery manager failed to recover restart count",
-                   i, jip.getNumRestarts());
-    }
-    
-    // kill the old job
-    rJob.killJob();
-
-    // II. Submit a new job and check if the restart count is 0
-    JobConf job1 = mr.createJobConf();
-    
-    UtilsForTests.configureWaitingJobConf(job1, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, 
-        "test-recovery-manager", signalFile, signalFile);
-    
-    // make sure that the job id's dont clash
-    jobtracker.getNewJobId();
-
-    // submit a new job
-    rJob = jc.submitJob(job1);
-    LOG.info("Submitted first job after restart" + rJob.getID());
-
-    // assert if restart count is correct
-    JobInProgress jip = jobtracker.getJob(rJob.getID());
-    assertEquals("Restart count for new job is incorrect",
-                 0, jip.getNumRestarts());
-
-    LOG.info("Stopping jobtracker for testing the fs errors");
-    mr.stopJobTracker();
-
-    // check if system.dir problems in recovery kills the jobtracker
-    fs.delete(rFile, false);
-    FSDataOutputStream out = fs.create(rFile);
-    out.writeBoolean(true);
-    out.close();
-
-    // start the jobtracker
-    LOG.info("Starting jobtracker with fs errors");
-    mr.startJobTracker();
-    JobTrackerRunner runner = mr.getJobTrackerRunner();
-    assertFalse("JobTracker is still alive", runner.isActive());
-
-    mr.shutdown();
-  } 
 
   /**
    * Test if the jobtracker waits for the info file to be created before 

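The added assertions pin down the restart-count contract: a job that was pending across a restart has getNumRestarts() incremented once per restart, while a job submitted after the restart reports 0. A toy model of that contract, with hypothetical types (not the JobInProgress API):

    // Illustrative only; real counts come from JobInProgress.
    public class RestartCountSketch {
      static final class Job { int restarts; }

      public static void main(String[] args) {
        Job submittedBefore = new Job();
        // jobtracker restarts once; recovery resubmits the job
        submittedBefore.restarts++;
        Job submittedAfter = new Job();  // fresh submission, post-restart
        System.out.println("before=" + submittedBefore.restarts
            + " after=" + submittedAfter.restarts);  // before=1 after=0
      }
    }
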
Modified: hadoop/mapreduce/trunk/src/webapps/job/jobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobhistory.jsp?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobhistory.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobhistory.jsp Tue Sep  1 08:26:55 2009
@@ -95,7 +95,9 @@
       }
 
       public boolean accept(Path path) {
-        return !(path.getName().endsWith(".xml")) && matchUser(path.getName()) && matchJobName(path.getName());
+        return !(path.getName().endsWith(".xml") || 
+          path.getName().endsWith(JobHistory.OLD_SUFFIX)) && 
+          matchUser(path.getName()) && matchJobName(path.getName());
       }
     };
     

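For reference, the filter's intended semantics written out as a standalone PathFilter: reject conf files (*.xml) and old-format history files, then apply the user and job-name matching. OLD_SUFFIX's value and the two match helpers below are placeholders for JobHistory.OLD_SUFFIX and the jsp's inline implementations:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    class HistoryFilterSketch implements PathFilter {
      private static final String OLD_SUFFIX = ".old";  // assumed value

      public boolean accept(Path path) {
        String name = path.getName();
        return !(name.endsWith(".xml") || name.endsWith(OLD_SUFFIX))
            && matchUser(name) && matchJobName(name);
      }

      private boolean matchUser(String name) { return true; }     // stub
      private boolean matchJobName(String name) { return true; }  // stub
    }
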
Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp?rev=809881&r1=809880&r2=809881&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp Tue Sep  1 08:26:55 2009
@@ -70,7 +70,7 @@
               "</td></tr></table>\n");
 
     out.print("<br>");
-    if (tracker.hasRestarted()) {
+    if (tracker.recoveryManager.shouldRecover()) {
       out.print("<span class=\"small\"><i>");
       if (tracker.hasRecovered()) {
         out.print("The JobTracker got restarted and recovered back in " );


