hadoop-common-commits mailing list archives

From d...@apache.org
Subject svn commit: r695823 [2/3] - in /hadoop/core/trunk: ./ conf/ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/
Date Tue, 16 Sep 2008 12:05:19 GMT
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 16 05:05:18 2008
@@ -22,6 +22,7 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,6 +46,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AccessControlException;
@@ -53,6 +55,9 @@
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.mapred.JobHistory.Keys;
+import org.apache.hadoop.mapred.JobHistory.Listener;
+import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -389,6 +394,501 @@
   }
 
  
+  ///////////////////////////////////////////////////////
+  // Used to recover the jobs upon restart
+  ///////////////////////////////////////////////////////
+  class RecoveryManager {
+    Set<JobID> jobsToRecover; // set of jobs to be recovered
+    
+    private int totalEventsRecovered = 0;
+    
+    /** A custom listener that replays the events in the order in which the 
+     * events (task attempts) occurred. 
+     */
+    class JobRecoveryListener implements Listener {
+      // The owner job
+      private JobInProgress jip;
+      
+      private JobHistory.JobInfo job; // current job's info object
+      
+      // Maintain the count of the (attempt) events recovered
+      private int numEventsRecovered = 0;
+      
+      // Maintains open transactions
+      private Map<String, String> hangingAttempts = 
+        new HashMap<String, String>();
+      
+      // Whether there are any updates for this job
+      private boolean hasUpdates = false;
+      
+      public JobRecoveryListener(JobInProgress jip) {
+        this.jip = jip;
+        this.job = new JobHistory.JobInfo(jip.getJobID().toString());
+      }
+
+      /**
+       * Process a task. Note that a task might commit a previously pending 
+       * transaction.
+       */
+      private void processTask(String taskId, JobHistory.Task task) {
+        // Any TASK info commits the previous transaction
+        boolean hasHanging = hangingAttempts.remove(taskId) != null;
+        if (hasHanging) {
+          numEventsRecovered += 2;
+        }
+        
+        TaskID id = TaskID.forName(taskId);
+        TaskInProgress tip = getTip(id);
+        
+        updateTip(tip, task);
+      }
+
+      /**
+       * Adds a task-attempt to the listener
+       */
+      private void processTaskAttempt(String taskAttemptId, 
+                                      JobHistory.TaskAttempt attempt) {
+        TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
+        
+        // Check if the transaction for this attempt can be committed
+        String taskStatus = attempt.get(Keys.TASK_STATUS);
+        
+        if (taskStatus.length() > 0) {
+          // This means this is an update event
+          if (taskStatus.equals(Values.SUCCESS.name())) {
+            // Mark this attempt as hanging
+            hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
+            addSuccessfulAttempt(jip, id, attempt);
+          } else {
+            addUnsuccessfulAttempt(jip, id, attempt);
+            numEventsRecovered += 2;
+          }
+        } else {
+          createTaskAttempt(jip, id, attempt);
+        }
+      }
+
+      public void handle(JobHistory.RecordTypes recType, Map<Keys, 
+                         String> values) throws IOException {
+        if (recType == JobHistory.RecordTypes.Job) {
+          // Update the meta-level job information
+          job.handle(values);
+          
+          // Forcefully init the job as we have some updates for it
+          checkAndInit();
+        } else if (recType.equals(JobHistory.RecordTypes.Task)) {
+          String taskId = values.get(Keys.TASKID);
+          
+          // Create a task
+          JobHistory.Task task = new JobHistory.Task();
+          task.handle(values);
+          
+          // Ignore if it's a cleanup task
+          if (isCleanup(task)) {
+            return;
+          }
+            
+          // Process the task, i.e. update the tip state
+          processTask(taskId, task);
+        } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
+          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
+          
+          // Create a task attempt
+          JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
+          attempt.handle(values);
+          
+          // Ignore if it's a cleanup task
+          if (isCleanup(attempt)) {
+            return;
+          }
+          
+          // Process the attempt, i.e. update the attempt state via the job
+          processTaskAttempt(attemptId, attempt);
+        } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
+          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
+          
+          // Create a task attempt
+          JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
+          attempt.handle(values);
+          
+          // Ignore if it's a cleanup task
+          if (isCleanup(attempt)) {
+            return;
+          }
+          
+          // Process the attempt, i.e. update the attempt state via the job
+          processTaskAttempt(attemptId, attempt);
+        }
+      }
+
+      // Check if the task is of type CLEANUP
+      private boolean isCleanup(JobHistory.Task task) {
+        String taskType = task.get(Keys.TASK_TYPE);
+        return Values.CLEANUP.name().equals(taskType);
+      }
+      
+      // Init the job if it's ready for init. Also make sure that the scheduler
+      // is updated
+      private void checkAndInit() throws IOException {
+        String jobStatus = this.job.get(Keys.JOB_STATUS);
+        if (Values.RUNNING.name().equals(jobStatus)) {
+          hasUpdates = true;
+          LOG.info("Calling init from RM for job " + jip.getJobID().toString());
+          jip.initTasks();
+          updateJobListeners();
+        }
+      }
+      
+      private void updateJobListeners() {
+        // The scheduler needs to be informed as the recovery-manager
+        // has initialized the jobs
+        for (JobInProgressListener listener : jobInProgressListeners) {
+          listener.jobUpdated(jip);
+        }
+      }
+      
+      void close() {
+        if (hasUpdates) {
+          // Apply the final (job-level) updates
+          updateJob(jip, job);
+        // Update the job listeners as the start/submit time and the job 
+        // priority have changed
+          updateJobListeners();
+        }
+      }
+      
+      public int getNumEventsRecovered() {
+        return numEventsRecovered;
+      }
+
+    }
+    
+    public RecoveryManager() {
+      jobsToRecover = new TreeSet<JobID>();
+    }
+
+    public boolean contains(JobID id) {
+      return jobsToRecover.contains(id);
+    }
+
+    void addJobForRecovery(JobID id) {
+      jobsToRecover.add(id);
+    }
+
+    public boolean shouldRecover() {
+      return jobsToRecover.size() != 0;
+    }
+
+    // checks if the job dir has the required files
+    public void checkAndAddJob(FileStatus status) throws IOException {
+      String jobName = status.getPath().getName();
+      if (JobID.isJobNameValid(jobName)) {
+        if (JobClient.isJobDirValid(status.getPath(), fs)) {
+          recoveryManager.addJobForRecovery(JobID.forName(jobName));
+        } else {
+          LOG.info("Found an incomplete job directory " + jobName + "." 
+                   + " Deleting it.");
+          fs.delete(status.getPath(), true);
+        }
+      } else {
+        LOG.info("Deleting " + status.getPath());
+        fs.delete(status.getPath(), true);
+      }
+    }
+    
+    private void updateJob(JobInProgress jip, JobHistory.JobInfo job) {
+      // Set the start/launch time only if there are recovered tasks
+      jip.updateJobTime(job.getLong(JobHistory.Keys.SUBMIT_TIME), 
+                        job.getLong(JobHistory.Keys.LAUNCH_TIME));
+      
+      // Change the job priority
+      String jobpriority = job.get(Keys.JOB_PRIORITY);
+      if (jobpriority.length() > 0) {
+        JobPriority priority = JobPriority.valueOf(jobpriority);
+        // It's important to update this via the jobtracker's API
+        setJobPriority(jip.getJobID(), priority);
+      }
+    }
+    
+    private void updateTip(TaskInProgress tip, JobHistory.Task task) {
+      long startTime = task.getLong(Keys.START_TIME);
+      if (startTime != 0) {
+        tip.setExecStartTime(startTime);
+      }
+      
+      long finishTime = task.getLong(Keys.FINISH_TIME);
+      // For failed tasks, the finish-time will be missing
+      if (finishTime != 0) {
+        tip.setExecFinishTime(finishTime);
+      }
+      
+      String cause = task.get(Keys.TASK_ATTEMPT_ID);
+      if (cause.length() > 0) {
+        // This means that this is a FAILED event
+        TaskAttemptID id = TaskAttemptID.forName(cause);
+        TaskStatus status = tip.getTaskStatus(id);
+        // This will add the tip failed event in the new log
+        tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
+                                status.getPhase(), status.getRunState(), 
+                                status.getTaskTracker(), myInstrumentation);
+      }
+    }
+    
+    private void createTaskAttempt(JobInProgress job, 
+                                   TaskAttemptID attemptId, 
+                                   JobHistory.TaskAttempt attempt) {
+      TaskID id = attemptId.getTaskID();
+      String type = attempt.get(Keys.TASK_TYPE);
+      TaskInProgress tip = job.getTaskInProgress(id);
+      
+      //    I. Get the required info
+      TaskStatus taskStatus = null;
+      String trackerName = attempt.get(Keys.TRACKER_NAME);
+      String trackerHostName = 
+        JobInProgress.convertTrackerNameToHostName(trackerName);
+      int index = trackerHostName.indexOf("_");
+      trackerHostName = 
+        trackerHostName.substring(index + 1, trackerHostName.length());
+      int port = attempt.getInt(Keys.HTTP_PORT);
+      
+      long attemptStartTime = attempt.getLong(Keys.START_TIME);
+
+      // II. Create the (appropriate) task status
+      if (type.equals(Values.MAP.name())) {
+        taskStatus = 
+          new MapTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
+                            "", "", trackerName, TaskStatus.Phase.MAP, 
+                            new Counters());
+      } else {
+        taskStatus = 
+          new ReduceTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
+                               "", "", trackerName, TaskStatus.Phase.REDUCE, 
+                               new Counters());
+      }
+
+      // Set the start time
+      taskStatus.setStartTime(attemptStartTime);
+
+      List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
+      ttStatusList.add(taskStatus);
+      
+      // III. Create the dummy tasktracker status
+      TaskTrackerStatus ttStatus = 
+        new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
+                              0 , 0, 0);
+      ttStatus.setLastSeen(System.currentTimeMillis());
+
+      // IV. Register a new tracker
+      boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
+      if (!isTrackerRegistered) {
+        addNewTracker(ttStatus);
+      }
+      
+      // V. Update the tracker status
+      //    This will update the meta info of the jobtracker and also add the
+      //    tracker status if missing, i.e. register it
+      updateTaskTrackerStatus(trackerName, ttStatus);
+      
+      // VI. Register the attempt
+      //   a) In the job
+      job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
+      //   b) In the tip
+      tip.updateStatus(taskStatus);
+      
+      // VII. Make an entry in the launched tasks
+      expireLaunchingTasks.addNewTask(attemptId);
+    }
+    
+    private void addSuccessfulAttempt(JobInProgress job, 
+                                      TaskAttemptID attemptId, 
+                                      JobHistory.TaskAttempt attempt) {
+      // I. Get the required info
+      TaskID taskId = attemptId.getTaskID();
+      String type = attempt.get(Keys.TASK_TYPE);
+
+      TaskInProgress tip = job.getTaskInProgress(taskId);
+      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
+
+      // Get the task status and the tracker name and make a copy of it
+      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
+      taskStatus.setFinishTime(attemptFinishTime);
+
+      String stateString = attempt.get(Keys.STATE_STRING);
+
+      // Update the basic values
+      taskStatus.setStateString(stateString);
+      taskStatus.setProgress(1.0f);
+      taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+
+      // Set the shuffle/sort finished times
+      if (type.equals(Values.REDUCE.name())) {
+        long shuffleTime = 
+          Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
+        long sortTime = 
+          Long.parseLong(attempt.get(Keys.SORT_FINISHED));
+        taskStatus.setShuffleFinishTime(shuffleTime);
+        taskStatus.setSortFinishTime(sortTime);
+      }
+
+      // Add the counters
+      String counterString = attempt.get(Keys.COUNTERS);
+      Counters counter = null;
+      //TODO Check if an exception should be thrown
+      try {
+        counter = Counters.fromEscapedCompactString(counterString);
+      } catch (ParseException pe) { 
+        counter = new Counters(); // Set it to empty counter
+      }
+      taskStatus.setCounters(counter);
+      
+      // II. Replay the status
+      job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+      
+      // III. Prevent the task from expiry
+      expireLaunchingTasks.removeTask(attemptId);
+    }
+    
+    private void addUnsuccessfulAttempt(JobInProgress job,
+                                        TaskAttemptID attemptId,
+                                        JobHistory.TaskAttempt attempt) {
+      // I. Get the required info
+      TaskID taskId = attemptId.getTaskID();
+      TaskInProgress tip = job.getTaskInProgress(taskId);
+      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
+
+      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
+      taskStatus.setFinishTime(attemptFinishTime);
+
+      // Reset the progress
+      taskStatus.setProgress(0.0f);
+      
+      String stateString = attempt.get(Keys.STATE_STRING);
+      taskStatus.setStateString(stateString);
+
+      boolean hasFailed = 
+        attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
+      // Set the state failed/killed
+      if (hasFailed) {
+        taskStatus.setRunState(TaskStatus.State.FAILED);
+      } else {
+        taskStatus.setRunState(TaskStatus.State.KILLED);
+      }
+
+      // Get/Set the error msg
+      String diagInfo = attempt.get(Keys.ERROR);
+      taskStatus.setDiagnosticInfo(diagInfo); // diag info
+
+      // II. Update the task status
+      job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+
+      // III. Prevent the task from expiry
+      expireLaunchingTasks.removeTask(attemptId);
+    }
+  
+    public void recover() throws IOException {
+      // I. Init the jobs and cache the recovered job history filenames
+      Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
+      for (JobID id : jobsToRecover) {
+        // 1. Create the job object
+        JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+        
+        // 2. Get the log file and the file path
+        String logFileName = 
+          JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
+        Path jobHistoryFilePath = 
+          JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+        
+        // 3. Recover the history file. This involves
+        //     - deleting file.recover if the file exists
+        //     - renaming file.recover to file if the file doesn't exist
+        // This makes sure that the (master) file exists
+        JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
+                                                 jobHistoryFilePath);
+
+        // 4. Cache the history file name as it costs one dfs access
+        jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+
+        // 5. Submit the job to the jobtracker
+        addJob(id, job);
+      }
+
+      long recoveryStartTime = System.currentTimeMillis();
+
+      // II. Recover each job
+      for (JobID id : jobsToRecover) {
+        JobInProgress pJob = getJob(id);
+
+        // 1. Get the required info
+        // Get the recovered history file
+        Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
+        String logFileName = jobHistoryFilePath.getName();
+
+        FileSystem fs = jobHistoryFilePath.getFileSystem(conf);
+
+        // 2. Parse the history file
+        // Note that this also involves job update
+        JobRecoveryListener listener = new JobRecoveryListener(pJob);
+        try {
+          JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
+                                        listener, fs);
+        } catch (IOException e) {
+          LOG.info("JobTracker failed to recover job " + pJob + "."
+                   + " Ignoring it.", e);
+          continue;
+        }
+
+        // 3. Close the listener
+        listener.close();
+
+        // 4. Update the recovery metric
+        totalEventsRecovered += listener.getNumEventsRecovered();
+
+        // 5. Cleanup history
+        // Delete the master log file as an indication that the new file
+        // should be used in future
+        synchronized (pJob) {
+          JobHistory.JobInfo.checkpointRecovery(logFileName, 
+              pJob.getJobConf());
+        }
+
+        // 6. Inform the jobtracker as to how much of the data is recovered.
+        // This is done so that the TTs can roll back to account for lost
+        // updates
+        lastSeenEventMapOnRestart.put(pJob.getStatus().getJobID(), 
+                                      pJob.getNumTaskCompletionEvents());
+      }
+
+      recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
+      hasRecovered = true;
+
+      // III. Finalize the recovery
+      // Make sure that the tracker statuses in the expiry-tracker queue
+      // are updated
+      long now = System.currentTimeMillis();
+      int size = trackerExpiryQueue.size();
+      for (int i = 0; i < size ; ++i) {
+        // Get the first status
+        TaskTrackerStatus status = trackerExpiryQueue.first();
+
+        // Remove it
+        trackerExpiryQueue.remove(status);
+
+        // Set the new time
+        status.setLastSeen(now);
+
+        // Add back to get the sorted list
+        trackerExpiryQueue.add(status);
+      }
+
+      // IV. Cleanup
+      jobsToRecover.clear();
+      LOG.info("Restoration complete");
+    }
+    
+    int totalEventsRecovered() {
+      return totalEventsRecovered;
+    }
+  }
 
   private JobTrackerInstrumentation myInstrumentation = null;
     
@@ -403,6 +903,11 @@
   private int totalMapTaskCapacity;
   private int totalReduceTaskCapacity;
   private HostsFileReader hostsReader;
+  
+  // JobTracker recovery variables
+  private volatile boolean hasRestarted = false;
+  private volatile boolean hasRecovered = false;
+  private volatile long recoveryDuration;
 
   //
   // Properties to maintain while running Jobs and Tasks:
@@ -450,6 +955,10 @@
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
+  // A map from JobID to the last known task-completion-event-index on restart
+  Map<JobID, Integer> lastSeenEventMapOnRestart = 
+    new HashMap<JobID, Integer>();
+  
   // Number of resolved entries
   int numResolved;
     
@@ -472,6 +981,7 @@
 
   CompletedJobStatusStore completedJobStatusStore = null;
   Thread completedJobsStoreThread = null;
+  RecoveryManager recoveryManager;
 
   /**
    * It might seem like a bug to maintain a TreeSet of status objects,
@@ -548,7 +1058,6 @@
     this.port = addr.getPort();
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
     this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
-    this.interTrackerServer.start();
     if (LOG.isDebugEnabled()) {
       Properties p = System.getProperties();
       for (Iterator it = p.keySet().iterator(); it.hasNext();) {
@@ -582,8 +1091,7 @@
     }
     infoServer.start();
     
-    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
-    trackerIdentifier = dateFormat.format(new Date());
+    trackerIdentifier = getDateFormat().format(new Date());
 
     Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
     try {
@@ -608,6 +1116,9 @@
         infoBindAddress + ":" + this.infoPort); 
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
     
+    // create the recovery manager
+    recoveryManager = new RecoveryManager();
+    
     while (true) {
       try {
         // if we haven't contacted the namenode go ahead and do it
@@ -619,6 +1130,24 @@
         if(systemDir == null) {
           systemDir = new Path(getSystemDir());    
         }
+        // Make sure that the backup data is preserved
+        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
+        LOG.info("Cleaning up the system directory");
+        // Check if history is enabled, as we can't have persistence with 
+        // history disabled
+        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
+            && !JobHistory.isDisableHistory()
+            && systemDirData != null) {
+          for (FileStatus status : systemDirData) {
+            recoveryManager.checkAndAddJob(status);
+          }
+          
+          // Check if there are jobs to be recovered
+          hasRestarted = recoveryManager.shouldRecover();
+          if (hasRestarted) {
+            break; // there is something to recover; otherwise clean the sys dir
+          }
+        }
         fs.delete(systemDir, true);
         if (FileSystem.mkdirs(fs, systemDir, 
             new FsPermission(SYSTEM_DIR_PERMISSION))) {
@@ -653,14 +1182,54 @@
             DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
-    synchronized (this) {
-      state = State.RUNNING;
-    }
 
     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+  }
 
-    LOG.info("Starting RUNNING");
+  private static SimpleDateFormat getDateFormat() {
+    return new SimpleDateFormat("yyyyMMddHHmm");
+  }
+
+  static boolean validateIdentifier(String id) {
+    try {
+      // the jobtracker id should be 'date' parseable
+      getDateFormat().parse(id);
+      return true;
+    } catch (ParseException pe) {}
+    return false;
+  }
+
+  static boolean validateJobNumber(String id) {
+    try {
+      // the job number should be integer parseable
+      Integer.parseInt(id);
+      return true;
+    } catch (IllegalArgumentException pe) {}
+    return false;
+  }
+
+  /**
+   * Whether the JT has restarted
+   */
+  public boolean hasRestarted() {
+    return hasRestarted;
+  }
+
+  /**
+   * Whether the JT has recovered upon restart
+   */
+  public boolean hasRecovered() {
+    return hasRecovered;
+  }
+
+  /**
+   * How long the jobtracker took to recover from restart.
+   */
+  public long getRecoveryDuration() {
+    return hasRestarted() 
+           ? recoveryDuration
+           : 0;
   }
 
   public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -683,12 +1252,16 @@
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    taskScheduler.start();
+    
+    //  Start the recovery after starting the scheduler
+    recoveryManager.recover();
+    
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
     this.expireTrackersThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
-    taskScheduler.start();
     expireLaunchingTaskThread.start();
 
     if (completedJobStatusStore.isActive()) {
@@ -697,6 +1270,14 @@
       completedJobsStoreThread.start();
     }
 
+    // start the inter-tracker server once the jt is ready
+    this.interTrackerServer.start();
+    
+    synchronized (this) {
+      state = State.RUNNING;
+    }
+    LOG.info("Starting RUNNING");
+    
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
@@ -778,6 +1359,13 @@
 
     // taskid --> TIP
     taskidToTIPMap.put(taskid, tip);
+    
+    // Note this launch
+    if (taskid.isMap()) {
+      myInstrumentation.launchMap(taskid);
+    } else {
+      myInstrumentation.launchReduce(taskid);
+    }
   }
     
   void removeTaskEntry(TaskAttemptID taskid) {
@@ -908,6 +1496,15 @@
     
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
+    // start the merge of log files
+    JobID id = job.getStatus().getJobID();
+    try {
+      JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
+    } catch (IOException ioe) {
+      LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+    }
+    
+    
     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
     // in memory; information about the purged jobs is available via
     // JobHistory.
@@ -1055,6 +1652,21 @@
     }
   }
 
+  /**
+   * Adds a new tracker to the jobtracker. This involves adding it to the
+   * tracker-expiry queue and, if needed, resolving its network location
+   * inline.
+   * @param status the task tracker's status
+   */
+  private void addNewTracker(TaskTrackerStatus status) {
+    trackerExpiryQueue.add(status);
+    //  Register the tracker if it's not registered
+    if (getNode(status.getTrackerName()) == null) {
+      // Resolve the network location inline
+      resolveAndAddToTopology(status.getHost());
+    }
+  }
+
   public Node resolveAndAddToTopology(String name) {
     List <String> tmpList = new ArrayList<String>(1);
     tmpList.add(name);
@@ -1168,6 +1780,7 @@
     
     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
+    boolean addRestartInfo = false;
 
     if (initialContact != true) {
       // If this isn't the 'initial contact' from the tasktracker,
@@ -1175,32 +1788,34 @@
       // no record of the 'previous heartbeat'; if so, ask the 
       // tasktracker to re-initialize itself.
       if (prevHeartbeatResponse == null) {
-        LOG.warn("Serious problem, cannot find record of 'previous' " +
-                 "heartbeat for '" + trackerName + 
-                 "'; reinitializing the tasktracker");
-        return new HeartbeatResponse(responseId, 
-                                     new TaskTrackerAction[] {new ReinitTrackerAction()});
+        // This is the first heartbeat from the old tracker to the newly 
+        // started JobTracker
+        if (hasRestarted()) {
+          addRestartInfo = true;
+        } else {
+          // Jobtracker might have restarted but no recovery is needed
+          LOG.warn("Serious problem, cannot find record of 'previous' " +
+                   "heartbeat for '" + trackerName + 
+                   "'; reinitializing the tasktracker");
+          return new HeartbeatResponse(responseId, 
+              new TaskTrackerAction[] {new ReinitTrackerAction()});
+        }
 
-      }
+      } else {
                 
-      // It is completely safe to not process a 'duplicate' heartbeat from a 
-      // {@link TaskTracker} since it resends the heartbeat when rpcs are lost - 
-      // @see {@link TaskTracker.transmitHeartbeat()};
-      // acknowledge it by re-sending the previous response to let the 
-      // {@link TaskTracker} go forward. 
-      if (prevHeartbeatResponse.getResponseId() != responseId) {
-        LOG.info("Ignoring 'duplicate' heartbeat from '" + 
-                 trackerName + "'; resending the previous 'lost' response");
-        return prevHeartbeatResponse;
+        // It is completely safe to not process a 'duplicate' heartbeat from a 
+        // {@link TaskTracker} since it resends the heartbeat when rpcs are 
+        // lost; see {@link TaskTracker.transmitHeartbeat()};
+        // acknowledge it by re-sending the previous response to let the 
+        // {@link TaskTracker} go forward. 
+        if (prevHeartbeatResponse.getResponseId() != responseId) {
+          LOG.info("Ignoring 'duplicate' heartbeat from '" + 
+              trackerName + "'; resending the previous 'lost' response");
+          return prevHeartbeatResponse;
+        }
       }
     }
       
-    // Register the tracker if its not registered
-    if (getNode(trackerName) == null) {
-      // Making the network location resolution inline .. 
-      resolveAndAddToTopology(status.getHost());
-    }
-    
     // Process this heartbeat 
     short newResponseId = (short)(responseId + 1);
     if (!processHeartbeat(status, initialContact)) {
@@ -1229,11 +1844,6 @@
         if (tasks != null) {
           for (Task task : tasks) {
             expireLaunchingTasks.addNewTask(task.getTaskID());
-            if (task.isMapTask()) {
-              myInstrumentation.launchMap(task.getTaskID());
-            } else {
-              myInstrumentation.launchReduce(task.getTaskID());
-            }
             LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
             actions.add(new LaunchTaskAction(task));
           }
@@ -1258,6 +1868,11 @@
     response.setHeartbeatInterval(nextInterval);
     response.setActions(
                         actions.toArray(new TaskTrackerAction[actions.size()]));
+    
+    // Check if the restart info is required
+    if (addRestartInfo) {
+      response.setLastKnownIndices(lastSeenEventMapOnRestart);
+    }
         
     // Update the trackerToHeartbeatResponseMap
     trackerToHeartbeatResponseMap.put(trackerName, response);
@@ -1388,7 +2003,7 @@
         }
 
         if (initialContact) {
-          trackerExpiryQueue.add(trackerStatus);
+          addNewTracker(trackerStatus);
         }
       }
     }
@@ -1527,19 +2142,9 @@
   ////////////////////////////////////////////////////
 
   /**
-   * Make sure the JobTracker is done initializing.
-   */
-  private synchronized void ensureRunning() throws IllegalStateException {
-    if (state != State.RUNNING) {
-      throw new IllegalStateException("Job tracker still initializing");
-    }
-  }
-
-  /**
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
-    ensureRunning();
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
 
@@ -1552,14 +2157,23 @@
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
-    ensureRunning();
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }
     
-    totalSubmissions++;
     JobInProgress job = new JobInProgress(jobId, this, this.conf);
+    return addJob(jobId, job);
+  }
+
+  /**
+   * Adds a job to the jobtracker. Make sure that the checks are in place
+   * before adding a job. This is the core job submission logic.
+   * @param jobId the id of the job being added
+   */
+  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) 
+  throws IOException {
+    totalSubmissions++;
     checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
 
     synchronized (jobs) {
@@ -1885,11 +2499,19 @@
       report.setTaskTracker(trackerName);
       TaskAttemptID taskId = report.getTaskID();
       TaskInProgress tip = taskidToTIPMap.get(taskId);
-      if (tip == null) {
-        LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskID());
-      } else {
+      // Check if the tip is known to the jobtracker. In case of a restarted
+      // jt, some tasks might join in later
+      if (tip != null || hasRestarted()) {
+        if (tip == null) {
+          JobInProgress job = getJob(taskId.getJobID());
+          tip = job.getTaskInProgress(taskId.getTaskID());
+          job.addRunningTaskToTIP(tip, taskId, status, false);
+        }
         expireLaunchingTasks.removeTask(taskId);
         tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
+      } else {
+        LOG.info("Serious problem.  While updating status, cannot find taskid " 
+                 + report.getTaskID());
       }
       
       // Process 'failed fetch' notifications 
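
The recovery path above only runs when "mapred.jobtracker.restart.recover" is set and job history is enabled, since the RecoveryManager rebuilds job and task state from the history log. As a rough illustration (not part of this commit; the class name is made up, only the property name comes from the hunk above), the flag can be set through the standard configuration API:

import org.apache.hadoop.mapred.JobConf;

public class EnableJtRecovery {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Ask a restarted jobtracker to replay unfinished jobs from the system
    // directory and the job history log instead of wiping the system dir.
    conf.setBoolean("mapred.jobtracker.restart.recover", true);
    System.out.println("recover on restart = "
        + conf.getBoolean("mapred.jobtracker.restart.recover", false));
  }
}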

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Tue Sep 16 05:05:18 2008
@@ -104,6 +104,7 @@
         InputSplit[] splits;
         splits = job.getInputFormat().getSplits(job, 1);
         JobID jobId = profile.getJobID();
+        long timestamp = System.currentTimeMillis();
         
         int numReduceTasks = job.getNumReduceTasks();
         if (numReduceTasks > 1 || numReduceTasks < 0) {
@@ -116,7 +117,8 @@
         
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
-          TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i), 0);  
+          TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i), 
+                                                  0, timestamp);  
           mapIds.add(mapId);
           buffer.reset();
           splits[i].write(buffer);
@@ -137,7 +139,8 @@
           map_tasks -= 1;
           updateCounters(map);
         }
-        TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, false, 0), 0);
+        TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, false, 0),
+                                                   0, timestamp);
         try {
           if (numReduceTasks > 0) {
             // move map output to reduce input  
@@ -277,9 +280,10 @@
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
     
-    public TaskCompletionEvent[] getMapCompletionEvents(JobID jobId
-        , int fromEventId, int maxLocs) throws IOException {
-      return TaskCompletionEvent.EMPTY_ARRAY;
+    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
+        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
+      return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
+                                               false);
     }
     
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Sep 16 05:05:18 2008
@@ -87,7 +87,7 @@
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
                  int partition, String splitClass, BytesWritable split
-                 ) throws IOException {
+                 ) {
     super(jobFile, taskId, partition);
     this.splitClass = splitClass;
     this.split = split;

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java?rev=695823&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java Tue Sep 16 05:05:18 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A class that represents the communication between the tasktracker and child
+ * tasks w.r.t. the map task completion events. It also indicates whether the 
+ * child task should reset its events index.
+ */
+class MapTaskCompletionEventsUpdate implements Writable {
+  TaskCompletionEvent[] events;
+  boolean reset;
+
+  public MapTaskCompletionEventsUpdate() { }
+
+  public MapTaskCompletionEventsUpdate(TaskCompletionEvent[] events,
+      boolean reset) {
+    this.events = events;
+    this.reset = reset;
+  }
+
+  public boolean shouldReset() {
+    return reset;
+  }
+
+  public TaskCompletionEvent[] getMapTaskCompletionEvents() {
+    return events;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(reset);
+    out.writeInt(events.length);
+    for (TaskCompletionEvent event : events) {
+      event.write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    reset = in.readBoolean();
+    events = new TaskCompletionEvent[in.readInt()];
+    for (int i = 0; i < events.length; ++i) {
+      events[i] = new TaskCompletionEvent();
+      events[i].readFields(in);
+    }
+  }
+}
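
Since MapTaskCompletionEventsUpdate travels over the tasktracker umbilical as a Writable, its wire format is simply the reset flag, the event count, and the events themselves. A minimal round-trip sketch (illustrative only; it would have to be compiled in the org.apache.hadoop.mapred package because the class is package-private, and the class name is made up):

package org.apache.hadoop.mapred;

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class MapEventsUpdateRoundTrip {
  public static void main(String[] args) throws IOException {
    // reset=true with an empty event list
    MapTaskCompletionEventsUpdate update =
        new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY, true);

    DataOutputBuffer out = new DataOutputBuffer();
    update.write(out);                      // reset flag, count, then events

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());

    MapTaskCompletionEventsUpdate copy = new MapTaskCompletionEventsUpdate();
    copy.readFields(in);
    System.out.println("shouldReset=" + copy.shouldReset()
        + " events=" + copy.getMapTaskCompletionEvents().length);
  }
}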

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 16 05:05:18 2008
@@ -2236,9 +2236,20 @@
         currentTime = System.currentTimeMillis();
       }
       
-      TaskCompletionEvent events[] = 
+      MapTaskCompletionEventsUpdate update = 
         umbilical.getMapCompletionEvents(reduceTask.getJobID(), 
-                                       fromEventId.get(), MAX_EVENTS_TO_FETCH);
+                                         fromEventId.get(), MAX_EVENTS_TO_FETCH,
+                                         reduceTask.getTaskID());
+      TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
+        
+      // Check if the reset is required.
+      // Since there is no ordering of the task completion events at the 
+      // reducer, the only option to sync with the new jobtracker is to reset 
+      // the events index
+      if (update.shouldReset()) {
+        fromEventId.set(0);
+        obsoleteMapIds.clear(); // clear the obsolete map
+      }
       
       // Note the last successful poll time-stamp
       lastPollTime = currentTime;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptID.java Tue Sep 16 05:05:18 2008
@@ -22,18 +22,28 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.io.WritableUtils;
+
 /**
  * TaskAttemptID represents the immutable and unique identifier for 
  * a task attempt. Each task attempt is one particular instance of a Map or
  * Reduce Task identified by its TaskID. 
  * 
- * TaskAttemptID consists of 2 parts. First part is the 
+ * TaskAttemptID consists of 3 parts. First part is the 
  * {@link TaskID}, that this TaskAttemptID belongs to.
- * Second part is the task attempt number. <br> 
+ * Second part is the task attempt number. Third part is the unique identifier
+ * for distinguishing task-attempts across jobtracker restarts.<br> 
  * An example TaskAttemptID is : 
- * <code>attempt_200707121733_0003_m_000005_0</code> , which represents the
- * zeroth task attempt for the fifth map task in the third job 
- * running at the jobtracker started at <code>200707121733</code>. 
+ * <code>attempt_200707121733_0003_m_000005_0_1234567890123</code>, which 
+ * represents the zeroth task attempt for the fifth map task in the third job 
+ * running at the jobtracker started at <code>200707121733</code> with 
+ * timestamp <code>1234567890123</code>. There could be another attempt with id
+ * <code>attempt_200707121733_0003_m_000005_0_1234567890124</code> which 
+ * indicates that the task was scheduled by the jobtracker started at timestamp
+ * <code>1234567890124</code>. <code>200707121733</code> here indicates that 
+ * the job was started by the jobtracker that was started at 
+ * <code>200707121733</code>, although this task-attempt was scheduled by the 
+ * new jobtracker. 
  * <p>
  * Applications should never construct or parse TaskAttemptID strings
  * , but rather use appropriate constructors or {@link #forName(String)} 
@@ -45,19 +55,39 @@
 public class TaskAttemptID extends ID {
   private static final String ATTEMPT = "attempt";
   private TaskID taskId;
+  private long jtTimestamp = 0;
   private static final char UNDERSCORE = '_';
   
   /**
+   * @deprecated Use {@link #TaskAttemptID(TaskID, int, long)} instead.
+   */
+  public TaskAttemptID(TaskID taskId, int id) {
+    this(taskId, id, 0);
+  }
+  
+  /**
    * Constructs a TaskAttemptID object from given {@link TaskID}.  
    * @param taskId TaskID that this task belongs to  
    * @param id the task attempt number
+   * @param jtTimestamp timestamp that uniquely identifies the task 
+   *        attempt across restarts
    */
-  public TaskAttemptID(TaskID taskId, int id) {
+  public TaskAttemptID(TaskID taskId, int id, long jtTimestamp) {
     super(id);
     if(taskId == null) {
       throw new IllegalArgumentException("taskId cannot be null");
     }
     this.taskId = taskId;
+    this.jtTimestamp = jtTimestamp;
+  }
+  
+  /**
+   * @deprecated 
+   *   Use {@link #TaskAttemptID(String, int, boolean, int, int, long)} instead
+   */
+  public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap, 
+                       int taskId, int id) {
+    this(new TaskID(jtIdentifier, jobId, isMap, taskId), id, 0);
   }
   
   /**
@@ -67,10 +97,13 @@
    * @param isMap whether the tip is a map 
    * @param taskId taskId number
    * @param id the task attempt number
+   * @param jtTimestamp timestamp that uniquely identifies the task attempt 
+   *        across restarts
    */
-  public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap
-      , int taskId, int id) {
-    this(new TaskID(jtIdentifier, jobId, isMap, taskId), id);
+  public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap, 
+                       int taskId, int id, long jtTimestamp) {
+    this(new TaskID(jtIdentifier, jobId, isMap, taskId), id, 
+                    jtTimestamp);
   }
   
   private TaskAttemptID() { }
@@ -97,7 +130,8 @@
     if(o.getClass().equals(TaskAttemptID.class)) {
       TaskAttemptID that = (TaskAttemptID)o;
       return this.id==that.id
-        && this.taskId.equals(that.taskId);
+             && this.taskId.equals(that.taskId) 
+             && this.jtTimestamp == that.jtTimestamp;
     }
     else return false;
   }
@@ -108,9 +142,12 @@
     TaskAttemptID that = (TaskAttemptID)o;
     int tipComp = this.taskId.compareTo(that.taskId);
     if(tipComp == 0) {
-      return this.id - that.id;
+      tipComp = this.id - that.id;
+    }
+    if (tipComp == 0) {
+      tipComp = Long.valueOf(this.jtTimestamp).compareTo(that.jtTimestamp);
     }
-    else return tipComp;
+    return tipComp;
   }
   @Override
   public String toString() { 
@@ -120,9 +157,13 @@
   }
 
   StringBuilder toStringWOPrefix() {
+    // This is just for backward compatibility.
+    String appendForTimestamp = (jtTimestamp == 0) 
+                                ? "" 
+                                : UNDERSCORE + String.valueOf(jtTimestamp);
     StringBuilder builder = new StringBuilder();
     return builder.append(taskId.toStringWOPrefix())
-      .append(UNDERSCORE).append(id);
+                  .append(UNDERSCORE).append(id).append(appendForTimestamp);
   }
   
   @Override
@@ -134,12 +175,14 @@
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.taskId = TaskID.read(in);
+    this.jtTimestamp = WritableUtils.readVLong(in);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
     taskId.write(out);
+    WritableUtils.writeVLong(out, jtTimestamp);
   }
   
   public static TaskAttemptID read(DataInput in) throws IOException {
@@ -157,14 +200,20 @@
       return null;
     try {
       String[] parts = str.split("_");
-      if(parts.length == 6) {
+      long jtTimestamp = 0;
+      // This is for backward compatibility
+      if(parts.length == 6 || parts.length == 7) {
         if(parts[0].equals(ATTEMPT)) {
           boolean isMap = false;
           if(parts[3].equals("m")) isMap = true;
           else if(parts[3].equals("r")) isMap = false;
           else throw new Exception();
+          if (parts.length == 7) {
+            jtTimestamp = Long.parseLong(parts[6]);
+          }
           return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
-              isMap, Integer.parseInt(parts[4]), Integer.parseInt(parts[5]));
+                                   isMap, Integer.parseInt(parts[4]), 
+                                   Integer.parseInt(parts[5]), jtTimestamp);
         }
       }
     }catch (Exception ex) {//fall below
@@ -174,6 +223,20 @@
   }
   
   /** 
+   * @return a regex pattern matching TaskAttemptIDs
+   * @deprecated Use {@link #getTaskAttemptIDsPattern(String, Integer, Boolean,
+   *                                                  Integer, Integer, Long)} 
+   *             instead.
+   */
+  public static String getTaskAttemptIDsPattern(String jtIdentifier,
+      Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
+    StringBuilder builder = new StringBuilder(ATTEMPT).append(UNDERSCORE);
+    builder.append(getTaskAttemptIDsPatternWOPrefix(jtIdentifier, jobId,
+                   isMap, taskId, attemptId, null));
+    return builder.toString();
+  }
+  
+  /**
    * Returns a regex pattern which matches task attempt IDs. Arguments can 
    * be given null, in which case that part of the regex will be generic.  
    * For example to obtain a regex matching <i>all task attempt IDs</i> 
@@ -189,16 +252,23 @@
    * @param isMap whether the tip is a map, or null 
    * @param taskId taskId number, or null
    * @param attemptId the task attempt number, or null
-   * @return a regex pattern matching TaskAttemptIDs
+   * @param jtTimestamp Timestamp that is used to identify task attempts across
+   *        jobtracker restarts. Make sure that timestamp has some valid value.
    */
-  public static String getTaskAttemptIDsPattern(String jtIdentifier,
-      Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
+  public static String getTaskAttemptIDsPattern(String jtIdentifier, 
+      Integer jobId, Boolean isMap, Integer taskId, Integer attemptId, Long jtTimestamp) {
     StringBuilder builder = new StringBuilder(ATTEMPT).append(UNDERSCORE);
     builder.append(getTaskAttemptIDsPatternWOPrefix(jtIdentifier, jobId,
-        isMap, taskId, attemptId));
+                   isMap, taskId, attemptId, jtTimestamp));
     return builder.toString();
   }
   
+  /**
+   * @deprecated 
+   * Use {@link #getTaskAttemptIDsPatternWOPrefix(String, Integer, Boolean, 
+   *                                              Integer, Integer, Long)} 
+   * instead.
+   */
   static StringBuilder getTaskAttemptIDsPatternWOPrefix(String jtIdentifier
       , Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
     StringBuilder builder = new StringBuilder();
@@ -209,4 +279,15 @@
     return builder;
   }
   
+  static StringBuilder getTaskAttemptIDsPatternWOPrefix(String jtIdentifier, 
+      Integer jobId, Boolean isMap, Integer taskId, Integer attemptId, 
+      Long jtTimestamp) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(TaskID.getTaskIDsPatternWOPrefix(jtIdentifier, jobId, isMap, taskId))
+           .append(UNDERSCORE)
+           .append(attemptId != null ? attemptId : "[0-9]*")
+           .append(UNDERSCORE)
+           .append(jtTimestamp != null ? jtTimestamp : "[0-9]*");
+    return builder;
+  }
 }
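
The parsing in forName() above accepts both the old six-field and the new seven-field attempt ids, which keeps pre-restart ids readable. A small sketch (illustrative only; the class name is made up, and the id strings are the examples quoted in the javadoc above):

import org.apache.hadoop.mapred.TaskAttemptID;

public class AttemptIdCompat {
  public static void main(String[] args) {
    // Old-style id: no trailing jobtracker timestamp, so jtTimestamp stays 0
    // and toString() round-trips to the same six-field form.
    TaskAttemptID oldStyle =
        TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0");

    // New-style id: the trailing field is the timestamp of the jobtracker
    // that scheduled this attempt.
    TaskAttemptID newStyle =
        TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0_1234567890123");

    System.out.println(oldStyle);                  // ..._000005_0
    System.out.println(newStyle);                  // ..._000005_0_1234567890123
    System.out.println(oldStyle.equals(newStyle)); // false: timestamps differ
  }
}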

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Sep 16 05:05:18 2008
@@ -177,6 +177,28 @@
     return buf.toString();
   }
     
+  @Override
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(TaskCompletionEvent.class)) {
+      TaskCompletionEvent event = (TaskCompletionEvent) o;
+      return this.isMap == event.isMapTask() 
+             && this.eventId == event.getEventId()
+             && this.idWithinJob == event.idWithinJob() 
+             && this.status.equals(event.getTaskStatus())
+             && this.taskId.equals(event.getTaskAttemptId()) 
+             && this.taskRunTime == event.getTaskRunTime()
+             && this.taskTrackerHttp.equals(event.getTaskTrackerHttp());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode(); 
+  }
+
   public boolean isMapTask() {
     return isMap;
   }
@@ -194,6 +216,7 @@
     WritableUtils.writeEnum(out, status); 
     WritableUtils.writeString(out, taskTrackerHttp);
     WritableUtils.writeVInt(out, taskRunTime);
+    WritableUtils.writeVInt(out, eventId);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -203,5 +226,6 @@
     this.status = WritableUtils.readEnum(in, Status.class);
     this.taskTrackerHttp = WritableUtils.readString(in);
     this.taskRunTime = WritableUtils.readVInt(in);
+    this.eventId = WritableUtils.readVInt(in);
   }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Tue Sep 16 05:05:18 2008
@@ -207,6 +207,41 @@
   ////////////////////////////////////
 
   /**
+   * Return the start time
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+  
+  /**
+   * Return the exec start time
+   */
+  public long getExecStartTime() {
+    return execStartTime;
+  }
+  
+  /**
+   * Set the exec start time
+   */
+  public void setExecStartTime(long startTime) {
+    execStartTime = startTime;
+  }
+  
+  /**
+   * Return the exec finish time
+   */
+  public long getExecFinishTime() {
+    return execFinishTime;
+  }
+
+  /**
+   * Set the exec finish time
+   */
+  public void setExecFinishTime(long finishTime) {
+    execFinishTime = finishTime;
+  }
+  
+  /**
    * Return the parent job
    */
   public JobInProgress getJob() {
@@ -366,7 +401,7 @@
    * Returns whether the task attempt should be committed or not 
    */
   public boolean shouldCommit(TaskAttemptID taskid) {
-    return taskToCommit.equals(taskid);
+    return !isComplete() && taskToCommit.equals(taskid);
   }
 
   /**
@@ -510,7 +545,9 @@
       }
     }
 
-    this.activeTasks.remove(taskid);
+    // Note that there can be failures of tasks that are hosted on a machine 
+    // that has not yet registered with the restarted jobtracker
+    boolean isPresent = this.activeTasks.remove(taskid) != null;
     
     // Since we do not fail completed reduces (whose outputs go to hdfs), we 
     // should note this failure only for completed maps, only if this taskid;
@@ -524,15 +561,17 @@
       resetSuccessfulTaskid();
     }
 
-
-    if (taskState == TaskStatus.State.FAILED) {
-      numTaskFailures++;
-      machinesWhereFailed.add(trackerHostName);
-      LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
-      failedRanges.add(status.getNextRecordRange());
-      skipping = startSkipping();
-    } else {
-      numKilledTasks++;
+    // recalculate the counts only if it's a genuine failure
+    if (isPresent) {
+      if (taskState == TaskStatus.State.FAILED) {
+        numTaskFailures++;
+        machinesWhereFailed.add(trackerHostName);
+        LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
+        failedRanges.add(status.getNextRecordRange());
+        skipping = startSkipping();
+      } else {
+        numKilledTasks++;
+      }
     }
 
     if (numTaskFailures >= maxTaskAttempts) {
@@ -602,6 +641,7 @@
     //
 
     this.completes++;
+    this.execFinishTime = System.currentTimeMillis();
     recomputeProgress();
     
   }
@@ -637,6 +677,7 @@
     }
     this.failed = true;
     killed = true;
+    this.execFinishTime = System.currentTimeMillis();
     recomputeProgress();
   }
 
@@ -674,10 +715,15 @@
   void recomputeProgress() {
     if (isComplete()) {
       this.progress = 1;
-      this.execFinishTime = System.currentTimeMillis();
+      // update the counters and the state
+      TaskStatus completedStatus = taskStatuses.get(getSuccessfulTaskid());
+      this.counters = completedStatus.getCounters();
+      this.state = completedStatus.getStateString();
     } else if (failed) {
       this.progress = 0;
-      this.execFinishTime = System.currentTimeMillis();
+      // reset the counters and the state
+      this.state = "";
+      this.counters = new Counters();
     } else {
       double bestProgress = 0;
       String bestState = "";
@@ -747,7 +793,6 @@
    * Return a Task that can be sent to a TaskTracker for execution.
    */
   public Task getTaskToRun(String taskTracker) throws IOException {
-    Task t = null;
     if (0 == execStartTime){
       // assume task starts running now
       execStartTime = System.currentTimeMillis();
@@ -756,7 +801,7 @@
     // Create the 'taskid'; do not count the 'killed' tasks against the job!
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
-      taskid = new TaskAttemptID( id, nextTaskId);
+      taskid = new TaskAttemptID( id, nextTaskId, jobtracker.getStartTime());
       ++nextTaskId;
     } else {
       LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
@@ -765,6 +810,16 @@
       return null;
     }
 
+    return addRunningTask(taskid, taskTracker);
+  }
+  
+  /**
+   * Adds a previously running task to this tip. This is used in case of 
+   * jobtracker restarts.
+   */
+  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+    // create the task
+    Task t = null;
     if (isMapTask()) {
       LOG.debug("attemdpt "+  numTaskFailures   +
           " sending skippedRecords "+failedRanges.getIndicesCount());

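The new addRunningTask(), setExecStartTime() and setExecFinishTime() hooks above exist so that the JobTracker's RecoveryManager can re-attach attempts that were already running when the tracker went down and restore their original timestamps from the job history. A hedged sketch of how a recovery path could drive these hooks; the helper method and its parameters are illustrative, not part of the patch:

  // Illustrative sketch (not part of the patch): replaying one attempt from the
  // job history. The parameters are assumed to come from the parsed history record.
  void replayAttempt(TaskInProgress tip, TaskAttemptID attemptId,
                     String trackerName, long startTime, long finishTime) {
    // Re-attach the attempt that was running before the JobTracker went down
    tip.addRunningTask(attemptId, trackerName);

    // Restore the recovered timestamps so reports and the web UI keep showing
    // the attempt's original execution window
    tip.setExecStartTime(startTime);
    tip.setExecFinishTime(finishTime);
  }
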
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java Tue Sep 16 05:05:18 2008
@@ -20,6 +20,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -93,6 +94,31 @@
   void setStartTime(long startTime) {
     this.startTime = startTime;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null)
+      return false;
+    if (o.getClass().equals(TaskReport.class)) {
+      TaskReport report = (TaskReport) o;
+      return counters.contentEquals(report.getCounters())
+             && Arrays.toString(this.diagnostics)
+                      .equals(Arrays.toString(report.getDiagnostics()))
+             && this.finishTime == report.getFinishTime()
+             && this.progress == report.getProgress()
+             && this.startTime == report.getStartTime()
+             && this.state.equals(report.getState())
+             && this.taskid.equals(report.getTaskID());
+    }
+    return false; 
+  }
+
+  @Override
+  public int hashCode() {
+    return (counters.toString() + Arrays.toString(this.diagnostics) 
+            + this.finishTime + this.progress + this.startTime + this.state 
+            + this.taskid.toString()).hashCode();
+  }
   //////////////////////////////////////////////
   // Writable
   //////////////////////////////////////////////
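
The value-based equals()/hashCode() added above make TaskReport instances comparable by content, which is what a restart test needs when it fetches reports before and after a JobTracker restart and expects them to match. A hedged, test-style sketch of that usage; JobClient#getMapTaskReports is assumed to be available, and the helper itself is illustrative:

  // Illustrative helper for a recovery test; relies on the element-wise
  // TaskReport.equals() defined above.
  private void assertReportsUnchanged(JobClient jobClient, JobID jobId,
                                      TaskReport[] before) throws IOException {
    TaskReport[] after = jobClient.getMapTaskReports(jobId);
    if (!java.util.Arrays.equals(before, after)) {
      throw new AssertionError("task reports changed across the JobTracker restart");
    }
  }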

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 16 05:05:18 2008
@@ -66,6 +66,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
@@ -154,6 +155,8 @@
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
   boolean justStarted = true;
+  // Mark reduce tasks that are shuffling to roll back their events index
+  Set<TaskAttemptID> shouldReset = new HashSet<TaskAttemptID>();
     
   //dir -> DF
   Map<String, DF> localDirsDf = new HashMap<String, DF>();
@@ -598,6 +601,22 @@
       this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
     }
       
+    /**
+     * Check if the number of events obtained is more than required.
+     * If so, purge the extra ones.
+     */
+    public void purgeMapEvents(int lastKnownIndex) {
+      // Note that the sync is first on fromEventId and then on allMapEvents
+      synchronized (fromEventId) {
+        synchronized (allMapEvents) {
+          if (allMapEvents.size() > lastKnownIndex) {
+            fromEventId.set(lastKnownIndex);
+            allMapEvents = allMapEvents.subList(0, lastKnownIndex);
+          }
+        }
+      }
+    }
+    
     public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
         
       TaskCompletionEvent[] mapEvents = 
@@ -626,19 +646,22 @@
       if (!fetchAgain && (currTime - lastFetchTime) < heartbeatInterval) {
         return false;
       }
-      int currFromEventId = fromEventId.get();
-      List <TaskCompletionEvent> recentMapEvents = 
-        queryJobTracker(fromEventId, jobId, jobClient);
-      synchronized (allMapEvents) {
-        allMapEvents.addAll(recentMapEvents);
-      }
-      lastFetchTime = currTime;
-      if (fromEventId.get() - currFromEventId >= probe_sample_size) {
-        //return true when we have fetched the full payload, indicating
-        //that we should fetch again immediately (there might be more to
-        //fetch
-        fetchAgain = true;
-        return true;
+      int currFromEventId = 0;
+      synchronized (fromEventId) {
+        currFromEventId = fromEventId.get();
+        List <TaskCompletionEvent> recentMapEvents = 
+          queryJobTracker(fromEventId, jobId, jobClient);
+        synchronized (allMapEvents) {
+          allMapEvents.addAll(recentMapEvents);
+        }
+        lastFetchTime = currTime;
+        if (fromEventId.get() - currFromEventId >= probe_sample_size) {
+          //return true when we have fetched the full payload, indicating
+          //that we should fetch again immediately (there might be more to
+          //fetch)
+          fetchAgain = true;
+          return true;
+        }
       }
       fetchAgain = false;
       return false;
@@ -965,6 +988,39 @@
         // next heartbeat   
         lastHeartbeat = System.currentTimeMillis();
         
+        
+        // Check if the map-event list needs purging
+        if (heartbeatResponse.getLastKnownIndex() != null) {
+          synchronized (this) {
+            // purge the local map events list
+            for (Map.Entry<JobID, Integer> entry 
+                 : heartbeatResponse.getLastKnownIndex().entrySet()) {
+              RunningJob rjob;
+              synchronized (runningJobs) {
+                rjob = runningJobs.get(entry.getKey());          
+                if (rjob != null) {
+                  synchronized (rjob) {
+                    FetchStatus f = rjob.getFetchStatus();
+                    if (f != null) {
+                      f.purgeMapEvents(entry.getValue());
+                    }
+                  }
+                }
+              }
+            }
+
+            // Mark the reducers in shuffle for rollback
+            synchronized (shouldReset) {
+              for (Map.Entry<TaskAttemptID, TaskInProgress> entry 
+                   : runningTasks.entrySet()) {
+                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
+                  this.shouldReset.add(entry.getKey());
+                }
+              }
+            }
+          }
+        }
+        
         TaskTrackerAction[] actions = heartbeatResponse.getActions();
         if(LOG.isDebugEnabled()) {
           LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 
@@ -2227,10 +2283,15 @@
     purgeTask(tip, true);
   }
 
-  public TaskCompletionEvent[] getMapCompletionEvents(JobID jobId
-      , int fromEventId, int maxLocs) throws IOException {
-      
+  public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
+      JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) 
+  throws IOException {
     TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
+    synchronized (shouldReset) {
+      if (shouldReset.remove(id)) {
+        return new MapTaskCompletionEventsUpdate(mapEvents, true);
+      }
+    }
     RunningJob rjob;
     synchronized (runningJobs) {
       rjob = runningJobs.get(jobId);          
@@ -2243,7 +2304,7 @@
         }
       }
     }
-    return mapEvents;
+    return new MapTaskCompletionEventsUpdate(mapEvents, false);
   }
     
   /////////////////////////////////////////////////////
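
The tasktracker-side changes above trim the cached map events back to the index the restarted JobTracker remembers (purgeMapEvents), remember which shuffling reducers hold stale copies (shouldReset), and report the rollback through MapTaskCompletionEventsUpdate. The reduce-side reaction lives elsewhere in this changeset; a minimal sketch of the idea, assuming the update exposes shouldReset() and getMapTaskCompletionEvents() accessors and that MAX_EVENTS, knownOutputs, the umbilical and the attempt id are the reducer's local state:

  // Minimal sketch (not the patch): one round of the reducer-side fetch loop.
  int fetchOnce(TaskUmbilicalProtocol umbilical, JobID jobId,
                TaskAttemptID reduceAttemptId, int fromEventId,
                List<TaskCompletionEvent> knownOutputs) throws IOException {
    MapTaskCompletionEventsUpdate update =
        umbilical.getMapCompletionEvents(jobId, fromEventId, MAX_EVENTS,
                                         reduceAttemptId);
    if (update.shouldReset()) {
      // The tasktracker rolled back its event list after a JobTracker restart:
      // discard everything fetched so far and start again from index 0.
      knownOutputs.clear();
      fromEventId = 0;
    }
    TaskCompletionEvent[] events = update.getMapTaskCompletionEvents();
    knownOutputs.addAll(Arrays.asList(events));
    return fromEventId + events.length;     // next index to ask for
  }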

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Sep 16 05:05:18 2008
@@ -46,6 +46,9 @@
    * Version 10 changed the TaskStatus format and added reportNextRecordRange
    *            for HADOOP-153
    * Version 11 Adds RPCs for task commit as part of HADOOP-3150
+   * Version 12 getMapCompletionEvents() now also indicates if the events are 
+   *            stale or not. Hence the return type is a class that 
+   *            encapsulates the events and whether to reset the events index.
    * */
 
-  public static final long versionID = 11L;
+  public static final long versionID = 12L;
@@ -117,14 +120,22 @@
   void fsError(TaskAttemptID taskId, String message) throws IOException;
 
   /** Called by a reduce task to get the map output locations for finished maps.
+   * Returns an update containing the map-task completion events.
+   * The update also carries a flag telling whether the tasktracker's copy of 
+   * the events has been reset; in that case the calling child process should 
+   * discard the events it has fetched so far and fetch them again.
    *
    * @param taskId the reduce task id
    * @param fromIndex the index starting from which the locations should be 
    * fetched
    * @param maxLocs the max number of locations to fetch
-   * @return an array of TaskCompletionEvent
+   * @param id The attempt id of the task that is trying to communicate
+   * @return A {@link MapTaskCompletionEventsUpdate} 
    */
-  TaskCompletionEvent[] getMapCompletionEvents(JobID jobId, 
-                                               int fromIndex, int maxLocs) throws IOException;
+  MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
+                                                       int fromIndex, 
+                                                       int maxLocs,
+                                                       TaskAttemptID id) 
+  throws IOException;
 
 }
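
MapTaskCompletionEventsUpdate itself is added in another part of this commit. Judging only from the javadoc above, it pairs the completion events with a reset flag; a hedged sketch of that shape (the class name here is deliberately different, and the field and accessor names are assumptions):

  // Sketch of the kind of Writable wrapper the new return type implies.
  class MapTaskCompletionEventsUpdateSketch implements Writable {
    private boolean reset;                       // tell the reducer to restart its index
    private TaskCompletionEvent[] events;

    public boolean shouldReset() { return reset; }
    public TaskCompletionEvent[] getMapTaskCompletionEvents() { return events; }

    public void write(DataOutput out) throws IOException {
      out.writeBoolean(reset);
      out.writeInt(events.length);
      for (TaskCompletionEvent e : events) {
        e.write(out);
      }
    }

    public void readFields(DataInput in) throws IOException {
      reset = in.readBoolean();
      events = new TaskCompletionEvent[in.readInt()];
      for (int i = 0; i < events.length; i++) {
        events[i] = new TaskCompletionEvent();
        events[i].readFields(in);
      }
    }
  }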

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Sep 16 05:05:18 2008
@@ -52,6 +52,7 @@
     
   private String namenode;
   private UnixUserGroupInformation ugi = null;
+  private JobConf conf;
     
   private JobConf job;
   
@@ -83,6 +84,10 @@
       return tracker;
     }
     
+    public TaskAttemptID getTaskAttemptId(TaskID taskId, int attemptId) {
+      return new TaskAttemptID(taskId, attemptId, tracker.getStartTime());
+    }
+
     /**
      * Create the job tracker and run it.
      */
@@ -296,6 +301,10 @@
     return createJobConf(new JobConf());
   }
 
+  public TaskAttemptID getTaskAttemptId(TaskID taskId, int attemptId) {
+    return this.jobTracker.getTaskAttemptId(taskId, attemptId);
+  }
+
   public JobConf createJobConf(JobConf conf) {
     if(conf == null) {
       conf = new JobConf();
@@ -430,23 +439,10 @@
     this.numTaskTrackers = numTaskTrackers;
     this.namenode = namenode;
     this.ugi = ugi;
-    
-    // Create the JobTracker
-    jobTracker = new JobTrackerRunner(conf);
-    jobTrackerThread = new Thread(jobTracker);
-        
-    jobTrackerThread.start();
-    while (!jobTracker.isUp()) {
-      try {                                     // let daemons get started
-        LOG.info("Waiting for JobTracker to start...");
-        Thread.sleep(1000);
-      } catch(InterruptedException e) {
-      }
-    }
-        
-    // Set the configuration for the task-trackers
-    this.jobTrackerPort = jobTracker.getJobTrackerPort();
-    this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
+    this.conf = conf; // this is the conf the mr starts with
+
+    // start the jobtracker
+    startJobTracker();
 
     // Create the TaskTrackers
     for (int idx = 0; idx < numTaskTrackers; idx++) {
@@ -476,6 +472,126 @@
   }
     
   /**
+   * Get the map task completion events
+   */
+  public TaskCompletionEvent[] getMapTaskCompletionEvents(JobID id, int from, 
+                                                          int max) 
+  throws IOException {
+    return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max);
+  }
+
+  /**
+   * Change the job's priority
+   */
+  public void setJobPriority(JobID jobId, JobPriority priority) {
+    jobTracker.getJobTracker().setJobPriority(jobId, priority);
+  }
+
+  /**
+   * Get the job's priority
+   */
+  public JobPriority getJobPriority(JobID jobId) {
+    return jobTracker.getJobTracker().getJob(jobId).getPriority();
+  }
+
+  /**
+   * Get the job finish time
+   */
+  public long getJobFinishTime(JobID jobId) {
+    return jobTracker.getJobTracker().getJob(jobId).getFinishTime();
+  }
+
+  /**
+   * Init the job
+   */
+  public void initializeJob(JobID jobId) throws IOException {
+    JobInProgress job = jobTracker.getJobTracker().getJob(jobId);
+    job.initTasks();
+  }
+  
+  /**
+   * Get the events list at the tasktracker
+   */
+  public MapTaskCompletionEventsUpdate 
+         getMapTaskCompletionEventsUpdates(int index, JobID jobId, int max) 
+  throws IOException {
+    String jtId = jobTracker.getJobTracker().getTrackerIdentifier();
+    long jtStart = jobTracker.getJobTracker().getStartTime();
+    TaskAttemptID dummy = 
+      new TaskAttemptID(jtId, jobId.getId(), false, 0, 0, jtStart);
+    return taskTrackerList.get(index).getTaskTracker()
+                                     .getMapCompletionEvents(jobId, 0, max, 
+                                                             dummy);
+  }
+  
+  /**
+   * Get jobtracker conf
+   */
+  public JobConf getJobTrackerConf() {
+    return this.conf;
+  }
+  
+  /**
+   * Get num events recovered
+   */
+  public int getNumEventsRecovered() {
+    return jobTracker.getJobTracker().recoveryManager.totalEventsRecovered();
+  }
+  
+  /**
+   * Start the jobtracker.
+   */
+  public void startJobTracker() {
+    //  Create the JobTracker
+    jobTracker = new JobTrackerRunner(conf);
+    jobTrackerThread = new Thread(jobTracker);
+        
+    jobTrackerThread.start();
+    while (!jobTracker.isUp()) {
+      try {                                     // let daemons get started
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
+      }
+    }
+        
+    // Set the configuration for the task-trackers
+    this.jobTrackerPort = jobTracker.getJobTrackerPort();
+    this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
+  }
+
+  /**
+   * Kill the jobtracker.
+   */
+  public void stopJobTracker() {
+    //jobTracker.exit(-1);
+    jobTracker.shutdown();
+
+    jobTrackerThread.interrupt();
+    try {
+      jobTrackerThread.join();
+    } catch (InterruptedException ex) {
+      LOG.error("Problem waiting for job tracker to finish", ex);
+    }
+  }
+
+  /**
+   * Kill the tasktracker.
+   */
+  public void stopTaskTracker(int id) {
+    taskTrackerList.get(id).shutdown();
+
+    taskTrackerThreadList.get(id).interrupt();
+    
+    try {
+      taskTrackerThreadList.get(id).join();
+      // This will break the wait until idle loop
+      taskTrackerList.get(id).isDead = true;
+    } catch (InterruptedException ex) {
+      LOG.error("Problem waiting for task tracker to finish", ex);
+    }
+  }
+  
+  /**
    * Shut down the servers.
    */
   public void shutdown() {
@@ -492,13 +608,7 @@
           LOG.error("Problem shutting down task tracker", ex);
         }
       }
-      jobTracker.shutdown();
-      jobTrackerThread.interrupt();
-      try {
-        jobTrackerThread.join();
-      } catch (InterruptedException ex) {
-        LOG.error("Problem waiting for job tracker to finish", ex);
-      }
+      stopJobTracker();
     } finally {
       File configDir = new File("build", "minimr");
       File siteFile = new File(configDir, "hadoop-site.xml");

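With startJobTracker(), stopJobTracker() and stopTaskTracker() factored out above, a test can kill and restart daemons mid-job. A hedged sketch of that pattern; job submission and the final assertions are elided, and the constructor arguments are only an example:

  MiniMRCluster mr = new MiniMRCluster(2, "file:///", 3);
  try {
    JobConf jobConf = mr.createJobConf();
    // ... submit a long-running job against jobConf ...
    mr.stopJobTracker();      // simulate a JobTracker crash
    mr.startJobTracker();     // restart; recovery replays the job history
    // ... wait for the job to finish and check mr.getNumEventsRecovered() ...
  } finally {
    mr.shutdown();
  }
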
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java?rev=695823&r1=695822&r2=695823&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java Tue Sep 16 05:05:18 2008
@@ -21,9 +21,6 @@
 import java.io.IOException;
 import java.text.ParseException;
 
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
-
 /**
  * TestCounters checks the sanity and recoverability of {@code Counters}
  */
@@ -65,54 +62,10 @@
       Counters.fromEscapedCompactString(compactEscapedString);
     // Check for recovery from string
     assertTrue("Recovered counter does not match on content", 
-               compareCounters(counter, recoveredCounter));
+               counter.contentEquals(recoveredCounter));
     
   }
   
-  // Checks for (content) equality of two Counter
-  private boolean compareCounter(Counter c1, Counter c2) {
-    return c1.getName().equals(c2.getName())
-           && c1.getDisplayName().equals(c2.getDisplayName())
-           && c1.getCounter() == c2.getCounter();
-  }
-  
-  // Checks for (content) equality of Groups
-  private boolean compareGroup(Group g1, Group g2) {
-    boolean isEqual = false;
-    if (g1 != null && g2 != null) {
-      if (g1.size() == g2.size()) {
-        isEqual = true;
-        for (String cname : g1.getCounterNames()) {
-          Counter c1 = g1.getCounterForName(cname);
-          Counter c2 = g2.getCounterForName(cname);
-          if (!compareCounter(c1, c2)) {
-            isEqual = false;
-            break;
-          }
-        }
-      }
-    }
-    return isEqual;
-  }
-  
-  // Checks for (content) equality of Counters
-  private boolean compareCounters(Counters c1, Counters c2) {
-    boolean isEqual = false;
-    if (c1 != null && c2 != null) {
-      if (c1.size() == c2.size()) {
-        isEqual = true;
-        for (Group g1 : c1) {
-          Group g2 = c2.getGroup(g1.getName());
-          if (!compareGroup(g1, g2)) {
-            isEqual = false;
-            break;
-          }
-        }
-      }
-    }
-    return isEqual;
-  }
-  
   public void testCounters() throws IOException {
     Enum[] keysWithResource = {Task.FileSystemCounter.HDFS_READ, 
                                Task.Counter.MAP_INPUT_BYTES, 

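The hand-rolled comparison helpers are dropped above in favour of Counters.contentEquals(). The round-trip pattern the test exercises, sketched end to end; makeEscapedCompactString() is assumed to be the producer of the compact string that fromEscapedCompactString() parses:

  public void testCompactStringRoundTrip() throws IOException, ParseException {
    Counters original = new Counters();
    original.incrCounter(Task.Counter.MAP_INPUT_BYTES, 5);
    String compact = original.makeEscapedCompactString();
    Counters recovered = Counters.fromEscapedCompactString(compact);
    assertTrue("Recovered counter does not match on content",
               original.contentEquals(recovered));
  }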

