From hadoop-commits-return-248-apmail-lucene-hadoop-commits-archive=lucene.apache.org@lucene.apache.org Tue May 02 22:05:33 2006 Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 36275 invoked from network); 2 May 2006 22:05:33 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 2 May 2006 22:05:33 -0000 Received: (qmail 72296 invoked by uid 500); 2 May 2006 22:05:32 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 72276 invoked by uid 500); 2 May 2006 22:05:31 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 72267 invoked by uid 99); 2 May 2006 22:05:31 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 May 2006 15:05:31 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 02 May 2006 15:05:30 -0700 Received: (qmail 36161 invoked by uid 65534); 2 May 2006 22:05:10 -0000 Message-ID: <20060502220510.36160.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r399065 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java Date: Tue, 02 May 2006 22:05:09 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: cutting Date: Tue May 2 15:05:08 2006 New Revision: 399065 URL: http://svn.apache.org/viewcvs?rev=399065&view=rev Log: HADOOP-185. Fix so that, if a task tracker times out making the RPC asking for a new task to run, the job tracker does not think that it is actually running the task returned (but never received). Contributed by Owen. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399065&r1=399064&r2=399065&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue May 2 15:05:08 2006 @@ -151,6 +151,10 @@ files containing random data. The second sorts the output of the first. (omalley via cutting) +40. HADOOP-185. Fix so that, when a task tracker times out making the + RPC asking for a new task to run, the job tracker does not think + that it is actually running the task returned. (omalley via cutting) + Release 0.1.1 - 2006-04-08 1. Added CHANGES.txt, logging all significant changes to Hadoop. (cutting) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=399065&r1=399064&r2=399065&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue May 2 15:05:08 2006 @@ -76,6 +76,93 @@ return tracker; } + /** + * A thread to timeout tasks that have been assigned to task trackers, + * but that haven't reported back yet. + * Note that I included a stop() method, even though there is no place + * where JobTrackers are cleaned up. + * @author Owen O'Malley + */ + private class ExpireLaunchingTasks implements Runnable { + private volatile boolean shouldRun = true; + /** + * This is a map of the tasks that have been assigned to task trackers, + * but that have not yet been seen in a status report. + * map: task-id (String) -> time-assigned (Long) + */ + private Map launchingTasks = new LinkedHashMap(); + private static final String errorMsg = "Error launching task"; + private static final String errorHost = "n/a"; + + public void run() { + try { + while (shouldRun) { + // Every 3 minutes check for any tasks that are overdue + Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3); + long now = System.currentTimeMillis(); + LOG.fine("Starting launching task sweep"); + synchronized (launchingTasks) { + Iterator itr = launchingTasks.entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry pair = (Map.Entry) itr.next(); + String taskId = (String) pair.getKey(); + long age = now - ((Long) pair.getValue()).longValue(); + LOG.fine(taskId + " is " + age + " ms old."); + if (age > TASKTRACKER_EXPIRY_INTERVAL) { + LOG.info("Launching task " + taskId + " timed out."); + TaskInProgress tip = null; + synchronized (JobTracker.this) { + tip = (TaskInProgress) taskidToTIPMap.get(taskId); + } + if (tip != null) { + synchronized (tip) { + JobInProgress job = tip.getJob(); + // record why the job failed, so that the user can + // see the problem + TaskStatus status = + new TaskStatus(taskId, + tip.isMapTask(), + 0.0f, + TaskStatus.FAILED, + errorMsg, + errorMsg, + errorHost); + tip.updateStatus(status); + job.failedTask(tip, taskId, errorHost); + } + } + itr.remove(); + } else { + // the tasks are sorted by start time, so once we find + // one that we want to keep, we are done for this cycle. + break; + } + } + } + } + } catch (InterruptedException ie) { + // all done + } + } + + public void addNewTask(String taskName) { + synchronized (launchingTasks) { + launchingTasks.put(taskName, + Long.valueOf(System.currentTimeMillis())); + } + } + + public void removeTask(String taskName) { + synchronized (launchingTasks) { + launchingTasks.remove(taskName); + } + } + + public void stop() { + shouldRun = false; + } + } + /////////////////////////////////////////////////////// // Used to expire TaskTrackers that have gone down /////////////////////////////////////////////////////// @@ -277,7 +364,9 @@ ExpireTrackers expireTrackers = new ExpireTrackers(); RetireJobs retireJobs = new RetireJobs(); JobInitThread initJobs = new JobInitThread(); - + ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); + Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks); + /** * It might seem like a bug to maintain a TreeSet of status objects, * which can be updated at any time. But that's not what happens! We @@ -346,12 +435,12 @@ this.port = addr.getPort(); this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf); this.interTrackerServer.start(); - Properties p = System.getProperties(); - for (Iterator it = p.keySet().iterator(); it.hasNext(); ) { - String key = (String) it.next(); - String val = (String) p.getProperty(key); - LOG.info("Property '" + key + "' is " + val); - } + Properties p = System.getProperties(); + for (Iterator it = p.keySet().iterator(); it.hasNext(); ) { + String key = (String) it.next(); + String val = (String) p.getProperty(key); + LOG.info("Property '" + key + "' is " + val); + } this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030); this.infoServer = new JobTrackerInfoServer(this, infoPort); @@ -362,6 +451,7 @@ new Thread(this.expireTrackers).start(); new Thread(this.retireJobs).start(); new Thread(this.initJobs).start(); + expireLaunchingTaskThread.start(); } public static InetSocketAddress getAddress(Configuration conf) { @@ -622,7 +712,8 @@ Task t = job.obtainNewMapTask(taskTracker, tts); if (t != null) { - return t; + expireLaunchingTasks.addNewTask(t.getTaskId()); + return t; } // @@ -656,7 +747,8 @@ Task t = job.obtainNewReduceTask(taskTracker, tts); if (t != null) { - return t; + expireLaunchingTasks.addNewTask(t.getTaskId()); + return t; } // @@ -878,10 +970,12 @@ for (Iterator it = status.taskReports(); it.hasNext(); ) { TaskStatus report = (TaskStatus) it.next(); report.setHostname(status.getHost()); - TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(report.getTaskId()); + String taskId = report.getTaskId(); + TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); if (tip == null) { LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskId()); } else { + expireLaunchingTasks.removeTask(taskId); JobInProgress job = tip.getJob(); job.updateTaskStatus(tip, report);