hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r399833 - in /lucene/hadoop/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/
Date Thu, 04 May 2006 19:27:40 GMT
Author: cutting
Date: Thu May  4 12:27:36 2006
New Revision: 399833

URL: http://svn.apache.org/viewcvs?rev=399833&view=rev
Log:
HADOOP-182.  Fix problems related to lost task trackers.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May  4 12:27:36 2006
@@ -184,6 +184,9 @@
     every block in every file in the filesystem.  (Konstantin Shvachko
     via cutting)
 
+48. HADOOP-182.  Fix so that lost task trackers do not change the
+    status of reduce tasks or completed jobs.  Also fixes the progress
+    meter so that failed tasks are subtracted. (omalley via cutting)
 
 Release 0.1.1 - 2006-04-08
 

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Thu May  4 12:27:36 2006
@@ -16,12 +16,6 @@
 
 package org.apache.hadoop.examples;
 import org.apache.hadoop.util.ProgramDriver;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 
 public class ExampleDriver {
   

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Thu May  4 12:27:36 2006
@@ -19,7 +19,6 @@
 import java.io.*;
 import java.util.*;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.UTF8;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSck.java Thu May  4 12:27:36 2006
@@ -333,9 +333,9 @@
       e.printStackTrace();
       success = false;
     } finally {
-      try {in.close(); } catch (Exception e1) {};
-      try {out.close(); } catch (Exception e1) {};
-      try {s.close(); } catch (Exception e1) {};
+      try {in.close(); } catch (Exception e1) {}
+      try {out.close(); } catch (Exception e1) {}
+      try {s.close(); } catch (Exception e1) {}
     }
     if (!success)
       throw new Exception("Could not copy block data for " + lblock.getBlock().getBlockName());

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Thu May  4 12:27:36 2006
@@ -18,7 +18,6 @@
 
 import java.io.*;
 import java.net.*;
-import java.util.*;
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu May  4 12:27:36 2006
@@ -95,7 +95,7 @@
      * Construct the splits, etc.  This is invoked from an async
      * thread so that split-computation doesn't block anyone.
      */
-    public void initTasks() throws IOException {
+    public synchronized void initTasks() throws IOException {
         if (tasksInited) {
             return;
         }
@@ -243,9 +243,12 @@
     ////////////////////////////////////////////////////
     // Status update methods
     ////////////////////////////////////////////////////
-    public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
+    public synchronized void updateTaskStatus(TaskInProgress tip, 
+                                              TaskStatus status) {
         double oldProgress = tip.getProgress();   // save old progress
         tip.updateStatus(status);                 // update tip
+        LOG.fine("Taking progress for " + tip.getTIPId() + " from " + 
+                 oldProgress + " to " + tip.getProgress());
 
         //
         // Update JobInProgress status
@@ -416,7 +419,10 @@
     /**
      * A taskid assigned to this JobInProgress has reported in successfully.
      */
-    public synchronized void completedTask(TaskInProgress tip, String taskid) {
+    public synchronized void completedTask(TaskInProgress tip, 
+                                           TaskStatus status) {
+        String taskid = status.getTaskId();
+        updateTaskStatus(tip, status);
         LOG.info("Taskid '" + taskid + "' has finished successfully.");
         tip.completed(taskid);
 
@@ -443,7 +449,8 @@
         // If all tasks are complete, then the job is done!
         //
         if (status.getRunState() == JobStatus.RUNNING && allDone) {
-            this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
+            this.status = new JobStatus(this.status.getJobId(), 1.0f, 1.0f, 
+                                        JobStatus.SUCCEEDED);
             this.finishTime = System.currentTimeMillis();
             garbageCollect();
         }
@@ -483,8 +490,10 @@
      * we need to schedule reexecution so that downstream reduce tasks can 
      * obtain the map task's output.
      */
-    public void failedTask(TaskInProgress tip, String taskid, String trackerName) {
+    public synchronized void failedTask(TaskInProgress tip, String taskid, 
+                                        TaskStatus status, String trackerName) {
         tip.failedSubTask(taskid, trackerName);
+        updateTaskStatus(tip, status);
         
         // After this, try to assign tasks with the one after this, so that
         // the failed task goes to the end of the list.
@@ -501,8 +510,31 @@
             LOG.info("Aborting job " + profile.getJobId());
             kill();
         }
-    }
 
+        jobtracker.removeTaskEntry(taskid);
+ }
+
+    /**
+     * Fail a task with a given reason, but without a status object.
+     * @author Owen O'Malley
+     * @param tip The task's tip
+     * @param taskid The task id
+     * @param reason The reason that the task failed
+     * @param trackerName The task tracker the task failed on
+     */
+    public void failedTask(TaskInProgress tip, String taskid, 
+                           String reason, String hostname, String trackerName) {
+       TaskStatus status = new TaskStatus(taskid,
+                                          tip.isMapTask(),
+                                          0.0f,
+                                          TaskStatus.FAILED,
+                                          reason,
+                                          reason,
+                                          hostname);
+       failedTask(tip, taskid, status, trackerName);
+    }
+       
+                           
     /**
      * The job is dead.  We're now GC'ing it, getting rid of the job
      * from all tables.  Be sure to remove all of this job's tasks

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu May  4 12:27:36 2006
@@ -95,8 +95,6 @@
        * map: task-id (String) -> time-assigned (Long)
        */
       private Map launchingTasks = new LinkedHashMap();
-      private static final String errorMsg = "Error launching task";
-      private static final String errorHost = "n/a";
       
       public void run() {
         try {
@@ -119,21 +117,9 @@
                     tip = (TaskInProgress) taskidToTIPMap.get(taskId);
                   }
                   if (tip != null) {
-                    synchronized (tip) {
-                      JobInProgress job = tip.getJob();
-                      // record why the job failed, so that the user can
-                      // see the problem
-                      TaskStatus status = 
-                        new TaskStatus(taskId,
-                                       tip.isMapTask(),
-                                       0.0f,
-                                       TaskStatus.FAILED,
-                                       errorMsg,
-                                       errorMsg,
-                                       errorHost);
-                      tip.updateStatus(status);
-                      job.failedTask(tip, taskId, errorHost);
-                    }
+                     JobInProgress job = tip.getJob();
+                     job.failedTask(tip, taskId, "Error launching task", 
+                                    "n/a", "n/a");
                   }
                   itr.remove();
                 } else {
@@ -214,7 +200,8 @@
                                 if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)
{
                                     // Remove completely
                                     updateTaskTrackerStatus(trackerName, null);
-                                    lostTaskTracker(leastRecent.getTrackerName());
+                                    lostTaskTracker(leastRecent.getTrackerName(),
+                                                    leastRecent.getHost());
                                 } else {
                                     // Update time by inserting latest profile
                                     trackerExpiryQueue.add(newProfile);
@@ -582,14 +569,6 @@
     ////////////////////////////////////////////////////
     // InterTrackerProtocol
     ////////////////////////////////////////////////////
-    public void initialize(String taskTrackerName) {
-      synchronized (taskTrackers) {
-        boolean seenBefore = updateTaskTrackerStatus(taskTrackerName, null);
-        if (seenBefore) {
-          lostTaskTracker(taskTrackerName);
-        }
-      }
-    }
 
     /**
      * Update the last recorded status for the given task tracker.
@@ -632,7 +611,7 @@
                 if (initialContact) {
                     // If it's first contact, then clear out any state hanging around
                     if (seenBefore) {
-                        lostTaskTracker(trackerName);
+                        lostTaskTracker(trackerName, trackerStatus.getHost());
                     }
                 } else {
                     // If not first contact, there should be some record of the tracker
@@ -981,13 +960,13 @@
             } else {
                 expireLaunchingTasks.removeTask(taskId);
                 JobInProgress job = tip.getJob();
-                job.updateTaskStatus(tip, report);
 
                 if (report.getRunState() == TaskStatus.SUCCEEDED) {
-                    job.completedTask(tip, report.getTaskId());
+                    job.completedTask(tip, report);
                 } else if (report.getRunState() == TaskStatus.FAILED) {
                     // Tell the job to fail the relevant task
-                    job.failedTask(tip, report.getTaskId(), status.getTrackerName());
+                    job.failedTask(tip, report.getTaskId(), report, 
+                                   status.getTrackerName());
                 }
             }
         }
@@ -998,7 +977,7 @@
      * already been updated.  Just process the contained tasks and any
      * jobs that might be affected.
      */
-    void lostTaskTracker(String trackerName) {
+    void lostTaskTracker(String trackerName, String hostname) {
         LOG.info("Lost tracker '" + trackerName + "'");
         TreeSet lostTasks = (TreeSet) trackerToTaskMap.get(trackerName);
         trackerToTaskMap.remove(trackerName);
@@ -1008,9 +987,16 @@
                 String taskId = (String) it.next();
                 TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
 
-                // Tell the job to fail the relevant task
-                JobInProgress job = tip.getJob();
-                job.failedTask(tip, taskId, trackerName);
+                // Completed reduce tasks never need to be failed, because 
+                // their outputs go to dfs
+                if (tip.isMapTask() || !tip.isComplete()) {
+                  JobInProgress job = tip.getJob();
+                  // if the job is done, we don't want to change anything
+                  if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+                    job.failedTask(tip, taskId, "Lost task tracker", 
+                                   hostname, trackerName);
+                  }
+                }
             }
         }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu May  4 12:27:36 2006
@@ -15,11 +15,8 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.LogFormatter;
 
-import java.io.*;
 import java.text.NumberFormat;
 import java.util.*;
 import java.util.logging.*;
@@ -304,11 +301,6 @@
             kill();
         }
         machinesWhereFailed.add(trackerName);
-
-        // Ask JobTracker to forget about this task
-        jobtracker.removeTaskEntry(taskid);
-
-        recomputeProgress();
     }
 
     /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=399833&r1=399832&r2=399833&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu May  4 12:27:36 2006
@@ -575,6 +575,7 @@
                     failures += 1;
                   }
                   runstate = TaskStatus.FAILED;
+                  progress = 0.0f;
               }
               
               needCleanup = runstate == TaskStatus.FAILED;
@@ -627,6 +628,7 @@
             if (runstate == TaskStatus.SUCCEEDED) {
               LOG.info("Reporting output lost:"+task.getTaskId());
               runstate = TaskStatus.FAILED;       // change status to failure
+              progress = 0.0f;
               runningTasks.put(task.getTaskId(), this);
               mapTotal++;
             } else {



Mime
View raw message