hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1377047 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: CHANGES.txt.MR-3902 hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java
Date Fri, 24 Aug 2012 18:56:10 GMT
Author: sseth
Date: Fri Aug 24 18:56:09 2012
New Revision: 1377047

URL: http://svn.apache.org/viewvc?rev=1377047&view=rev
Log:
MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase. (Contributed by Tsuyoshi OZAWA)

Added:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1377047&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902	Fri Aug 24 18:56:09 2012
@@ -0,0 +1,2 @@
+Branch MR-3902
+  MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase (Tsuyoshi OZAWA via sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java?rev=1377047&r1=1377046&r2=1377047&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java	(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java	Fri Aug 24 18:56:09 2012
@@ -18,22 +18,14 @@
 
 package org.apache.hadoop.mapreduce.v2.app2;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.HeartbeatHandlerBase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.AbstractService;
 
 
 /**
@@ -42,153 +34,36 @@ import org.apache.hadoop.yarn.service.Ab
  * not hear from it for a long time.
  * 
  */
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class TaskHeartbeatHandler extends AbstractService {
-  
-  // TODO XXX: Extend HeartbeatHandlerBase
-  
-  private static class ReportTime {
-    private long lastPing;
-    private long lastProgress;
-    
-    public ReportTime(long time) {
-      setLastProgress(time);
-    }
-    
-    public synchronized void setLastPing(long time) {
-      lastPing = time;
-    }
-    
-    public synchronized void setLastProgress(long time) {
-      lastProgress = time;
-      lastPing = time;
-    }
-    
-    public synchronized long getLastPing() {
-      return lastPing;
-    }
-    
-    public synchronized long getLastProgress() {
-      return lastProgress;
-    }
-  }
-  
-  private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
-  private static final int PING_TIMEOUT = 5 * 60 * 1000;
-  
-  //thread which runs periodically to see the last time since a heartbeat is
-  //received from a task.
-  private Thread lostTaskCheckerThread;
-  private volatile boolean stopped;
-  private int taskTimeOut = 5 * 60 * 1000;// 5 mins
-  private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
-
-  private final EventHandler eventHandler;
-  private final Clock clock;
-  private final AppContext context;
-  
-  private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
+@SuppressWarnings({"unchecked"})
+public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TaskAttemptId> {
 
-  public TaskHeartbeatHandler(AppContext context) {
-    this(context, 16);
-  }
-  
-  public TaskHeartbeatHandler(AppContext context,
-      int numThreads) {
-    super("TaskHeartbeatHandler");
-    this.eventHandler = context.getEventHandler();
-    this.clock = context.getClock();
-    this.context = context;
-    numThreads = numThreads <= 0 ? 1 : numThreads;
-    runningAttempts =
-      new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
+  public TaskHeartbeatHandler(AppContext context, int numThreads) {
+    super(context, numThreads, "TaskHeartbeatHandler");
   }
 
   @Override
-  public void init(Configuration conf) {
-    super.init(conf);
-    taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
-    taskTimeOutCheckInterval =
-        conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
+  protected int getConfiguredTimeout(Configuration conf) {
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
   }
 
   @Override
-  public void start() {
-    lostTaskCheckerThread = new Thread(new PingChecker());
-    lostTaskCheckerThread.setName("TaskHeartbeatHandler PingChecker");
-    lostTaskCheckerThread.start();
-    super.start();
+  protected int getConfiguredTimeoutCheckInterval(Configuration conf) {
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
   }
 
   @Override
-  public void stop() {
-    stopped = true;
-    lostTaskCheckerThread.interrupt();
-    super.stop();
-  }
-
-  public void progressing(TaskAttemptId attemptID) {
-  //only put for the registered attempts
-    //TODO throw an exception if the task isn't registered.
-    ReportTime time = runningAttempts.get(attemptID);
-    if(time != null) {
-      time.setLastProgress(clock.getTime());
-    }
-  }
-
-  public void pinged(TaskAttemptId attemptID) {
-    //only put for the registered attempts
-      //TODO throw an exception if the task isn't registered.
-      ReportTime time = runningAttempts.get(attemptID);
-      if(time != null) {
-        time.setLastPing(clock.getTime());
-      }
-    }
-  
-  public void register(TaskAttemptId attemptID) {
-    runningAttempts.put(attemptID, new ReportTime(clock.getTime()));
+  public boolean hasTimedOut(
+      org.apache.hadoop.mapreduce.jobhistory.HeartbeatHandlerBase.ReportTime report,
+      long currentTime) {
+    return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
   }
 
-  public void unregister(TaskAttemptId attemptID) {
-    runningAttempts.remove(attemptID);
-  }
-
-  private class PingChecker implements Runnable {
-
-    @Override
-    public void run() {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
-            runningAttempts.entrySet().iterator();
-
-        // avoid calculating current time everytime in loop
-        long currentTime = clock.getTime();
-
-        while (iterator.hasNext()) {
-          Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
-          boolean taskTimedOut = (taskTimeOut > 0) && 
-              (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
-          boolean pingTimedOut =
-              (currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT));
-              
-          if(taskTimedOut || pingTimedOut) {
-            // task is lost, remove from the list and raise lost event
-            iterator.remove();
-            eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
-                .getKey(), "AttemptID:" + entry.getKey().toString()
-                + " Timed out after " + taskTimeOut / 1000 + " secs"));
-            eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
-                TaskAttemptEventType.TA_TIMED_OUT));
-          }
-        }
-        try {
-          Thread.sleep(taskTimeOutCheckInterval);
-        } catch (InterruptedException e) {
-          LOG.info("TaskHeartbeatHandler thread interrupted");
-          break;
-        }
-      }
-    }
+  @Override
+  public void handleTimeOut(TaskAttemptId attemptId) {
+    eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+        "AttemptID:" + attemptId.toString()
+        + " Timed out after " + timeOut / 1000 + " secs"));
+    eventHandler.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TIMED_OUT));
   }
-
-}
+}
\ No newline at end of file



Mime
View raw message