hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r748045 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobStatus.java src/mapred/org/apache/hadoop/mapred/JobTracker.java
Date Thu, 26 Feb 2009 07:17:25 GMT
Author: ddas
Date: Thu Feb 26 07:17:24 2009
New Revision: 748045

URL: http://svn.apache.org/viewvc?rev=748045&view=rev
Log:
HADOOP-5247. Committing this to the 0.19 branch.

Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=748045&r1=748044&r2=748045&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Feb 26 07:17:24 2009
@@ -21,6 +21,11 @@
     (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task (HADOOP-5235).
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when
+    a job finishes. This fixes a bunch of problems to do with NPE when a completed
+    job is not in memory and a tasktracker comes to the jobtracker with a status
+    report of a task belonging to that job. (Amar Kamat via ddas)
+
 Release 0.19.1 - 2009-02-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=748045&r1=748044&r2=748045&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 26 07:17:24 2009
@@ -2480,9 +2480,6 @@
   }
 
   boolean isComplete() {
-    int runState = this.status.getRunState();
-    return runState == JobStatus.SUCCEEDED 
-           || runState == JobStatus.FAILED 
-           || runState == JobStatus.KILLED;
+    return status.isJobComplete();
   }
 }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=748045&r1=748044&r2=748045&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java Thu Feb 26 07:17:24 2009
@@ -270,6 +270,14 @@
      priority = jp;
    }
   
+   /**
+    * Returns true if the status is for a completed job.
+    */
+   public synchronized boolean isJobComplete() {
+     return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED 
+             || runState == JobStatus.KILLED);
+   }
+
   ///////////////////////////////////////
   // Writable
   ///////////////////////////////////////

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=748045&r1=748044&r2=748045&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Feb 26 07:17:24 2009
@@ -960,6 +960,10 @@
   TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
     new TreeMap<String, ArrayList<JobInProgress>>();
     
+  // (trackerID --> list of jobs to cleanup)
+  Map<String, Set<JobID>> trackerToJobsToCleanup = 
+    new HashMap<String, Set<JobID>>();
+  
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -1552,7 +1556,10 @@
     }
 
     long now = System.currentTimeMillis();
-    
+
+    // mark the job for cleanup at all the trackers
+    addJobForCleanup(id);
+
     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
     // in memory; information about the purged jobs is available via
     // JobHistory.
@@ -1919,6 +1926,12 @@
       actions.addAll(killTasksList);
     }
      
+    // Check for jobs to be killed/cleanedup
+    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
+    if (killJobsList != null) {
+      actions.addAll(killJobsList);
+    }
+
     // Check for tasks whose outputs can be saved
     List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
     if (commitTasksList != null) {
@@ -2085,27 +2098,58 @@
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
     if (taskIds != null) {
       List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
-      Set<JobID> killJobIds = new TreeSet<JobID>(); 
       for (TaskAttemptID killTaskId : taskIds) {
         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
+        if (tip == null) {
+          continue;
+        }
         if (tip.shouldClose(killTaskId)) {
           // 
           // This is how the JobTracker ends a task at the TaskTracker.
           // It may be successfully completed, or may be killed in
           // mid-execution.
           //
-          if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING ||
-              tip.getJob().getStatus().getRunState() == JobStatus.PREP) {
+          if (!tip.getJob().isComplete()) {
             killList.add(new KillTaskAction(killTaskId));
             LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
-          } else {
-            JobID killJobId = tip.getJob().getStatus().getJobID(); 
-            killJobIds.add(killJobId);
           }
         }
       }
             
-      for (JobID killJobId : killJobIds) {
+      return killList;
+    }
+    return null;
+  }
+
+  /**
+   * Add a job to cleanup for the tracker.
+   */
+  private void addJobForCleanup(JobID id) {
+    for (String taskTracker : taskTrackers.keySet()) {
+      LOG.debug("Marking job " + id + " for cleanup by tracker " + taskTracker);
+      synchronized (trackerToJobsToCleanup) {
+        Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker);
+        if (jobsToKill == null) {
+          jobsToKill = new HashSet<JobID>();
+          trackerToJobsToCleanup.put(taskTracker, jobsToKill);
+        }
+        jobsToKill.add(id);
+      }
+    }
+  }
+  
+  /**
+   * A tracker wants to know if any job needs cleanup because the job completed.
+   */
+  private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) {
+    Set<JobID> jobs = null;
+    synchronized (trackerToJobsToCleanup) {
+      jobs = trackerToJobsToCleanup.remove(taskTracker);
+    }
+    if (jobs != null) {
+      // prepare the actions list
+      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
+      for (JobID killJobId : jobs) {
         killList.add(new KillJobAction(killJobId));
         LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
       }
@@ -2127,6 +2171,9 @@
         if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
           TaskAttemptID taskId = taskStatus.getTaskID();
           TaskInProgress tip = taskidToTIPMap.get(taskId);
+          if (tip == null) {
+            continue;
+          }
           if (tip.shouldCommit(taskId)) {
             saveList.add(new CommitTaskAction(taskId));
             LOG.debug(tts.getTrackerName() + 
@@ -2654,16 +2701,23 @@
     for (TaskStatus report : status.getTaskReports()) {
       report.setTaskTracker(trackerName);
       TaskAttemptID taskId = report.getTaskID();
+      
+      // expire it
+      expireLaunchingTasks.removeTask(taskId);
+      
+      JobInProgress job = getJob(taskId.getJobID());
+      if (job == null) {
+        continue;
+      }
+      
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
       if (tip != null || hasRestarted()) {
-        JobInProgress job = getJob(taskId.getJobID());
         if (tip == null) {
           tip = job.getTaskInProgress(taskId.getTaskID());
           job.addRunningTaskToTIP(tip, taskId, status, false);
         }
-        expireLaunchingTasks.removeTask(taskId);
         
         // Update the job and inform the listeners if necessary
         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
@@ -2711,6 +2765,12 @@
    */
   void lostTaskTracker(String trackerName) {
     LOG.info("Lost tracker '" + trackerName + "'");
+    
+    // remove the tracker from the local structures
+    synchronized (trackerToJobsToCleanup) {
+      trackerToJobsToCleanup.remove(trackerName);
+    }
+    
     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);
 



Mime
View raw message