hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject svn commit: r1308537 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org...
Date Mon, 02 Apr 2012 20:30:53 GMT
Author: tgraves
Date: Mon Apr  2 20:30:53 2012
New Revision: 1308537

URL: http://svn.apache.org/viewvc?rev=1308537&view=rev
Log:
merge -r 1308532:1308533 from branch-2. FIXES: MAPREDUCE-4089

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
      - copied unchanged from r1308533, hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Apr  2 20:30:53 2012
@@ -13,7 +13,9 @@ Release 0.23.3 - UNRELEASED
   BUG FIXES
 
     MAPREDUCE-4092.  commitJob Exception does not fail job (Jon Eagles via
-     bobby)
+    bobby)
+
+    MAPREDUCE-4089. Hung Tasks never time out. (Robert Evans via tgraves)
 
 Release 0.23.2 - UNRELEASED
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Mon Apr  2 20:30:53 2012
@@ -175,7 +175,7 @@ public class TaskAttemptListenerImpl ext
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
 
-    taskHeartbeatHandler.receivedPing(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
 
     Job job = context.getJob(attemptID.getTaskId().getJobId());
     Task task = job.getTask(attemptID.getTaskId());
@@ -203,7 +203,7 @@ public class TaskAttemptListenerImpl ext
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
 
-    taskHeartbeatHandler.receivedPing(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
     //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, 
@@ -217,7 +217,7 @@ public class TaskAttemptListenerImpl ext
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
 
-    taskHeartbeatHandler.receivedPing(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
 
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@@ -270,7 +270,7 @@ public class TaskAttemptListenerImpl ext
         context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
             fromEventId, maxEvents);
 
-    taskHeartbeatHandler.receivedPing(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
 
     // filter the events to return only map completion events in old format
     List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
@@ -287,7 +287,7 @@ public class TaskAttemptListenerImpl ext
   @Override
   public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
     LOG.info("Ping from " + taskAttemptID.toString());
-    taskHeartbeatHandler.receivedPing(TypeConverter.toYarn(taskAttemptID));
+    taskHeartbeatHandler.pinged(TypeConverter.toYarn(taskAttemptID));
     return true;
   }
 
@@ -299,7 +299,7 @@ public class TaskAttemptListenerImpl ext
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
       TypeConverter.toYarn(taskAttemptID);
-    taskHeartbeatHandler.receivedPing(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
 
     // This is mainly used for cases where we want to propagate exception traces
     // of tasks that fail.
@@ -317,7 +317,7 @@ public class TaskAttemptListenerImpl ext
     LOG.info("Status update from " + taskAttemptID.toString());
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
         TypeConverter.toYarn(taskAttemptID);
-    taskHeartbeatHandler.receivedPing(yarnAttemptID);
+    taskHeartbeatHandler.progressing(yarnAttemptID);
     TaskAttemptStatus taskAttemptStatus =
         new TaskAttemptStatus();
     taskAttemptStatus.id = yarnAttemptID;

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Mon Apr  2 20:30:53 2012
@@ -44,9 +44,36 @@ import org.apache.hadoop.yarn.service.Ab
  */
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TaskHeartbeatHandler extends AbstractService {
-
+  
+  private static class ReportTime {
+    private long lastPing;
+    private long lastProgress;
+    
+    public ReportTime(long time) {
+      setLastProgress(time);
+    }
+    
+    public synchronized void setLastPing(long time) {
+      lastPing = time;
+    }
+    
+    public synchronized void setLastProgress(long time) {
+      lastProgress = time;
+      lastPing = time;
+    }
+    
+    public synchronized long getLastPing() {
+      return lastPing;
+    }
+    
+    public synchronized long getLastProgress() {
+      return lastProgress;
+    }
+  }
+  
   private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
-
+  private static final int PING_TIMEOUT = 5 * 60 * 1000;
+  
   //thread which runs periodically to see the last time since a heartbeat is
   //received from a task.
   private Thread lostTaskCheckerThread;
@@ -56,8 +83,8 @@ public class TaskHeartbeatHandler extend
 
   private final EventHandler eventHandler;
   private final Clock clock;
-
-  private ConcurrentMap<TaskAttemptId, Long> runningAttempts;
+  
+  private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
 
   public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
       int numThreads) {
@@ -65,7 +92,7 @@ public class TaskHeartbeatHandler extend
     this.eventHandler = eventHandler;
     this.clock = clock;
     runningAttempts =
-      new ConcurrentHashMap<TaskAttemptId, Long>(16, 0.75f, numThreads);
+      new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
   }
 
   @Override
@@ -91,14 +118,26 @@ public class TaskHeartbeatHandler extend
     super.stop();
   }
 
-  public void receivedPing(TaskAttemptId attemptID) {
+  public void progressing(TaskAttemptId attemptID) {
   //only put for the registered attempts
     //TODO throw an exception if the task isn't registered.
-    runningAttempts.replace(attemptID, clock.getTime());
+    ReportTime time = runningAttempts.get(attemptID);
+    if(time != null) {
+      time.setLastProgress(clock.getTime());
+    }
   }
 
+  public void pinged(TaskAttemptId attemptID) {
+    //only put for the registered attempts
+      //TODO throw an exception if the task isn't registered.
+      ReportTime time = runningAttempts.get(attemptID);
+      if(time != null) {
+        time.setLastPing(clock.getTime());
+      }
+    }
+  
   public void register(TaskAttemptId attemptID) {
-    runningAttempts.put(attemptID, clock.getTime());
+    runningAttempts.put(attemptID, new ReportTime(clock.getTime()));
   }
 
   public void unregister(TaskAttemptId attemptID) {
@@ -110,30 +149,27 @@ public class TaskHeartbeatHandler extend
     @Override
     public void run() {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
-        Iterator<Map.Entry<TaskAttemptId, Long>> iterator =
+        Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
             runningAttempts.entrySet().iterator();
 
         // avoid calculating current time everytime in loop
         long currentTime = clock.getTime();
 
         while (iterator.hasNext()) {
-          Map.Entry<TaskAttemptId, Long> entry = iterator.next();
-          if (currentTime > entry.getValue() + taskTimeOut) {
-
-            //In case the iterator isn't picking up the latest.
-            // Extra lookup outside of the iterator - but only if the task
-            // is considered to be timed out.
-            Long taskTime = runningAttempts.get(entry.getKey());
-            if (taskTime != null && currentTime > taskTime + taskTimeOut) {
-              // task is lost, remove from the list and raise lost event
-              iterator.remove();
-              eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
-                  .getKey(), "AttemptID:" + entry.getKey().toString()
-                  + " Timed out after " + taskTimeOut / 1000 + " secs"));
-              eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
-                  TaskAttemptEventType.TA_TIMED_OUT));
-            }
-
+          Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
+          boolean taskTimedOut = (taskTimeOut > 0) && 
+              (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
+          boolean pingTimedOut =
+              (currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT));
+              
+          if(taskTimedOut || pingTimedOut) {
+            // task is lost, remove from the list and raise lost event
+            iterator.remove();
+            eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+                .getKey(), "AttemptID:" + entry.getKey().toString()
+                + " Timed out after " + taskTimeOut / 1000 + " secs"));
+            eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+                TaskAttemptEventType.TA_TIMED_OUT));
           }
         }
         try {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1308537&r1=1308536&r2=1308537&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Apr  2 20:30:53 2012
@@ -351,7 +351,7 @@
   <value>600000</value>
   <description>The number of milliseconds before a task will be
   terminated if it neither reads an input, writes an output, nor
-  updates its status string.
+  updates its status string.  A value of 0 disables the timeout.
   </description>
 </property>
 



Mime
View raw message