hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r547790 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Fri, 15 Jun 2007 21:29:49 GMT
Author: cutting
Date: Fri Jun 15 14:29:48 2007
New Revision: 547790

URL: http://svn.apache.org/viewvc?view=rev&rev=547790
Log:
HADOOP-1472.  Fix so that timed-out tasks are counted as failures rather than as killed. 
Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547790&r1=547789&r2=547790
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 15 14:29:48 2007
@@ -130,6 +130,9 @@
  41. HADOOP-1457.  Add counters for monitoring task assignments.
      (Arun C Murthy via tomwhite)
 
+ 42. HADOOP-1472.  Fix so that timed-out tasks are counted as failures
+     rather than as killed.  (Arun C Murthy via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=547790&r1=547789&r2=547790
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jun 15 14:29:48 2007
@@ -215,7 +215,7 @@
                 }
                 LOG.info("Received KillTaskAction for task: " + 
                          killAction.getTaskId());
-                purgeTask(tip);
+                purgeTask(tip, false);
               } else {
                 LOG.error("Non-delete action given to cleanup thread: "
                           + action);
@@ -623,7 +623,7 @@
       new TreeMap<String, TaskInProgress>();
     tasksToClose.putAll(tasks);
     for (TaskInProgress tip : tasksToClose.values()) {
-      tip.jobHasFinished();
+      tip.jobHasFinished(false);
     }
 
     // Shutdown local RPC servers.  Do them
@@ -920,13 +920,13 @@
         // time-period greater than the configured time-out
         long timeSinceLastReport = now - tip.getLastProgressReport();
         if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
-          String msg = "Task failed to report status for " +
-            (timeSinceLastReport / 1000) + 
-            " seconds. Killing.";
+          String msg = 
+            "Task " + tip.getTask().getTaskId() + " failed to report status for " 
+            + (timeSinceLastReport / 1000) + " seconds. Killing!";
           LOG.info(tip.getTask().getTaskId() + ": " + msg);
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
-          purgeTask(tip);
+          purgeTask(tip, true);
         }
       }
     }
@@ -951,7 +951,7 @@
       synchronized (rjob) {            
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
-          tip.jobHasFinished();
+          tip.jobHasFinished(false);
         }
         // Delete the job directory for this  
         // task if the job is done/failed
@@ -974,17 +974,17 @@
    * Remove the tip and update all relevant state.
    * 
    * @param tip {@link TaskInProgress} to be removed.
-   * @param purgeJobFiles <code>true</code> if the job files are to be
-   *                      purged, <code>false</code> otherwise.
+   * @param wasFailure did the task fail or was it killed?
    */
-  private void purgeTask(TaskInProgress tip) throws IOException {
+  private void purgeTask(TaskInProgress tip, boolean wasFailure) 
+  throws IOException {
     if (tip != null) {
       LOG.info("About to purge task: " + tip.getTask().getTaskId());
         
       // Remove the task from running jobs, 
       // removing the job if it's the last task
       removeTaskFromJob(tip.getTask().getJobId(), tip);
-      tip.jobHasFinished();
+      tip.jobHasFinished(wasFailure);
     }
   }
 
@@ -1008,7 +1008,7 @@
             " Killing task.";
           LOG.info(killMe.getTask().getTaskId() + ": " + msg);
           killMe.reportDiagnosticInfo(msg);
-          purgeTask(killMe);
+          purgeTask(killMe, false);
         }
       }
     }
@@ -1105,7 +1105,7 @@
       LOG.warn(msg);
       tip.reportDiagnosticInfo(msg);
       try {
-        tip.killAndCleanup(true);
+        tip.kill(true);
       } catch (IOException ie2) {
         LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" +
                  StringUtils.stringifyException(ie2));          
@@ -1187,7 +1187,6 @@
     private boolean keepFailedTaskFiles;
     private boolean alwaysKeepTaskFiles;
     private TaskStatus taskStatus; 
-    private boolean keepJobFiles;
     private long taskTimeout;
         
     /**
@@ -1207,7 +1206,6 @@
                                   getName(), task.isMapTask()? TaskStatus.Phase.MAP:
                                   TaskStatus.Phase.SHUFFLE,
                                   task.getCounters()); 
-      keepJobFiles = false;
       taskTimeout = (10 * 60 * 1000);
     }
         
@@ -1236,7 +1234,6 @@
       task.setConf(localJobConf);
       String keepPattern = localJobConf.getKeepTaskFilesPattern();
       if (keepPattern != null) {
-        keepJobFiles = true;
         alwaysKeepTaskFiles = 
           Pattern.matches(keepPattern, task.getTaskId());
       } else {
@@ -1408,50 +1405,36 @@
 
     /**
      * We no longer need anything from this task, as the job has
-     * finished.  If the task is still running, kill it (and clean up
+     * finished.  If the task is still running, kill it and clean up.
+     * 
+     * @param wasFailure did the task fail, as opposed to was it killed by
+     *                   the framework
      */
-    public void jobHasFinished() throws IOException {
-      boolean killTask = false;  
+    public void jobHasFinished(boolean wasFailure) throws IOException {
+      // Kill the task if it is still running
       synchronized(this){
-        killTask = (getRunState() == TaskStatus.State.RUNNING);
-        if (killTask) {
-          killAndCleanup(false);
-        }
-      }
-      if (!killTask) {
-        cleanup();
-      }
-      if (keepJobFiles)
-        return;
-              
-      synchronized(this){
-        // Delete temp directory in case any task used PhasedFileSystem.
-        try{
-          String systemDir = task.getConf().get("mapred.system.dir");
-          Path taskTempDir = new Path(systemDir + "/" + 
-                                      task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId());
-          if (fs.exists(taskTempDir)){
-            fs.delete(taskTempDir);
-          }
-        }catch(IOException e){
-          LOG.warn("Error in deleting reduce temporary output", e); 
+        if (getRunState() == TaskStatus.State.RUNNING) {
+          kill(wasFailure);
         }
       }
+      
+      // Cleanup on the finished task
+      cleanup();
     }
 
     /**
      * Something went wrong and the task must be killed.
      * @param wasFailure was it a failure (versus a kill request)?
      */
-    public synchronized void killAndCleanup(boolean wasFailure
-                                            ) throws IOException {
+    public synchronized void kill(boolean wasFailure) throws IOException {
       if (runstate == TaskStatus.State.RUNNING) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
         runner.kill();
-        runstate = TaskStatus.State.KILLED;
+        runstate = 
+          (wasFailure) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED;
       } else if (runstate == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
           failures += 1;
@@ -1596,7 +1579,7 @@
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
     tip.reportDiagnosticInfo("FSError: " + message);
-    purgeTask(tip);
+    purgeTask(tip, true);
   }
 
   public TaskCompletionEvent[] getMapCompletionEvents(



Mime
View raw message