hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r549233 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/pipes/ src/java/org/apache/hadoop/util/
Date Wed, 20 Jun 2007 21:00:27 GMT
Author: cutting
Date: Wed Jun 20 14:00:25 2007
New Revision: 549233

URL: http://svn.apache.org/viewvc?view=rev&rev=549233
Log:
HADOOP-1462.  Improve task progress reporting.  Contributed by Vivek Ratan.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 20 14:00:25 2007
@@ -204,6 +204,11 @@
      to only terminate Hadoop instances, and not other instances
      started by the same user.  (tomwhite via cutting)
 
+ 63. HADOOP-1462.  Improve task progress reporting.  Progress reports
+     are no longer blocking since i/o is performed in a separate
+     thread.  Reporting during sorting and more is also more
+     consistent.  (Vivek Ratan via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Wed Jun 20 14:00:25 2007
@@ -1908,6 +1908,8 @@
     private Class valClass;
 
     private Configuration conf;
+    
+    private Progressable progressable = null;
 
     /** Sort and merge files containing the named classes. */
     public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf)  {
@@ -1938,6 +1940,11 @@
     /** Get the total amount of buffer memory, in bytes.*/
     public int getMemory() { return memory; }
 
+    /** Set the progressable object in order to report progress. */
+    public void setProgressable(Progressable progressable) {
+      this.progressable = progressable;
+    }
+    
     /** 
      * Perform a file sort from a set of input files into an output file.
      * @param inFiles the files to be sorted
@@ -2000,6 +2007,7 @@
     private int sortPass(boolean deleteInput) throws IOException {
       LOG.debug("running sort pass");
       SortPass sortPass = new SortPass();         // make the SortPass
+      sortPass.setProgressable(progressable);
       mergeSort = new MergeSort(sortPass.new SeqFileComparator());
       try {
         return sortPass.run(deleteInput);         // run it
@@ -2028,6 +2036,8 @@
       private FSDataOutputStream indexOut = null;
       private Path outName;
 
+      private Progressable progressable = null;
+
       public int run(boolean deleteInput) throws IOException {
         int segments = 0;
         int currentFile = 0;
@@ -2098,6 +2108,10 @@
           LOG.debug("flushing segment " + segments);
           rawBuffer = rawKeys.getData();
           sort(count);
+          // indicate we're making progress
+          if (progressable != null) {
+            progressable.progress();
+          }
           flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, 
                 segments==0 && atEof);
           segments++;
@@ -2186,6 +2200,13 @@
                                     keyOffsets[J.get()], keyLengths[J.get()]);
         }
       }
+      
+      /** set the progressable object in order to report progress */
+      public void setProgressable(Progressable progressable)
+      {
+        this.progressable = progressable;
+      }
+      
     } // SequenceFile.Sorter.SortPass
 
     /** The interface to iterate over raw keys/values of SequenceFiles. */
@@ -2225,7 +2246,8 @@
     public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
                                      Path tmpDir) 
       throws IOException {
-      MergeQueue mQueue = new MergeQueue(segments, tmpDir);
+      // pass in object to report progress, if present
+      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
       return mQueue.merge();
     }
 
@@ -2270,7 +2292,7 @@
         a.add(s);
       }
       this.factor = factor;
-      MergeQueue mQueue = new MergeQueue(a, tmpDir);
+      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
       return mQueue.merge();
     }
 
@@ -2299,7 +2321,8 @@
         a.add(s);
       }
       factor = (inNames.length < factor) ? inNames.length : factor;
-      MergeQueue mQueue = new MergeQueue(a, tempDir);
+      // pass in object to report progress, if present
+      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
       return mQueue.merge();
     }
 
@@ -2391,7 +2414,7 @@
       //the contained segments during the merge process & hence don't need 
       //them anymore
       SegmentContainer container = new SegmentContainer(inName, indexIn);
-      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir);
+      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
       return mQueue.merge();
     }
     
@@ -2406,6 +2429,7 @@
       private float progPerByte;
       private Progress mergeProgress = new Progress();
       private Path tmpDir;
+      private Progressable progress = null; //handle to the progress reporting object
       
       //a TreeMap used to store the segments sorted by size (segment offset and
       //segment path name is used to break ties between segments of same sizes)
@@ -2427,16 +2451,22 @@
        * A queue of file segments to merge
        * @param segments the file segments to merge
        * @param tmpDir a relative local directory to save intermediate files in
+       * @param progress the reference to the Progressable object
        */
       public MergeQueue(List <SegmentDescriptor> segments,
-                        Path tmpDir) {
+          Path tmpDir, Progressable progress) {
         int size = segments.size();
         for (int i = 0; i < size; i++) {
           sortedSegmentSizes.put(segments.get(i), null);
         }
         this.tmpDir = tmpDir;
+        this.progress = progress;
       }
       protected boolean lessThan(Object a, Object b) {
+        // indicate we're making progress
+        if (progress != null) {
+          progress.progress();
+        }
         SegmentDescriptor msa = (SegmentDescriptor)a;
         SegmentDescriptor msb = (SegmentDescriptor)b;
         return comparator.compare(msa.getKey().getData(), 0, 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Wed Jun 20 14:00:25 2007
@@ -26,7 +26,7 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
-
+import org.apache.hadoop.util.Progressable;
 
 /** This class implements the sort interface using primitive int arrays as 
  * the data structures (that is why this class is called 'BasicType'SorterBase)
@@ -51,6 +51,8 @@
   //4 for indices into startOffsets array in the
   //pointers array (ignored the partpointers list itself)
   static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
+  //Reference to the Progressable object for sending KeepAlive
+  private Progressable reporter;
 
   //Implementation of methods of the SorterBase interface
   //
@@ -62,6 +64,10 @@
     comparator = conf.getOutputKeyComparator();
   }
   
+  public void setProgressable(Progressable reporter) {
+    this.reporter = reporter;  
+  }
+
   public void addKeyValue(int recordOffset, int keyLength, int valLength) {
     //Add the start offset of the key in the startOffsets array and the
     //length in the keyLengths array.
@@ -92,6 +98,8 @@
   //A compare method that references the keyValBuffer through the indirect
   //pointers
   protected int compare(int i, int j) {
+    // indicate we're making progress
+    reporter.progress();
     return comparator.compare(keyValBuffer.getData(), startOffsets[i],
                               keyLengths[i],
                               keyValBuffer.getData(), startOffsets[j], 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java Wed Jun 20 14:00:25 2007
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.util.Progressable;
 
 /** This class provides a generic sort interface that should be implemented
  * by specific sort algorithms. The use case is the following:
@@ -37,6 +38,11 @@
  */
 interface BufferSorter extends JobConfigurable {
   
+  /** Pass the Progressable object so that sort can call progress while it is sorting
+   * @param reporter the Progressable object reference
+   */
+  public void setProgressable(Progressable reporter);
+    
   /** When a key/value is added at a particular offset in the key/value buffer, 
    * this method is invoked by the user class so that the impl of this sort 
    * interface can update its datastructures. 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Jun 20 14:00:25 2007
@@ -59,7 +59,7 @@
       return true;
     }
 
-    public void progress(String taskid, float progress, String state,
+    public boolean progress(String taskid, float progress, String state,
                          TaskStatus.Phase phase, Counters counters) 
       throws IOException 
     {
@@ -74,6 +74,7 @@
       LOG.info(buf.toString());
       // ignore phase
       // ignore counters
+      return true;
     }
 
     public void reportDiagnosticInfo(String taskid, String trace) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jun 20 14:00:25 2007
@@ -195,7 +195,7 @@
 
     public Task getTask(String taskid) { return null; }
 
-    public void progress(String taskId, float progress, String state, 
+    public boolean progress(String taskId, float progress, String state, 
                          TaskStatus.Phase phase, Counters taskCounters) {
       LOG.info(state);
       float taskIndex = mapIds.indexOf(taskId);
@@ -208,6 +208,8 @@
       currentCounters = Counters.sum(completedTaskCounters, taskCounters);
       
       // ignore phase
+      
+      return true;
     }
     
     /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Wed Jun 20 14:00:25 2007
@@ -191,7 +191,8 @@
                       Path localFilename, 
                       LocalDirAllocator lDirAlloc,
                       Configuration conf, int reduce,
-                      int timeout) throws IOException, InterruptedException {
+                      int timeout, Progressable progressable) 
+  throws IOException, InterruptedException {
     boolean good = false;
     long totalBytes = 0;
     FileSystem fileSys = localFileSys;
@@ -245,6 +246,8 @@
             if (currentThread.isInterrupted()) {
               throw new InterruptedException();
             }
+            // indicate we're making progress
+            progressable.progress();
             len = input.read(buffer);
           }
         } finally {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jun 20 14:00:25 2007
@@ -117,6 +117,9 @@
 
     final Reporter reporter = getReporter(umbilical);
 
+    // start thread that will handle communication with parent
+    startCommunicationThread(umbilical);
+
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);
     MapOutputCollector collector = null;
@@ -164,7 +167,6 @@
           throws IOException {
 
           setProgress(getProgress());
-          reportProgress(umbilical);
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
           if (ret) {
@@ -194,32 +196,6 @@
     done(umbilical);
   }
 
-  private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
-    //spawn a thread to give merge progress heartbeats
-    Thread sortProgress = new Thread() {
-        public void run() {
-          LOG.debug("Started thread: " + getName());
-          while (true) {
-            try {
-              reportProgress(umbilical);
-              Thread.sleep(PROGRESS_INTERVAL);
-            } catch (InterruptedException e) {
-              return;
-            } catch (Throwable e) {
-              LOG.info("Thread Exception in " +
-                       "reporting sort progress\n" +
-                       StringUtils.stringifyException(e));
-              continue;
-            }
-          }
-        }
-      };
-    sortProgress.setName("Sort progress reporter for task "+getTaskId());
-    sortProgress.setDaemon(true);
-    sortProgress.start();
-    return sortProgress;
-  }
-  
   interface MapOutputCollector extends OutputCollector {
 
     public void close() throws IOException;
@@ -376,19 +352,10 @@
         for (int i = 0; i < partitions; i++)
           totalMem += sortImpl[i].getMemoryUtilized();
         if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
-
-          // Start the progress thread
-          Thread progress = createProgressThread(umbilical);
-
-          try {
-            sortAndSpillToDisk();
-            keyValBuffer.reset();
-            for (int i = 0; i < partitions; i++) {
-              sortImpl[i].close();
-            }
-          } finally {
-            // Stop the progress thread
-            progress.interrupt();
+          sortAndSpillToDisk();
+          keyValBuffer.reset();
+          for (int i = 0; i < partitions; i++) {
+            sortImpl[i].close();
           }
         }
       }
@@ -414,6 +381,7 @@
         //invoke the sort
         for (int i = 0; i < partitions; i++) {
           sortImpl[i].setInputBuffer(keyValBuffer);
+          sortImpl[i].setProgressable(reporter);
           RawKeyValueIterator rIter = sortImpl[i].sort();
           
           startPartition(i);
@@ -459,6 +427,8 @@
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
         reporter.incrCounter(COMBINE_OUTPUT_RECORDS, 1);
+        // indicate we're making progress
+        reporter.progress();
       }
     }
     
@@ -467,6 +437,9 @@
       Writable value = null;
 
       try {
+        // indicate progress, since constructor may take a while (because of 
+        // user code) 
+        reporter.progress();
         key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job);
         value = (Writable)ReflectionUtils.newInstance(valClass, job);
       } catch (Exception e) {
@@ -484,8 +457,8 @@
         (resultIter.getValue()).writeUncompressedBytes(valOut);
         valIn.reset(valOut.getData(), valOut.getLength());
         value.readFields(valIn);
-
         writer.append(key, value);
+        reporter.progress();
       }
     }
     
@@ -546,6 +519,7 @@
         //create a sorter object as we need access to the SegmentDescriptor
         //class and merge methods
         Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+        sorter.setProgressable(reporter);
         
         for (int parts = 0; parts < partitions; parts++){
           List<SegmentDescriptor> segmentList =
@@ -608,23 +582,15 @@
       }
     }
 
-    public void flush() throws IOException {
-
-      // Start the progress thread
-      Thread progress = createProgressThread(umbilical);
-
-      try {
-        //check whether the length of the key/value buffer is 0. If not, then
-        //we need to spill that to disk. Note that we reset the key/val buffer
-        //upon each spill (so a length > 0 means that we have not spilled yet)
-        if (keyValBuffer.getLength() > 0) {
-          sortAndSpillToDisk();
-        }
-        mergeParts();
-      } finally {
-        // Stop the progress thread
-        progress.interrupt();
+    public void flush() throws IOException 
+    {
+      //check whether the length of the key/value buffer is 0. If not, then
+      //we need to spill that to disk. Note that we reset the key/val buffer
+      //upon each spill (so a length > 0 means that we have not spilled yet)
+      if (keyValBuffer.getLength() > 0) {
+        sortAndSpillToDisk();
       }
+      mergeParts();
     }
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 20 14:00:25 2007
@@ -78,7 +78,6 @@
   
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
-  AtomicBoolean sortComplete = new AtomicBoolean(false);
   private ReduceCopier reduceCopier;
 
   { 
@@ -169,12 +168,7 @@
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-      // ignore the error, since failures in progress shouldn't kill us
-      try {
-        reporter.progress();
-      } catch (IOException ie) { 
-        LOG.debug("caught exception from progress", ie);
-      }
+      reporter.progress();
       return result;                              // return saved value
     }
 
@@ -232,11 +226,7 @@
     }
     public void informReduceProgress() {
       reducePhase.set(super.in.getProgress().get()); // update progress
-      try {
-        reporter.progress();
-      } catch (IOException ie) {
-        LOG.debug("Exception caught from progress", ie);
-      }
+      reporter.progress();
     }
     public Object next() {
       reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
@@ -249,8 +239,11 @@
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                                            job.getReducerClass(), job);
-    FileSystem lfs = FileSystem.getLocal(job);
 
+    // start thread that will handle communication with parent
+    startCommunicationThread(umbilical);
+
+    FileSystem lfs = FileSystem.getLocal(job);
     if (!job.get("mapred.job.tracker", "local").equals("local")) {
       reduceCopier = new ReduceCopier(umbilical, job);
       if (!reduceCopier.fetchOutputs()) {
@@ -280,52 +273,26 @@
     Path[] mapFiles = new Path[mapFilesList.size()];
     mapFiles = mapFilesList.toArray(mapFiles);
     
-    // spawn a thread to give sort progress heartbeats
-    Thread sortProgress = new Thread() {
-        public void run() {
-          while (!sortComplete.get()) {
-            try {
-              reportProgress(umbilical);
-              Thread.sleep(PROGRESS_INTERVAL);
-            } catch (InterruptedException e) {
-              return;
-            } catch (Throwable e) {
-              System.out.println("Thread Exception in " +
-                                 "reporting sort progress\n" +
-                                 StringUtils.stringifyException(e));
-              continue;
-            }
-          }
-        }
-      };
-    sortProgress.setDaemon(true);
-    sortProgress.setName("Sort progress reporter for task "+getTaskId());
-
     Path tempDir = new Path(getTaskId()); 
 
     WritableComparator comparator = job.getOutputValueGroupingComparator();
     
     SequenceFile.Sorter.RawKeyValueIterator rIter;
  
-    try {
-      setPhase(TaskStatus.Phase.SORT); 
-      sortProgress.start();
-
-      // sort the input file
-      SequenceFile.Sorter sorter =
-        new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      rIter = sorter.merge(mapFiles, tempDir, 
-                           !conf.getKeepFailedTaskFiles()); // sort
+    setPhase(TaskStatus.Phase.SORT); 
 
-    } finally {
-      sortComplete.set(true);
-    }
+    final Reporter reporter = getReporter(umbilical);
+    
+    // sort the input file
+    SequenceFile.Sorter sorter =
+      new SequenceFile.Sorter(lfs, comparator, valueClass, job);
+    sorter.setProgressable(reporter);
+    rIter = sorter.merge(mapFiles, tempDir, 
+        !conf.getKeepFailedTaskFiles()); // sort
 
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
 
-    final Reporter reporter = getReporter(umbilical);
-    
     // make output collector
     String finalName = getOutputName(getPartition());
     FileSystem fs = FileSystem.get(job);
@@ -338,7 +305,8 @@
           throws IOException {
           out.write(key, value);
           reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
-          reportProgress(umbilical);
+          // indicate that progress update needs to be sent
+          reporter.progress();
         }
       };
     
@@ -532,31 +500,6 @@
                               "map_".length(), endIndex));
     }
     
-    private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
-      //spawn a thread to give copy progress heartbeats
-      Thread copyProgress = new Thread() {
-          public void run() {
-            LOG.debug("Started thread: " + getName());
-            while (true) {
-              try {
-                reportProgress(umbilical);
-                Thread.sleep(PROGRESS_INTERVAL);
-              } catch (InterruptedException e) {
-                return;
-              } catch (Throwable e) {
-                LOG.info("Thread Exception in " +
-                         "reporting copy progress\n" +
-                         StringUtils.stringifyException(e));
-                continue;
-              }
-            }
-          }
-        };
-      copyProgress.setName("Copy progress reporter for task "+getTaskId());
-      copyProgress.setDaemon(true);
-      return copyProgress;
-    }
-    
     private int nextMapOutputCopierId = 0;
     
     /** Copies map outputs as they become available */
@@ -564,10 +507,12 @@
       
       private MapOutputLocation currentLocation = null;
       private int id = nextMapOutputCopierId++;
+      private Reporter reporter;
       
-      public MapOutputCopier() {
+      public MapOutputCopier(Reporter reporter) {
         setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
         LOG.debug(getName() + " created");
+        this.reporter = reporter;
       }
       
       /**
@@ -665,7 +610,7 @@
         tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
                                   tmpFilename, lDirAlloc, 
                                   conf, reduceTask.getPartition(), 
-                                  STALLED_COPY_TIMEOUT);
+                                  STALLED_COPY_TIMEOUT, reporter);
         if (!neededOutputs.contains(loc.getMapId())) {
           if (tmpFilename != null) {
             FileSystem fs = tmpFilename.getFileSystem(conf);
@@ -788,6 +733,7 @@
       sorter =
         new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
                                 conf.getMapOutputValueClass(), conf);
+      sorter.setProgressable(getReporter(umbilical));
       
       // hosts -> next contact time
       this.penaltyBox = new Hashtable<String, Long>();
@@ -832,9 +778,10 @@
       
       copiers = new MapOutputCopier[numCopiers];
       
+      Reporter reporter = getReporter(umbilical);
       // start all the copying threads
       for (int i=0; i < copiers.length; i++) {
-        copiers[i] = new MapOutputCopier();
+        copiers[i] = new MapOutputCopier(reporter);
         copiers[i].start();
       }
       
@@ -843,8 +790,6 @@
       long currentTime = startTime;
       IntWritable fromEventId = new IntWritable(0);
       
-      Thread copyProgress = createProgressThread(umbilical);
-      copyProgress.start();
       try {
         // loop until we get all required outputs
         while (!neededOutputs.isEmpty() && mergeThrowable == null) {
@@ -1077,7 +1022,6 @@
         return mergeThrowable == null && neededOutputs.isEmpty();
       } finally {
         inMemFileSys.close();
-        copyProgress.interrupt();
       }
     }
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed Jun 20 14:00:25 2007
@@ -31,7 +31,7 @@
   public static final Reporter NULL = new Reporter() {
       public void setStatus(String s) {
       }
-      public void progress() throws IOException {
+      public void progress() {
       }
       public void incrCounter(Enum key, long amount) {
       }
@@ -46,7 +46,7 @@
    * @param status
    *          a brief description of the current status
    */
-  public abstract void setStatus(String status) throws IOException;
+  public abstract void setStatus(String status);
   
   /**
    * Increments the counter identified by the key, which can be of

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun 20 14:00:25 2007
@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.text.NumberFormat;
 
 import org.apache.commons.logging.Log;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 /** Base class for tasks. */
@@ -129,16 +131,17 @@
   }
   /**
    * Return current phase of the task. 
+   * needs to be synchronized as communication thread sends the phase every second
    * @return
    */
-  public TaskStatus.Phase getPhase(){
+  public synchronized TaskStatus.Phase getPhase(){
     return this.phase; 
   }
   /**
    * Set current phase of the task. 
    * @param p
    */
-  protected void setPhase(TaskStatus.Phase p){
+  protected synchronized void setPhase(TaskStatus.Phase p){
     this.phase = p; 
   }
 
@@ -217,12 +220,24 @@
   public static final int PROGRESS_INTERVAL = 1000;
 
   private transient Progress taskProgress = new Progress();
-  private transient long nextProgressTime =
-    System.currentTimeMillis() + PROGRESS_INTERVAL;
 
   // Current counters
   private transient Counters counters = new Counters();
   
+  /**
+   * flag that indicates whether progress update needs to be sent to parent.
+   * If true, it has been set. If false, it has been reset. 
+   * Using AtomicBoolean since we need an atomic read & reset method. 
+   */  
+  private AtomicBoolean progressFlag = new AtomicBoolean(false);
+  // getters and setters for flag
+  private void setProgressFlag() {
+    progressFlag.set(true);
+  }
+  private boolean resetProgressFlag() {
+    return progressFlag.getAndSet(false);
+  }
+  
   public abstract boolean isMapTask();
 
   public Progress getProgress() { return taskProgress; }
@@ -231,18 +246,76 @@
     throw new UnsupportedOperationException("Input only available on map");
   }
 
+  /** 
+   * The communication thread handles communication with the parent (Task Tracker). 
+   * It sends progress updates if progress has been made or if the task needs to 
+   * let the parent know that it's alive. It also pings the parent to see if it's alive.
+   */
+  protected void startCommunicationThread(final TaskUmbilicalProtocol umbilical) {
+    Thread thread = new Thread(new Runnable() {
+        public void run() {
+          final int MAX_RETRIES = 3;
+          int remainingRetries = MAX_RETRIES;
+          while (true) {
+            try {
+              // get current flag value and reset it as well
+              boolean sendProgress = resetProgressFlag();
+              boolean taskFound = true; // whether TT knows about this task
+              
+              if (sendProgress) {
+                // we need to send progress update
+                taskFound = umbilical.progress(taskId, taskProgress.get(), 
+                    taskProgress.toString(), getPhase(), counters);
+              }
+              else {
+                // send ping 
+                taskFound = umbilical.ping(taskId);
+              }
+              
+              // if Task Tracker is not aware of our task ID (probably because it died and
+              // came back up), kill ourselves
+              if (!taskFound) {
+                LOG.warn("Parent died.  Exiting "+taskId);
+                System.exit(66);
+              }
+              
+              remainingRetries = MAX_RETRIES;
+              // sleep for a bit
+              try {
+                Thread.sleep(PROGRESS_INTERVAL);
+              } 
+              catch (InterruptedException e) {
+              }
+            } 
+            catch (Throwable t) {
+              LOG.info("Communication exception: " + StringUtils.stringifyException(t));
+              remainingRetries -=1;
+              if (remainingRetries == 0) {
+                ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+                LOG.warn("Last retry, killing "+taskId);
+                System.exit(65);
+              }
+            }
+          }
+        }
+      }, "Comm thread for "+taskId);
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  
   protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) 
     throws IOException 
   {
     return new Reporter() {
-        public void setStatus(String status) throws IOException {
-          synchronized (this) {
-            taskProgress.setStatus(status);
-            progress();
-          }
+        public void setStatus(String status) {
+          taskProgress.setStatus(status);
+          // indicate that progress update needs to be sent
+          setProgressFlag();
         }
-        public void progress() throws IOException {
-          reportProgress(umbilical);
+        public void progress() {
+          // indicate that progress update needs to be sent
+          setProgressFlag();
         }
         public void incrCounter(Enum key, long amount) {
           Counters counters = getCounters();
@@ -258,24 +331,8 @@
 
   public void setProgress(float progress) {
     taskProgress.set(progress);
-  }
-
-  public void reportProgress(TaskUmbilicalProtocol umbilical) {
-    long now = System.currentTimeMillis();
-    synchronized (this) {
-      if (now > nextProgressTime)  {
-        nextProgressTime = now + PROGRESS_INTERVAL;
-        float progress = taskProgress.get();
-        String status = taskProgress.toString();
-        try {
-          umbilical.progress(getTaskId(), progress, status, phase, counters);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();     // interrupt ourself
-        } catch (IOException ie) {
-          LOG.warn(StringUtils.stringifyException(ie));
-        }
-      }
-    }
+    // indicate that progress update needs to be sent
+    setProgressFlag();
   }
 
   public void done(TaskUmbilicalProtocol umbilical) throws IOException {
@@ -286,14 +343,17 @@
         if (needProgress) {
           // send a final status report
           try {
-            umbilical.progress(getTaskId(), taskProgress.get(), 
-                               taskProgress.toString(), phase, counters);
+            if (!umbilical.progress(taskId, taskProgress.get(),
+                taskProgress.toString(), getPhase(), counters)) {
+              LOG.warn("Parent died.  Exiting "+taskId);
+              System.exit(66);
+            }
             needProgress = false;
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();       // interrupt ourself
           }
         }
-        umbilical.done(getTaskId());
+        umbilical.done(taskId);
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 20 14:00:25 2007
@@ -1530,7 +1530,7 @@
   /**
    * Called periodically to report Task progress, from 0.0 to 1.0.
    */
-  public synchronized void progress(String taskid, float progress, 
+  public synchronized boolean progress(String taskid, float progress, 
                                     String state, 
                                     TaskStatus.Phase phase,
                                     Counters counters
@@ -1538,8 +1538,10 @@
     TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
     if (tip != null) {
       tip.reportProgress(progress, state, phase, counters);
+      return true;
     } else {
-      LOG.warn("Progress from unknown child task: "+taskid+". Ignored.");
+      LOG.warn("Progress from unknown child task: "+taskid);
+      return false;
     }
   }
 
@@ -1697,8 +1699,6 @@
           
       defaultConf.addFinalResource(new Path(task.getJobFile()));
 
-      startPinging(umbilical, taskid);        // start pinging parent
-
       try {
         // use job-specified working directory
         FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
@@ -1720,41 +1720,6 @@
         // there is no more logging done.
         LogManager.shutdown();
       }
-    }
-
-    /** Periodically ping parent and exit when this fails.*/
-    private static void startPinging(final TaskUmbilicalProtocol umbilical,
-                                     final String taskid) {
-      Thread thread = new Thread(new Runnable() {
-          public void run() {
-            final int MAX_RETRIES = 3;
-            int remainingRetries = MAX_RETRIES;
-            while (true) {
-              try {
-                if (!umbilical.ping(taskid)) {
-                  LOG.warn("Parent died.  Exiting "+taskid);
-                  System.exit(66);
-                }
-                remainingRetries = MAX_RETRIES;
-              } catch (Throwable t) {
-                String msg = StringUtils.stringifyException(t);
-                LOG.info("Ping exception: " + msg);
-                remainingRetries -=1;
-                if (remainingRetries == 0) {
-                  ReflectionUtils.logThreadInfo(LOG, "ping exception", 0);
-                  LOG.warn("Last retry, killing "+taskid);
-                  System.exit(65);
-                }
-              }
-              try {
-                Thread.sleep(1000);
-              } catch (InterruptedException e) {
-              }
-            }
-          }
-        }, "Pinger for "+taskid);
-      thread.setDaemon(true);
-      thread.start();
     }
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Jun 20 14:00:25 2007
@@ -29,8 +29,10 @@
  * and parent is via this protocol. */ 
 interface TaskUmbilicalProtocol extends VersionedProtocol {
 
-  /** Changed the version to 2, since we have a new method getMapOutputs */
-  public static final long versionID = 2L;
+  /** Changed the version to 2, since we have a new method getMapOutputs 
+   * Changed version to 3 to have progress() return a boolean
+   * */
+  public static final long versionID = 3L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(String taskid) throws IOException;
@@ -41,8 +43,9 @@
    * @param state description of task's current state
    * @param phase current phase of the task.
    * @param counters the counters for this task.
+   * @return True if the task is known
    */
-  void progress(String taskid, float progress, String state, 
+  boolean progress(String taskid, float progress, String state, 
                 TaskStatus.Phase phase, Counters counters)
     throws IOException, InterruptedException;
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java Wed Jun 20 14:00:25 2007
@@ -65,7 +65,7 @@
   /**
    * Update the status message for the task.
    */
-  public void status(String msg) throws IOException {
+  public void status(String msg) {
     reporter.setStatus(msg);
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java Wed Jun 20 14:00:25 2007
@@ -44,7 +44,7 @@
   }
 
   /** Adds a node to the tree. */
-  public Progress addPhase() {
+  public synchronized Progress addPhase() {
     Progress phase = new Progress();
     phases.add(phase);
     phase.parent = this;
@@ -54,17 +54,17 @@
 
   /** Called during execution to move to the next phase at this level in the
    * tree. */
-  public void startNextPhase() {
+  public synchronized void startNextPhase() {
     currentPhase++;
   }
 
   /** Returns the current sub-node executing. */
-  public Progress phase() {
+  public synchronized Progress phase() {
     return phases.get(currentPhase);
   }
 
   /** Completes this node, moving the parent node to its next child. */
-  public void complete() {
+  public synchronized void complete() {
     progress = 1.0f;
     if (parent != null) {
       parent.startNextPhase();
@@ -72,12 +72,14 @@
   }
 
   /** Called during execution on a leaf node to set its progress. */
-  public void set(float progress) {
+  public synchronized void set(float progress) {
     this.progress = progress;
   }
 
   /** Returns the overall progress of the root. */
-  public float get() {
+  // this method probably does not need to be synchronized as getINternal() is synchronized
+  // and the node's parent never changes. Still, it doesn't hurt. 
+  public synchronized float get() {
     Progress node = this;
     while (node.parent != null) {                 // find the root
       node = parent;
@@ -86,7 +88,7 @@
   }
 
   /** Computes progress in this node. */
-  private float getInternal() {
+  private synchronized float getInternal() {
     int phaseCount = phases.size();
     if (phaseCount != 0) {
       float subProgress =
@@ -97,7 +99,7 @@
     }
   }
 
-  public void setStatus(String status) {
+  public synchronized void setStatus(String status) {
     this.status = status;
   }
 
@@ -107,7 +109,7 @@
     return result.toString();
   }
 
-  private void toString(StringBuffer buffer) {
+  private synchronized void toString(StringBuffer buffer) {
     buffer.append(status);
     if (phases.size() != 0 && currentPhase < phases.size()) {
       buffer.append(" > ");

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java?view=diff&rev=549233&r1=549232&r2=549233
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java Wed Jun 20 14:00:25 2007
@@ -12,5 +12,5 @@
   /** callback for reporting progress. Used by DFSclient to report
    * progress while writing a block of DFS file.
    */
-  public void progress() throws IOException;
+  public void progress();
 }



Mime
View raw message