hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r492677 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/Task.java
Date Thu, 04 Jan 2007 18:49:28 GMT
Author: cutting
Date: Thu Jan  4 10:49:27 2007
New Revision: 492677

URL: http://svn.apache.org/viewvc?view=rev&rev=492677
Log:
HADOOP-846.  Report progress during entire map.  Contributed by Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=492677&r1=492676&r2=492677
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan  4 10:49:27 2007
@@ -181,6 +181,10 @@
     distributions.  Also add contrib and example documentation to
     distributed javadoc, in separate sections.  (Nigel Daley via cutting)
 
+52. HADOOP-846.  Report progress during entire map, as sorting of
+    intermediate outputs may happen at any time, potentially causing
+    task timeouts.  (Devaraj Das via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=492677&r1=492676&r2=492677
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Jan  4 10:49:27 2007
@@ -161,7 +161,7 @@
         public synchronized boolean next(Writable key, Writable value)
           throws IOException {
 
-          reportProgress(umbilical, getProgress());
+          setProgress(getProgress());
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
           myMetrics.mapInput(getPos() - beforePos);
@@ -174,13 +174,13 @@
         }
       };
 
+    Thread sortProgress = createProgressThread(umbilical);
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
+      sortProgress.start();
       runner.run(in, collector, reporter);      // run the map
-    } finally {
-      in.close();                               // close input
       //check whether the length of the key/value buffer is 0. If not, then
       //we need to spill that to disk. Note that we reset the key/val buffer
       //upon each spill (so a length > 0 means that we have not spilled yet)
@@ -189,12 +189,40 @@
       }
       //merge the partitions from the spilled files and create one output
       collector.mergeParts();
+    } finally {
       //close
+      in.close();                               // close input
       collector.close();
+      sortProgress.interrupt();
     }
     done(umbilical);
   }
 
+  private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
+    //spawn a thread to give merge progress heartbeats
+    Thread sortProgress = new Thread() {
+      public void run() {
+        LOG.info("Started thread: " + getName());
+        while (true) {
+          try {
+            reportProgress(umbilical);
+            Thread.sleep(PROGRESS_INTERVAL);
+          } catch (InterruptedException e) {
+              return;
+          } catch (Throwable e) {
+              LOG.info("Thread Exception in " +
+                                 "reporting sort progress\n" +
+                                 StringUtils.stringifyException(e));
+              continue;
+          }
+        }
+      }
+    };
+    sortProgress.setName("Sort progress reporter for task "+getTaskId());
+    sortProgress.setDaemon(true);
+    return sortProgress;
+  }
+
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;
@@ -298,7 +326,6 @@
         int partNumber = partitioner.getPartition(key, value, partitions);
         sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-        reportProgress(umbilical); 
         myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset);
 
         //now check whether we need to spill to disk
@@ -348,7 +375,6 @@
                   throws IOException {
                   synchronized (this) {
                     writer.append(key, value);
-                    reportProgress(umbilical);
                   }
                 }
               };
@@ -374,7 +400,6 @@
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
-        reportProgress(umbilical);
       }
     }
     
@@ -402,7 +427,6 @@
         value.readFields(valIn);
 
         writer.append(key, value);
-        reportProgress(umbilical);
       }
     }
     
@@ -435,34 +459,12 @@
                   compressionType, codec);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
-          reportProgress(umbilical);
         }
         finalOut.close();
         finalIndexOut.close();
         return;
       }
-      //spawn a thread to give merge progress heartbeats
-      Thread sortProgress = new Thread() {
-        public void run() {
-          while (true) {
-            try {
-              reportProgress(umbilical);
-              Thread.sleep(PROGRESS_INTERVAL);
-            } catch (InterruptedException e) {
-                return;
-            } catch (Throwable e) {
-                LOG.info("Thread Exception in " +
-                                   "reporting sort progress\n" +
-                                   StringUtils.stringifyException(e));
-                continue;
-            }
-          }
-        }
-      };
-      sortProgress.setName("Sort progress reporter for task "+getTaskId());
-      sortProgress.setDaemon(true);
-      sortProgress.start();
-      try {
+      {
         Path [] filename = new Path[numSpills];
         Path [] indexFileName = new Path[numSpills];
         FSDataInputStream in[] = new FSDataInputStream[numSpills];
@@ -514,8 +516,6 @@
           in[i].close(); localFs.delete(filename[i]);
           indexIn[i].close(); localFs.delete(indexFileName[i]);
         }
-      } finally {
-        sortProgress.interrupt();
       }
     }
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=492677&r1=492676&r2=492677
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Jan  4 10:49:27 2007
@@ -142,7 +142,7 @@
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 1000;
 
-  private transient Progress taskProgress = new Progress();
+  private volatile Progress taskProgress = new Progress();
   private transient long nextProgressTime =
     System.currentTimeMillis() + PROGRESS_INTERVAL;
 
@@ -165,9 +165,13 @@
       };
   }
 
+  public void setProgress(float progress) {
+    taskProgress.set(progress);
+  }
+
   public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
     throws IOException {
-    taskProgress.set(progress);
+    setProgress(progress);
     reportProgress(umbilical);
   }
 



Mime
View raw message