hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r795721 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Mon, 20 Jul 2009 08:52:43 GMT
Author: sharad
Date: Mon Jul 20 08:52:42 2009
New Revision: 795721

URL: http://svn.apache.org/viewvc?rev=795721&view=rev
Log:
MAPREDUCE-430. Fix bug related to Task getting stuck due to OutOfMemoryErrors.
Contributed by Amar Kamat.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul 20 08:52:42 2009
@@ -233,3 +233,6 @@
     MAPREDUCE-771. Fix scheduling of setup and cleanup tasks to use
     free slots instead of tasks for scheduling. (yhemanth)
 
+    MAPREDUCE-430. Fix bug related to Task getting stuck due to
+    OutOfMemoryErrors. (Amar Kamat via sharad)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Mon Jul 20 08:52:42 2009
@@ -164,9 +164,13 @@
           break;
         }
       }
-    } catch (FSError e) {
-      LOG.fatal("FSError from child", e);
-      umbilical.fsError(taskid, e.getMessage());
+    } catch (Error e) {
+      String error = "Error";
+      if (e instanceof FSError) {
+       error = "FSError";
+      }
+      LOG.fatal(error + " from child", e);
+      umbilical.taskError(taskid, e.getMessage());
     } catch (Throwable throwable) {
       LOG.warn("Error running child", throwable);
       try {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Mon Jul 20 08:52:42 2009
@@ -61,12 +61,9 @@
       LOG.info("Task " + taskid + " reporting done.");
     }
 
-    public void fsError(TaskAttemptID taskId, String message) throws IOException {
-      LOG.info("Task " + taskId + " reporting file system error: " + message);
-    }
-
-    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
-      LOG.info("Task " + taskId + " reporting shuffle error: " + message);
+    public void taskError(TaskAttemptID taskId, String message) 
+    throws IOException {
+      LOG.info("Task " + taskId + " reporting task error: " + message);
     }
 
     public JvmTask getTask(JvmContext context) throws IOException {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Jul 20 08:52:42 2009
@@ -336,13 +336,9 @@
       }
     }
 
-    public synchronized void fsError(TaskAttemptID taskId, String message) 
+    public void taskError(TaskAttemptID taskId, String message) 
     throws IOException {
-      LOG.fatal("FSError: "+ message + "from task: " + taskId);
-    }
-
-    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
-      LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
+      LOG.fatal("Error: "+ message + "from task: " + taskId);
     }
     
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Jul 20 08:52:42 2009
@@ -375,8 +375,8 @@
         if(reduceCopier.mergeThrowable instanceof FSError) {
           LOG.error("Task: " + getTaskID() + " - FSError: " + 
               StringUtils.stringifyException(reduceCopier.mergeThrowable));
-          umbilical.fsError(getTaskID(), 
-              reduceCopier.mergeThrowable.getMessage());
+          umbilical.taskError(getTaskID(), 
+              "(FSError) " + reduceCopier.mergeThrowable.getMessage());
         }
         throw new IOException("Task: " + getTaskID() + 
             " - The reduce copier failed", reduceCopier.mergeThrowable);
@@ -1249,7 +1249,8 @@
             LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " + 
                       StringUtils.stringifyException(e));
             try {
-              umbilical.fsError(reduceTask.getTaskID(), e.getMessage());
+              umbilical.taskError(reduceTask.getTaskID(), "(FSError) " 
+                                  + e.getMessage());
             } catch (IOException io) {
               LOG.error("Could not notify TT of FSError: " + 
                       StringUtils.stringifyException(io));
@@ -2163,9 +2164,9 @@
                   LOG.fatal("Shuffle failed with too many fetch failures " + 
                             "and insufficient progress!" +
                             "Killing task " + getTaskID() + ".");
-                  umbilical.shuffleError(getTaskID(), 
-                                         "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
-                                         + " bailing-out.");
+                  umbilical.taskError(getTaskID(), "(Shuffle Error) " 
+                                      + "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+                                      + " bailing-out.");
                 }
               }
                 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Jul 20 08:52:42 2009
@@ -347,15 +347,19 @@
               exitCode + ".");
         }
       }
-    } catch (FSError e) {
-      LOG.fatal("FSError", e);
+    } catch (Error e) {
+      String error = "Error";
+      if (e instanceof FSError) {
+        error = "FSError";
+      }
+      LOG.fatal(error, e);
       try {
-        tracker.fsError(t.getTaskID(), e.getMessage());
+        tracker.taskError(t.getTaskID(), e.getMessage());
       } catch (IOException ie) {
-        LOG.fatal(t.getTaskID()+" reporting FSError", ie);
+        LOG.fatal(t.getTaskID()+" reporting " + error, ie);
       }
     } catch (Throwable throwable) {
-      LOG.warn(t.getTaskID() + errorInfo, throwable);
+      LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
       Throwable causeThrowable = new Throwable(errorInfo, throwable);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       causeThrowable.printStackTrace(new PrintStream(baos));

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Jul 20 08:52:42 2009
@@ -2622,24 +2622,13 @@
 
 
   /** 
-   * A reduce-task failed to shuffle the map-outputs. Kill the task.
-   */  
-  public synchronized void shuffleError(TaskAttemptID taskId, String message) 
-  throws IOException { 
-    LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
-    TaskInProgress tip = runningTasks.get(taskId);
-    tip.reportDiagnosticInfo("Shuffle Error: " + message);
-    purgeTask(tip, true);
-  }
-
-  /** 
    * A child task had a local filesystem error. Kill the task.
    */  
-  public synchronized void fsError(TaskAttemptID taskId, String message) 
+  public synchronized void taskError(TaskAttemptID taskId, String message) 
   throws IOException {
-    LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
+    LOG.fatal("Task: " + taskId + " - Killed due to : " + message);
     TaskInProgress tip = runningTasks.get(taskId);
-    tip.reportDiagnosticInfo("FSError: " + message);
+    tip.reportDiagnosticInfo(message);
     purgeTask(tip, true);
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Mon Jul 20 08:52:42 2009
@@ -56,9 +56,10 @@
    * Version 16 Change in signature of getTask() for HADOOP-5488
    * Version 17 Modified TaskID to be aware of the new TaskTypes
    * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+   * Version 19 Removed fsError and shuffleError and introduced taskError.
    * */
 
-  public static final long versionID = 18L;
+  public static final long versionID = 19L;
   
   /**
    * Called when a child task process starts, to get its task.
@@ -126,11 +127,8 @@
    */
   boolean canCommit(TaskAttemptID taskid) throws IOException;
 
-  /** Report that a reduce-task couldn't shuffle map-outputs.*/
-  void shuffleError(TaskAttemptID taskId, String message) throws IOException;
-  
-  /** Report that the task encounted a local filesystem error.*/
-  void fsError(TaskAttemptID taskId, String message) throws IOException;
+  /** Report that the task encountered an error.*/
+  void taskError(TaskAttemptID taskId, String message) throws IOException;
 
   /** Called by a reduce task to get the map output locations for finished maps.
    * Returns an update centered around the map-task-completion-events. 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=795721&r1=795720&r2=795721&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Mon Jul 20 08:52:42 2009
@@ -50,7 +50,9 @@
         throw new IOException();
       } else if (taskid.endsWith("_1")) {
         System.exit(-1);
-      } 
+      } else if (taskid.endsWith("_2")) {
+        throw new OutOfMemoryError();
+      }
     }
   }
 
@@ -107,46 +109,55 @@
     return new JobClient(conf).submitJob(conf);
   }
   
+  private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId, 
+                               TaskStatus ts, boolean isCleanup) 
+  throws IOException {
+    assertEquals(tip.isCleanupAttempt(attemptId), isCleanup);
+    assertTrue(ts != null);
+    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+    // validate tasklogs for task attempt
+    String log = TestMiniMRMapRedDebugScript.readTaskLog(
+                      TaskLog.LogName.STDERR, attemptId, false);
+    assertTrue(log.contains(taskLog));
+    if (!isCleanup) {
+      // validate task logs: tasklog should contain both task logs
+      // and cleanup logs
+      assertTrue(log.contains(cleanupLog));
+    } else {
+      // validate tasklogs for cleanup attempt
+      log = TestMiniMRMapRedDebugScript.readTaskLog(
+                 TaskLog.LogName.STDERR, attemptId, true);
+      assertTrue(log.contains(cleanupLog));
+    }
+  }
+
   private void validateJob(RunningJob job, MiniMRCluster mr) 
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
 	    
     JobID jobId = job.getID();
     // construct the task id of first map task
+    // this should not be cleanup attempt since the first attempt 
+    // fails with an exception
     TaskAttemptID attemptId = 
       new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
     TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
                             getTip(attemptId.getTaskID());
-    // this should not be cleanup attempt since the first attempt 
-    // fails with an exception
-    assertTrue(!tip.isCleanupAttempt(attemptId));
     TaskStatus ts = 
       mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    assertTrue(ts != null);
-    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-    // validate task logs: tasklog should contain both task logs
-    // and cleanup logs
-    String log = TestMiniMRMapRedDebugScript.readTaskLog(
-                      TaskLog.LogName.STDERR, attemptId, false);
-    assertTrue(log.contains(taskLog));
-    assertTrue(log.contains(cleanupLog));
+    validateAttempt(tip, attemptId, ts, false);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
     // this should be cleanup attempt since the second attempt fails
     // with System.exit
-    assertTrue(tip.isCleanupAttempt(attemptId));
     ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    assertTrue(ts != null);
-    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-    // validate tasklogs for task attempt
-    log = TestMiniMRMapRedDebugScript.readTaskLog(
-               TaskLog.LogName.STDERR, attemptId, false);
-    assertTrue(log.contains(taskLog));
-
-    // validate tasklogs for cleanup attempt
-    log = TestMiniMRMapRedDebugScript.readTaskLog(
-               TaskLog.LogName.STDERR, attemptId, true);
-    assertTrue(log.contains(cleanupLog));
+    validateAttempt(tip, attemptId, ts, true);
+    
+    attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
+    // this should be cleanup attempt since the third attempt fails
+    // with OutOfMemory
+    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true);
   }
   
   public void testWithDFS() throws IOException {
@@ -191,3 +202,4 @@
     td.testWithDFS();
   }
 }
+



Mime
View raw message