hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r746234 - in /hadoop/core/branches/branch-0.20: ./ CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobStatus.java src/mapred/org/apache/hadoop/mapred/JobTracker.java
Date Fri, 20 Feb 2009 13:41:02 GMT
Author: ddas
Date: Fri Feb 20 13:41:01 2009
New Revision: 746234

URL: http://svn.apache.org/viewvc?rev=746234&view=rev
Log:
Merge -r 746232:746233 from trunk onto branch-0.20. Fixes HADOOP-5247.

Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 20 13:41:01 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=746234&r1=746233&r2=746234&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Feb 20 13:41:01 2009
@@ -625,6 +625,11 @@
     (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task (HADOOP-5235).
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when
+    a job finishes. This fixes a bunch of problems to do with NPE when a completed
+    job is not in memory and a tasktracker comes to the jobtracker with a status
+    report of a task belonging to that job. (Amar Kamat via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 20 13:41:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=746234&r1=746233&r2=746234&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Feb 20 13:41:01 2009
@@ -2609,9 +2609,6 @@
   }
 
   boolean isComplete() {
-    int runState = this.status.getRunState();
-    return runState == JobStatus.SUCCEEDED 
-           || runState == JobStatus.FAILED 
-           || runState == JobStatus.KILLED;
+    return status.isJobComplete();
   }
 }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=746234&r1=746233&r2=746234&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobStatus.java Fri Feb 20 13:41:01 2009
@@ -270,6 +270,14 @@
      priority = jp;
    }
   
+   /**
+    * Returns true if the status is for a completed job.
+    */
+   public synchronized boolean isJobComplete() {
+     return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED 
+             || runState == JobStatus.KILLED);
+   }
+
   ///////////////////////////////////////
   // Writable
   ///////////////////////////////////////

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=746234&r1=746233&r2=746234&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Feb 20 13:41:01 2009
@@ -1219,6 +1219,10 @@
   TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
     new TreeMap<String, ArrayList<JobInProgress>>();
     
+  // (trackerID --> list of jobs to cleanup)
+  Map<String, Set<JobID>> trackerToJobsToCleanup = 
+    new HashMap<String, Set<JobID>>();
+  
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -1835,6 +1839,9 @@
 
     long now = System.currentTimeMillis();
     
+    // mark the job for cleanup at all the trackers
+    addJobForCleanup(id);
+
     // add the blacklisted trackers to potentially faulty list
     if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
       if (job.getNoOfBlackListedTrackers() > 0) {
@@ -2320,6 +2327,12 @@
       actions.addAll(killTasksList);
     }
      
+    // Check for jobs to be killed/cleanedup
+    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
+    if (killJobsList != null) {
+      actions.addAll(killJobsList);
+    }
+
     // Check for tasks whose outputs can be saved
     List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
     if (commitTasksList != null) {
@@ -2496,27 +2509,58 @@
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
     if (taskIds != null) {
       List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
-      Set<JobID> killJobIds = new TreeSet<JobID>(); 
       for (TaskAttemptID killTaskId : taskIds) {
         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
+        if (tip == null) {
+          continue;
+        }
         if (tip.shouldClose(killTaskId)) {
           // 
           // This is how the JobTracker ends a task at the TaskTracker.
           // It may be successfully completed, or may be killed in
           // mid-execution.
           //
-          if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING ||
-              tip.getJob().getStatus().getRunState() == JobStatus.PREP) {
+          if (!tip.getJob().isComplete()) {
             killList.add(new KillTaskAction(killTaskId));
             LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
-          } else {
-            JobID killJobId = tip.getJob().getStatus().getJobID(); 
-            killJobIds.add(killJobId);
           }
         }
       }
             
-      for (JobID killJobId : killJobIds) {
+      return killList;
+    }
+    return null;
+  }
+
+  /**
+   * Add a job to cleanup for the tracker.
+   */
+  private void addJobForCleanup(JobID id) {
+    for (String taskTracker : taskTrackers.keySet()) {
+      LOG.debug("Marking job " + id + " for cleanup by tracker " + taskTracker);
+      synchronized (trackerToJobsToCleanup) {
+        Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker);
+        if (jobsToKill == null) {
+          jobsToKill = new HashSet<JobID>();
+          trackerToJobsToCleanup.put(taskTracker, jobsToKill);
+        }
+        jobsToKill.add(id);
+      }
+    }
+  }
+  
+  /**
+   * A tracker wants to know if any job needs cleanup because the job completed.
+   */
+  private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) {
+    Set<JobID> jobs = null;
+    synchronized (trackerToJobsToCleanup) {
+      jobs = trackerToJobsToCleanup.remove(taskTracker);
+    }
+    if (jobs != null) {
+      // prepare the actions list
+      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
+      for (JobID killJobId : jobs) {
         killList.add(new KillJobAction(killJobId));
         LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
       }
@@ -2538,6 +2582,9 @@
         if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
           TaskAttemptID taskId = taskStatus.getTaskID();
           TaskInProgress tip = taskidToTIPMap.get(taskId);
+          if (tip == null) {
+            continue;
+          }
           if (tip.shouldCommit(taskId)) {
             saveList.add(new CommitTaskAction(taskId));
             LOG.debug(tts.getTrackerName() + 
@@ -3091,16 +3138,23 @@
     for (TaskStatus report : status.getTaskReports()) {
       report.setTaskTracker(trackerName);
       TaskAttemptID taskId = report.getTaskID();
+      
+      // expire it
+      expireLaunchingTasks.removeTask(taskId);
+      
+      JobInProgress job = getJob(taskId.getJobID());
+      if (job == null) {
+        continue;
+      }
+      
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
       if (tip != null || hasRestarted()) {
-        JobInProgress job = getJob(taskId.getJobID());
         if (tip == null) {
           tip = job.getTaskInProgress(taskId.getTaskID());
           job.addRunningTaskToTIP(tip, taskId, status, false);
         }
-        expireLaunchingTasks.removeTask(taskId);
         
         // Update the job and inform the listeners if necessary
         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
@@ -3148,6 +3202,12 @@
    */
   void lostTaskTracker(String trackerName) {
     LOG.info("Lost tracker '" + trackerName + "'");
+    
+    // remove the tracker from the local structures
+    synchronized (trackerToJobsToCleanup) {
+      trackerToJobsToCleanup.remove(trackerName);
+    }
+    
     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);
 



Mime
View raw message