hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r525761 - in /lucene/hadoop/branches/branch-0.12: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Date Thu, 05 Apr 2007 08:45:29 GMT
Author: tomwhite
Date: Thu Apr  5 01:45:28 2007
New Revision: 525761

URL: http://svn.apache.org/viewvc?view=rev&rev=525761
Log:
Merge -r 525456:525457 from trunk to 0.12 branch.

Modified:
    lucene/hadoop/branches/branch-0.12/CHANGES.txt
    lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.12/CHANGES.txt?view=diff&rev=525761&r1=525760&r2=525761
==============================================================================
--- lucene/hadoop/branches/branch-0.12/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.12/CHANGES.txt Thu Apr  5 01:45:28 2007
@@ -23,6 +23,9 @@
     command and a performance problem in HDFS's implementation of it.
     (Hairong Kuang via cutting)
 
+ 7. HADOOP-1105. Fix reducers to make "progress" while iterating 
+    through values.  (Devaraj Das & Owen O'Malley via tomwhite)
+
 
 Release 0.12.2 - 2007-23-17
 

Modified: lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=525761&r1=525760&r2=525761
==============================================================================
--- lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java Thu Apr  5 01:45:28 2007
@@ -393,7 +393,7 @@
     Reducer combiner, OutputCollector combineCollector) throws IOException {
       //combine the key/value obtained from the offset & indices arrays.
       CombineValuesIterator values = new CombineValuesIterator(resultIter,
-              comparator, keyClass, valClass, umbilical, job);
+              comparator, keyClass, valClass, job, reporter);
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
@@ -523,10 +523,9 @@
         
       public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
               WritableComparator comparator, Class keyClass,
-              Class valClass, TaskUmbilicalProtocol umbilical, 
-              Configuration conf) 
+              Class valClass, Configuration conf, Reporter reporter) 
       throws IOException {
-        super(in, comparator, keyClass, valClass, umbilical, conf);
+        super(in, comparator, keyClass, valClass, conf, reporter);
       }
     }
   }

Modified: lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=525761&r1=525760&r2=525761
==============================================================================
--- lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Apr  5 01:45:28 2007
@@ -26,6 +26,8 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +59,7 @@
        });
   }
   
+  private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
   private boolean sortComplete;
 
@@ -116,7 +119,6 @@
     private Writable value;                       // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
-    private TaskUmbilicalProtocol umbilical;
     private WritableComparator comparator;
     private Class keyClass;
     private Class valClass;
@@ -124,18 +126,19 @@
     private DataOutputBuffer valOut = new DataOutputBuffer();
     private DataInputBuffer valIn = new DataInputBuffer();
     private DataInputBuffer keyIn = new DataInputBuffer();
+    protected Reporter reporter;
 
     public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
                            WritableComparator comparator, Class keyClass,
-                           Class valClass, TaskUmbilicalProtocol umbilical,
-                           Configuration conf)
+                           Class valClass, Configuration conf, 
+                           Reporter reporter)
       throws IOException {
       this.in = in;
-      this.umbilical = umbilical;
       this.conf = conf;
       this.comparator = comparator;
       this.keyClass = keyClass;
       this.valClass = valClass;
+      this.reporter = reporter;
       getNext();
     }
 
@@ -144,13 +147,19 @@
     public boolean hasNext() { return hasNext; }
 
     public Object next() {
+      Object result = value;                      // save value
       try {
-        Object result = value;                      // save value
         getNext();                                  // move to next
-        return result;                              // return saved value
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
+      // ignore the error, since failures in progress shouldn't kill us
+      try {
+        reporter.progress();
+      } catch (IOException ie) { 
+        LOG.debug("caught exception from progress", ie);
+      }
+      return result;                              // return saved value
     }
 
     public void remove() { throw new RuntimeException("not implemented"); }
@@ -200,14 +209,18 @@
   private class ReduceValuesIterator extends ValuesIterator {
     public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
                                WritableComparator comparator, Class keyClass,
-                               Class valClass, TaskUmbilicalProtocol umbilical,
-                               Configuration conf)
+                               Class valClass,
+                               Configuration conf, Reporter reporter)
     throws IOException {
-      super(in, comparator, keyClass, valClass, umbilical, conf);
+      super(in, comparator, keyClass, valClass, conf, reporter);
     }
     public void informReduceProgress() {
       reducePhase.set(super.in.getProgress().get()); // update progress
-      reportProgress(super.umbilical);
+      try {
+        reporter.progress();
+      } catch (IOException ie) {
+        LOG.debug("Exception caught from progress", ie);
+      }
     }
   }
 
@@ -306,7 +319,7 @@
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
       ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, 
-                                  keyClass, valClass, umbilical, job);
+                                  keyClass, valClass, job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);



Mime
View raw message