hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r525457 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Date Wed, 04 Apr 2007 08:46:18 GMT
Author: tomwhite
Date: Wed Apr  4 01:46:17 2007
New Revision: 525457

URL: http://svn.apache.org/viewvc?view=rev&rev=525457
Log:
HADOOP-1105. Fix reducers to make "progress" while iterating through values.  Contributed
by Devaraj Das & Owen O'Malley.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=525457&r1=525456&r2=525457
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Apr  4 01:46:17 2007
@@ -120,6 +120,9 @@
     command and a performance problem in HDFS's implementation of it.
     (Hairong Kuang via cutting)
 
+ 7. HADOOP-1105. Fix reducers to make "progress" while iterating 
+    through values.  (Devaraj Das & Owen O'Malley via tomwhite)
+
 
 Release 0.12.2 - 2007-23-17
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=525457&r1=525456&r2=525457
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Apr  4 01:46:17
2007
@@ -395,7 +395,7 @@
     Reducer combiner, OutputCollector combineCollector) throws IOException {
       //combine the key/value obtained from the offset & indices arrays.
       CombineValuesIterator values = new CombineValuesIterator(resultIter,
-              comparator, keyClass, valClass, umbilical, job);
+              comparator, keyClass, valClass, job, reporter);
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
@@ -526,10 +526,9 @@
         
       public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
               WritableComparator comparator, Class keyClass,
-              Class valClass, TaskUmbilicalProtocol umbilical, 
-              Configuration conf) 
+              Class valClass, Configuration conf, Reporter reporter) 
       throws IOException {
-        super(in, comparator, keyClass, valClass, umbilical, conf);
+        super(in, comparator, keyClass, valClass, conf, reporter);
       }
       
       public Object next() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=525457&r1=525456&r2=525457
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Apr  4 01:46:17
2007
@@ -26,6 +26,8 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +59,7 @@
        });
   }
   
+  private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
   private boolean sortComplete;
 
@@ -116,7 +119,6 @@
     private Writable value;                       // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
-    private TaskUmbilicalProtocol umbilical;
     private WritableComparator comparator;
     private Class keyClass;
     private Class valClass;
@@ -124,18 +126,19 @@
     private DataOutputBuffer valOut = new DataOutputBuffer();
     private DataInputBuffer valIn = new DataInputBuffer();
     private DataInputBuffer keyIn = new DataInputBuffer();
+    protected Reporter reporter;
 
     public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
                            WritableComparator comparator, Class keyClass,
-                           Class valClass, TaskUmbilicalProtocol umbilical,
-                           Configuration conf)
+                           Class valClass, Configuration conf, 
+                           Reporter reporter)
       throws IOException {
       this.in = in;
-      this.umbilical = umbilical;
       this.conf = conf;
       this.comparator = comparator;
       this.keyClass = keyClass;
       this.valClass = valClass;
+      this.reporter = reporter;
       getNext();
     }
 
@@ -144,13 +147,19 @@
     public boolean hasNext() { return hasNext; }
 
     public Object next() {
+      Object result = value;                      // save value
       try {
-        Object result = value;                      // save value
         getNext();                                  // move to next
-        return result;                              // return saved value
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
+      // ignore the error, since failures in progress shouldn't kill us
+      try {
+        reporter.progress();
+      } catch (IOException ie) { 
+        LOG.debug("caught exception from progress", ie);
+      }
+      return result;                              // return saved value
     }
 
     public void remove() { throw new RuntimeException("not implemented"); }
@@ -198,18 +207,20 @@
     }
   }
   private class ReduceValuesIterator extends ValuesIterator {
-    private Reporter reporter;
     public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
                                WritableComparator comparator, Class keyClass,
-                               Class valClass, TaskUmbilicalProtocol umbilical,
+                               Class valClass,
                                Configuration conf, Reporter reporter)
     throws IOException {
-      super(in, comparator, keyClass, valClass, umbilical, conf);
-      this.reporter = reporter;
+      super(in, comparator, keyClass, valClass, conf, reporter);
     }
     public void informReduceProgress() {
       reducePhase.set(super.in.getProgress().get()); // update progress
-      reportProgress(super.umbilical);
+      try {
+        reporter.progress();
+      } catch (IOException ie) {
+        LOG.debug("Exception caught from progress", ie);
+      }
     }
     public Object next() {
       reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
@@ -312,7 +323,7 @@
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
       ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, 
-                                  keyClass, valClass, umbilical, job, reporter);
+                                  keyClass, valClass, job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);



Mime
View raw message