hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r395069 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/TaskInProgress.java
Date Tue, 18 Apr 2006 22:07:58 GMT
Author: cutting
Date: Tue Apr 18 15:07:57 2006
New Revision: 395069

URL: http://svn.apache.org/viewcvs?rev=395069&view=rev
Log:
Fix for HADOOP-142.  Avoid re-running a task on a node where it has previously failed.  Contributed
by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=395069&r1=395068&r2=395069&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Apr 18 15:07:57 2006
@@ -54,9 +54,12 @@
 
 15. Fix HADOOP-115.  Correct an error message.  (Stack via cutting)
 
-16. "Fix HADOOP-133.  Retry pings from child to parent, in case of
+16. Fix HADOOP-133.  Retry pings from child to parent, in case of
     (local) communcation problems.  Also log exit status, so that one
     can distinguish patricide from other deaths.  (omalley via cutting)
+
+17. Fix HADOOP-142.  Avoid re-running a task on a host where it has
+    previously failed.  (omalley via cutting)
 
 
 Release 0.1.1 - 2006-04-08

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=395069&r1=395068&r2=395069&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Apr 18 15:07:57 2006
@@ -50,6 +50,8 @@
     long finishTime;
 
     private JobConf conf;
+    private int firstMapToTry = 0;
+    private int firstReduceToTry = 0;
     boolean tasksInited = false;
 
     private LocalFileSystem localFs;
@@ -139,7 +141,8 @@
         // create a map task for each split
         this.maps = new TaskInProgress[numMapTasks];
         for (int i = 0; i < numMapTasks; i++) {
-            maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, this);
+            maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, 
+                                         this, i);
         }
 
         //
@@ -278,6 +281,7 @@
         int cacheTarget = -1;
         int stdTarget = -1;
         int specTarget = -1;
+        int failedTarget = -1;
 
         //
         // We end up creating two tasks for the same bucket, because
@@ -296,10 +300,17 @@
         // doesn't have to be transmitted from another node.
         //
         for (int i = 0; i < maps.length; i++) {
-            if (maps[i].hasTaskWithCacheHit(taskTracker, tts)) {
+            int realIdx = (i + firstMapToTry) % maps.length; 
+            if (maps[realIdx].hasTaskWithCacheHit(taskTracker, tts)) {
                 if (cacheTarget < 0) {
-                    cacheTarget = i;
+                  if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
+                    if (failedTarget < 0) {
+                      failedTarget = realIdx;
+                    }
+                  } else {
+                    cacheTarget = realIdx;
                     break;
+                  }
                 }
             }
         }
@@ -310,10 +321,17 @@
         //
         if (cacheTarget < 0) {
             for (int i = 0; i < maps.length; i++) {
-                if (maps[i].hasTask()) {
+                int realIdx = (i + firstMapToTry) % maps.length; 
+                if (maps[realIdx].hasTask()) {
                     if (stdTarget < 0) {
-                        stdTarget = i;
+                      if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
+                        if (failedTarget < 0) {
+                          failedTarget = realIdx;
+                        }
+                      } else {
+                        stdTarget = realIdx;
                         break;
+                      }
                     }
                 }
             }
@@ -325,11 +343,12 @@
         //
         if (cacheTarget < 0 && stdTarget < 0) {
             for (int i = 0; i < maps.length; i++) {        
-                if (maps[i].hasSpeculativeTask(avgProgress)) {
-                    if (specTarget < 0) {
-                        specTarget = i;
+                int realIdx = (i + firstMapToTry) % maps.length; 
+                if (maps[realIdx].hasSpeculativeTask(avgProgress)) {
+                      if (!maps[realIdx].hasFailedOnMachine(taskTracker)) {
+                        specTarget = realIdx;
                         break;
-                    }
+                      }
                 }
             }
         }
@@ -343,6 +362,8 @@
             t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
         } else if (specTarget >= 0) {
             t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
+        } else if (failedTarget >= 0) {
+            t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress);
         }
         return t;
     }
@@ -361,16 +382,23 @@
         Task t = null;
         int stdTarget = -1;
         int specTarget = -1;
+        int failedTarget = -1;
         double avgProgress = status.reduceProgress() / reduces.length;
 
         for (int i = 0; i < reduces.length; i++) {
-            if (reduces[i].hasTask()) {
-                if (stdTarget < 0) {
-                    stdTarget = i;
+            int realIdx = (i + firstReduceToTry) % reduces.length;
+            if (reduces[realIdx].hasTask()) {
+                if (reduces[realIdx].hasFailedOnMachine(taskTracker)) {
+                  if (failedTarget < 0) {
+                    failedTarget = realIdx;
+                  }
+                } else if (stdTarget < 0) {
+                    stdTarget = realIdx;
                 }
-            } else if (reduces[i].hasSpeculativeTask(avgProgress)) {
-                if (specTarget < 0) {
-                    specTarget = i;
+            } else if (reduces[realIdx].hasSpeculativeTask(avgProgress)) {
+                if (specTarget < 0 &&
+                    !reduces[realIdx].hasFailedOnMachine(taskTracker)) {
+                    specTarget = realIdx;
                 }
             }
         }
@@ -379,6 +407,9 @@
             t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
         } else if (specTarget >= 0) {
             t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
+        } else if (failedTarget >= 0) {
+            t = reduces[failedTarget].getTaskToRun(taskTracker, tts, 
+                                                   avgProgress);
         }
         return t;
     }
@@ -455,6 +486,14 @@
      */
     public void failedTask(TaskInProgress tip, String taskid, String trackerName) {
         tip.failedSubTask(taskid, trackerName);
+        
+        // After this, try to assign tasks with the one after this, so that
+        // the failed task goes to the end of the list.
+        if (tip.isMapTask()) {
+          firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
+        } else {
+          firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
+        }
             
         //
         // Check if we need to kill the job because of too many failures

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=395069&r1=395068&r2=395069&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Apr 18 15:07:57 2006
@@ -77,19 +77,24 @@
     /**
      * Constructor for MapTask
      */
-    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, FileSplit split, 
+                          JobTracker jobtracker, JobConf conf, 
+                          JobInProgress job, int partition) {
         this.jobFile = jobFile;
         this.split = split;
         this.jobtracker = jobtracker;
         this.job = job;
         this.conf = conf;
+        this.partition = partition;
         init();
     }
         
     /**
      * Constructor for ReduceTask
      */
-    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, TaskInProgress predecessors[], 
+                          int partition, JobTracker jobtracker, JobConf conf,
+                          JobInProgress job) {
         this.jobFile = jobFile;
         this.predecessors = predecessors;
         this.partition = partition;
@@ -455,5 +460,22 @@
             jobtracker.createTaskEntry(taskid, taskTracker, this);
         }
         return t;
+    }
+    
+    /**
+     * Has this task already failed on this machine?
+     * @param tracker The task tracker name
+     * @return Has it failed?
+     */
+    public boolean hasFailedOnMachine(String tracker) {
+      return machinesWhereFailed.contains(tracker);
+    }
+    
+    /**
+     * Get the id of this map or reduce task.
+     * @return The index of this tip in the maps/reduces lists.
+     */
+    public int getIdWithinJob() {
+      return partition;
     }
 }



Mime
View raw message