hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r506745 - in /lucene/hadoop/trunk: CHANGES.txt src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Mon, 12 Feb 2007 23:38:29 GMT
Author: cutting
Date: Mon Feb 12 15:38:27 2007
New Revision: 506745

URL: http://svn.apache.org/viewvc?view=rev&rev=506745
Log:
HADOOP-491.  Change mapred.task.timeout to be per-job.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=506745&r1=506744&r2=506745
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Feb 12 15:38:27 2007
@@ -35,6 +35,10 @@
 10. HADOOP-1007. Make names of metrics used in Hadoop unique.
     (Nigel Daley via cutting)
 
+11. HADOOP-491.  Change mapred.task.timeout to be per-job, and make a
+    value of zero mean no timeout.  Also change contrib/streaming to
+    disable task timeouts.  (Arun C Murthy via cutting)
+
 
 Release 0.11.1 - 2007-02-09
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=506745&r1=506744&r2=506745
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Feb 12 15:38:27 2007
@@ -514,6 +514,9 @@
 
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
+    
+    // All streaming jobs have, by default, no time-out for tasks
+    jobConf_.setLong("mapred.task.timeout", 0);
 
     setUserJobConfProps(true);
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=506745&r1=506744&r2=506745
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Feb 12 15:38:27 2007
@@ -53,7 +53,6 @@
 public class TaskTracker 
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
     static final long WAIT_FOR_DONE = 3 * 1000;
-    private long taskTimeout; 
     private int httpPort;
 
     static enum State {NORMAL, STALE, INTERRUPTED}
@@ -435,7 +434,6 @@
       maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
       this.fConf = conf;
       this.jobTrackAddr = JobTracker.getAddress(conf);
-      this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
       this.mapOutputFile = new MapOutputFile();
       this.mapOutputFile.setConf(conf);
       int httpPort = conf.getInt("tasktracker.http.port", 50060);
@@ -643,20 +641,29 @@
      */
     private synchronized void markUnresponsiveTasks() throws IOException {
       long now = System.currentTimeMillis();
-        for (TaskInProgress tip: runningTasks.values()) {
-            long timeSinceLastReport = now - tip.getLastProgressReport();
-            if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
-                (timeSinceLastReport > this.taskTimeout) &&
-                !tip.wasKilled) {
-                String msg = "Task failed to report status for " +
-                             (timeSinceLastReport / 1000) + 
-                             " seconds. Killing.";
-                LOG.info(tip.getTask().getTaskId() + ": " + msg);
-                ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
-                tip.reportDiagnosticInfo(msg);
-                purgeTask(tip);
-            }
+      for (TaskInProgress tip: runningTasks.values()) {
+        if (tip.getRunState() == TaskStatus.State.RUNNING) {
+          // Check the per-job timeout interval for tasks;
+          // an interval of '0' implies it is never timed-out
+          long jobTaskTimeout = tip.getTaskTimeout();
+          if (jobTaskTimeout == 0) {
+            continue;
+          }
+          
+          // Check if the task has not reported progress for a 
+          // time-period greater than the configured time-out
+          long timeSinceLastReport = now - tip.getLastProgressReport();
+          if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
+            String msg = "Task failed to report status for " +
+            (timeSinceLastReport / 1000) + 
+            " seconds. Killing.";
+            LOG.info(tip.getTask().getTaskId() + ": " + msg);
+            ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
+            tip.reportDiagnosticInfo(msg);
+            purgeTask(tip);
+          }
         }
+      }
     }
 
     /**
@@ -902,6 +909,7 @@
         private boolean alwaysKeepTaskFiles;
         private TaskStatus taskStatus ; 
         private boolean keepJobFiles;
+        private long taskTimeout;
         
         /**
          */
@@ -920,6 +928,7 @@
                  getName(), task.isMapTask()? TaskStatus.Phase.MAP:
                    TaskStatus.Phase.SHUFFLE); 
             keepJobFiles = false;
+            taskTimeout = (10 * 60 * 1000);
         }
         
         private void localizeTask(Task task) throws IOException{
@@ -964,6 +973,12 @@
         public void setJobConf(JobConf lconf){
             this.localJobConf = lconf;
             keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+            taskTimeout = localJobConf.getLong("mapred.task.timeout", 
+                                                10 * 60 * 1000);
+        }
+        
+        public JobConf getJobConf() {
+          return localJobConf;
         }
         
         /**
@@ -1024,6 +1039,15 @@
             return runstate;
         }
 
+        /**
+         * The task's configured timeout.
+         * 
+         * @return the task's configured timeout.
+         */
+        public long getTaskTimeout() {
+          return taskTimeout;
+        }
+        
         /**
          * The task has reported some diagnostic info about its status
          */



Mime
View raw message