Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 40345 invoked from network); 18 Apr 2006 22:08:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 18 Apr 2006 22:08:20 -0000 Received: (qmail 71778 invoked by uid 500); 18 Apr 2006 22:08:20 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 71730 invoked by uid 500); 18 Apr 2006 22:08:20 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 71721 invoked by uid 99); 18 Apr 2006 22:08:20 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Apr 2006 15:08:20 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 18 Apr 2006 15:08:19 -0700 Received: (qmail 40166 invoked by uid 65534); 18 Apr 2006 22:07:59 -0000 Message-ID: <20060418220759.40163.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r395069 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/TaskInProgress.java Date: Tue, 18 Apr 2006 22:07:58 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: cutting Date: Tue Apr 18 15:07:57 2006 New Revision: 395069 URL: http://svn.apache.org/viewcvs?rev=395069&view=rev Log: Fix for HADOOP-142. Avoid re-running a task on a node where it has previously failed. Contributed by Owen. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=395069&r1=395068&r2=395069&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Apr 18 15:07:57 2006 @@ -54,9 +54,12 @@ 15. Fix HADOOP-115. Correct an error message. (Stack via cutting) -16. "Fix HADOOP-133. Retry pings from child to parent, in case of +16. Fix HADOOP-133. Retry pings from child to parent, in case of (local) communcation problems. Also log exit status, so that one can distinguish patricide from other deaths. (omalley via cutting) + +17. Fix HADOOP-142. Avoid re-running a task on a host where it has + previously failed. (omalley via cutting) Release 0.1.1 - 2006-04-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=395069&r1=395068&r2=395069&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Apr 18 15:07:57 2006 @@ -50,6 +50,8 @@ long finishTime; private JobConf conf; + private int firstMapToTry = 0; + private int firstReduceToTry = 0; boolean tasksInited = false; private LocalFileSystem localFs; @@ -139,7 +141,8 @@ // create a map task for each split this.maps = new TaskInProgress[numMapTasks]; for (int i = 0; i < numMapTasks; i++) { - maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, this); + maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, + this, i); } // @@ -278,6 +281,7 @@ int cacheTarget = -1; int stdTarget = -1; int specTarget = -1; + int failedTarget = -1; // // We end up creating two tasks for the same bucket, because @@ -296,10 +300,17 @@ // doesn't have to be transmitted from another node. // for (int i = 0; i < maps.length; i++) { - if (maps[i].hasTaskWithCacheHit(taskTracker, tts)) { + int realIdx = (i + firstMapToTry) % maps.length; + if (maps[realIdx].hasTaskWithCacheHit(taskTracker, tts)) { if (cacheTarget < 0) { - cacheTarget = i; + if (maps[realIdx].hasFailedOnMachine(taskTracker)) { + if (failedTarget < 0) { + failedTarget = realIdx; + } + } else { + cacheTarget = realIdx; break; + } } } } @@ -310,10 +321,17 @@ // if (cacheTarget < 0) { for (int i = 0; i < maps.length; i++) { - if (maps[i].hasTask()) { + int realIdx = (i + firstMapToTry) % maps.length; + if (maps[realIdx].hasTask()) { if (stdTarget < 0) { - stdTarget = i; + if (maps[realIdx].hasFailedOnMachine(taskTracker)) { + if (failedTarget < 0) { + failedTarget = realIdx; + } + } else { + stdTarget = realIdx; break; + } } } } @@ -325,11 +343,12 @@ // if (cacheTarget < 0 && stdTarget < 0) { for (int i = 0; i < maps.length; i++) { - if (maps[i].hasSpeculativeTask(avgProgress)) { - if (specTarget < 0) { - specTarget = i; + int realIdx = (i + firstMapToTry) % maps.length; + if (maps[realIdx].hasSpeculativeTask(avgProgress)) { + if (!maps[realIdx].hasFailedOnMachine(taskTracker)) { + specTarget = realIdx; break; - } + } } } } @@ -343,6 +362,8 @@ t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress); } else if (specTarget >= 0) { t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress); + } else if (failedTarget >= 0) { + t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress); } return t; } @@ -361,16 +382,23 @@ Task t = null; int stdTarget = -1; int specTarget = -1; + int failedTarget = -1; double avgProgress = status.reduceProgress() / reduces.length; for (int i = 0; i < reduces.length; i++) { - if (reduces[i].hasTask()) { - if (stdTarget < 0) { - stdTarget = i; + int realIdx = (i + firstReduceToTry) % reduces.length; + if (reduces[realIdx].hasTask()) { + if (reduces[realIdx].hasFailedOnMachine(taskTracker)) { + if (failedTarget < 0) { + failedTarget = realIdx; + } + } else if (stdTarget < 0) { + stdTarget = realIdx; } - } else if (reduces[i].hasSpeculativeTask(avgProgress)) { - if (specTarget < 0) { - specTarget = i; + } else if (reduces[realIdx].hasSpeculativeTask(avgProgress)) { + if (specTarget < 0 && + !reduces[realIdx].hasFailedOnMachine(taskTracker)) { + specTarget = realIdx; } } } @@ -379,6 +407,9 @@ t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress); } else if (specTarget >= 0) { t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress); + } else if (failedTarget >= 0) { + t = reduces[failedTarget].getTaskToRun(taskTracker, tts, + avgProgress); } return t; } @@ -455,6 +486,14 @@ */ public void failedTask(TaskInProgress tip, String taskid, String trackerName) { tip.failedSubTask(taskid, trackerName); + + // After this, try to assign tasks with the one after this, so that + // the failed task goes to the end of the list. + if (tip.isMapTask()) { + firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length; + } else { + firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length; + } // // Check if we need to kill the job because of too many failures Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=395069&r1=395068&r2=395069&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Apr 18 15:07:57 2006 @@ -77,19 +77,24 @@ /** * Constructor for MapTask */ - public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) { + public TaskInProgress(String jobFile, FileSplit split, + JobTracker jobtracker, JobConf conf, + JobInProgress job, int partition) { this.jobFile = jobFile; this.split = split; this.jobtracker = jobtracker; this.job = job; this.conf = conf; + this.partition = partition; init(); } /** * Constructor for ReduceTask */ - public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) { + public TaskInProgress(String jobFile, TaskInProgress predecessors[], + int partition, JobTracker jobtracker, JobConf conf, + JobInProgress job) { this.jobFile = jobFile; this.predecessors = predecessors; this.partition = partition; @@ -455,5 +460,22 @@ jobtracker.createTaskEntry(taskid, taskTracker, this); } return t; + } + + /** + * Has this task already failed on this machine? + * @param tracker The task tracker name + * @return Has it failed? + */ + public boolean hasFailedOnMachine(String tracker) { + return machinesWhereFailed.contains(tracker); + } + + /** + * Get the id of this map or reduce task. + * @return The index of this tip in the maps/reduces lists. + */ + public int getIdWithinJob() { + return partition; } }