Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 12199 invoked from network); 12 Feb 2007 23:38:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Feb 2007 23:38:50 -0000 Received: (qmail 17970 invoked by uid 500); 12 Feb 2007 23:38:58 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 17950 invoked by uid 500); 12 Feb 2007 23:38:57 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 17940 invoked by uid 99); 12 Feb 2007 23:38:57 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Feb 2007 15:38:57 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Feb 2007 15:38:49 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 788081A981A; Mon, 12 Feb 2007 15:38:29 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r506745 - in /lucene/hadoop/trunk: CHANGES.txt src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java src/java/org/apache/hadoop/mapred/TaskTracker.java Date: Mon, 12 Feb 2007 23:38:29 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070212233829.788081A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Mon Feb 12 15:38:27 2007 New Revision: 506745 URL: http://svn.apache.org/viewvc?view=rev&rev=506745 Log: HADOOP-491. Change mapred.task.timeout to be per-job. Contributed by Arun. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=506745&r1=506744&r2=506745 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Feb 12 15:38:27 2007 @@ -35,6 +35,10 @@ 10. HADOOP-1007. Make names of metrics used in Hadoop unique. (Nigel Daley via cutting) +11. HADOOP-491. Change mapred.task.timeout to be per-job, and make a + value of zero mean no timeout. Also change contrib/streaming to + disable task timeouts. (Arun C Murthy via cutting) + Release 0.11.1 - 2007-02-09 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=506745&r1=506744&r2=506745 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Feb 12 15:38:27 2007 @@ -514,6 +514,9 @@ // general MapRed job properties jobConf_ = new JobConf(config_); + + // All streaming jobs have, by default, no time-out for tasks + jobConf_.setLong("mapred.task.timeout", 0); setUserJobConfProps(true); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=506745&r1=506744&r2=506745 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Feb 12 15:38:27 2007 @@ -53,7 +53,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, Runnable { static final long WAIT_FOR_DONE = 3 * 1000; - private long taskTimeout; private int httpPort; static enum State {NORMAL, STALE, INTERRUPTED} @@ -435,7 +434,6 @@ maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2); this.fConf = conf; this.jobTrackAddr = JobTracker.getAddress(conf); - this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000); this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); int httpPort = conf.getInt("tasktracker.http.port", 50060); @@ -643,20 +641,29 @@ */ private synchronized void markUnresponsiveTasks() throws IOException { long now = System.currentTimeMillis(); - for (TaskInProgress tip: runningTasks.values()) { - long timeSinceLastReport = now - tip.getLastProgressReport(); - if ((tip.getRunState() == TaskStatus.State.RUNNING) && - (timeSinceLastReport > this.taskTimeout) && - !tip.wasKilled) { - String msg = "Task failed to report status for " + - (timeSinceLastReport / 1000) + - " seconds. Killing."; - LOG.info(tip.getTask().getTaskId() + ": " + msg); - ReflectionUtils.logThreadInfo(LOG, "lost task", 30); - tip.reportDiagnosticInfo(msg); - purgeTask(tip); - } + for (TaskInProgress tip: runningTasks.values()) { + if (tip.getRunState() == TaskStatus.State.RUNNING) { + // Check the per-job timeout interval for tasks; + // an interval of '0' implies it is never timed-out + long jobTaskTimeout = tip.getTaskTimeout(); + if (jobTaskTimeout == 0) { + continue; + } + + // Check if the task has not reported progress for a + // time-period greater than the configured time-out + long timeSinceLastReport = now - tip.getLastProgressReport(); + if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) { + String msg = "Task failed to report status for " + + (timeSinceLastReport / 1000) + + " seconds. Killing."; + LOG.info(tip.getTask().getTaskId() + ": " + msg); + ReflectionUtils.logThreadInfo(LOG, "lost task", 30); + tip.reportDiagnosticInfo(msg); + purgeTask(tip); + } } + } } /** @@ -902,6 +909,7 @@ private boolean alwaysKeepTaskFiles; private TaskStatus taskStatus ; private boolean keepJobFiles; + private long taskTimeout; /** */ @@ -920,6 +928,7 @@ getName(), task.isMapTask()? TaskStatus.Phase.MAP: TaskStatus.Phase.SHUFFLE); keepJobFiles = false; + taskTimeout = (10 * 60 * 1000); } private void localizeTask(Task task) throws IOException{ @@ -964,6 +973,12 @@ public void setJobConf(JobConf lconf){ this.localJobConf = lconf; keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); + taskTimeout = localJobConf.getLong("mapred.task.timeout", + 10 * 60 * 1000); + } + + public JobConf getJobConf() { + return localJobConf; } /** @@ -1024,6 +1039,15 @@ return runstate; } + /** + * The task's configured timeout. + * + * @return the task's configured timeout. + */ + public long getTaskTimeout() { + return taskTimeout; + } + /** * The task has reported some diagnostic info about its status */