hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r563649 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/Task.java src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Tue, 07 Aug 2007 20:41:43 GMT
Author: tomwhite
Date: Tue Aug  7 13:41:43 2007
New Revision: 563649

URL: http://svn.apache.org/viewvc?view=rev&rev=563649
Log:
HADOOP-1651.  Improve progress reporting.  Contributed by Devaraj Das.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=563649&r1=563648&r2=563649
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug  7 13:41:43 2007
@@ -32,6 +32,11 @@
     HADOOP-1463.  HDFS report correct usage statistics for disk space
     used by HDFS.  (Hairong Kuang via dhruba)
 
+  IMPROVEMENTS
+
+    HADOOP-1651.  Improve progress reporting.
+    (Devaraj Das via tomwhite)
+
 Branch 0.14 (unreleased changes)
 
   1. HADOOP-1197.  In Configuration, deprecate getObject() and add

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=563649&r1=563648&r2=563649
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Aug  7 13:41:43 2007
@@ -217,7 +217,7 @@
                                           ) throws IOException;
 
   /** The number of milliseconds between progress reports. */
-  public static final int PROGRESS_INTERVAL = 1000;
+  public static final int PROGRESS_INTERVAL = 3000;
 
   private transient Progress taskProgress = new Progress();
 
@@ -230,6 +230,8 @@
    * Using AtomicBoolean since we need an atomic read & reset method. 
    */  
   private AtomicBoolean progressFlag = new AtomicBoolean(false);
+  /* flag to track whether task is done */
+  private AtomicBoolean taskDone = new AtomicBoolean(false);
   // getters and setters for flag
   private void setProgressFlag() {
     progressFlag.set(true);
@@ -256,11 +258,20 @@
         public void run() {
           final int MAX_RETRIES = 3;
           int remainingRetries = MAX_RETRIES;
-          while (true) {
+          // get current flag value and reset it as well
+          boolean sendProgress = resetProgressFlag();
+          while (!taskDone.get()) {
             try {
-              // get current flag value and reset it as well
-              boolean sendProgress = resetProgressFlag();
               boolean taskFound = true; // whether TT knows about this task
+              // sleep for a bit
+              try {
+                Thread.sleep(PROGRESS_INTERVAL);
+              } 
+              catch (InterruptedException e) {
+                LOG.debug(getTaskId() + " Progress/ping thread exiting " +
+                                        "since it got interrupted");
+                break;
+              }
               
               if (sendProgress) {
                 // we need to send progress update
@@ -279,13 +290,8 @@
                 System.exit(66);
               }
               
+              sendProgress = resetProgressFlag(); 
               remainingRetries = MAX_RETRIES;
-              // sleep for a bit
-              try {
-                Thread.sleep(PROGRESS_INTERVAL);
-              } 
-              catch (InterruptedException e) {
-              }
             } 
             catch (Throwable t) {
               LOG.info("Communication exception: " + StringUtils.stringifyException(t));
@@ -301,6 +307,7 @@
       }, "Comm thread for "+taskId);
     thread.setDaemon(true);
     thread.start();
+    LOG.debug(getTaskId() + " Progress/ping thread started");
   }
 
   
@@ -338,6 +345,7 @@
   public void done(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = 10;
     boolean needProgress = true;
+    taskDone.set(true);
     while (true) {
       try {
         if (needProgress) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=563649&r1=563648&r2=563649
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Aug  7 13:41:43 2007
@@ -1776,6 +1776,8 @@
       JobConf defaultConf = new JobConf();
       int port = Integer.parseInt(args[0]);
       String taskid = args[1];
+      //set a very high idle timeout so that the connection is never closed
+      defaultConf.setInt("ipc.client.connection.maxidletime", 60*60*1000);
       TaskUmbilicalProtocol umbilical =
         (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                             TaskUmbilicalProtocol.versionID,



Mime
View raw message