Author: tomwhite
Date: Thu Apr 5 01:45:28 2007
New Revision: 525761
URL: http://svn.apache.org/viewvc?view=rev&rev=525761
Log:
Merge -r 525456:525457 from trunk to 0.12 branch.
Modified:
lucene/hadoop/branches/branch-0.12/CHANGES.txt
lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: lucene/hadoop/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.12/CHANGES.txt?view=diff&rev=525761&r1=525760&r2=525761
==============================================================================
--- lucene/hadoop/branches/branch-0.12/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.12/CHANGES.txt Thu Apr 5 01:45:28 2007
@@ -23,6 +23,9 @@
command and a performance problem in HDFS's implementation of it.
(Hairong Kuang via cutting)
+ 7. HADOOP-1105. Fix reducers to make "progress" while iterating
+ through values. (Devaraj Das & Owen O'Malley via tomwhite)
+
Release 0.12.2 - 2007-23-17
Modified: lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=525761&r1=525760&r2=525761
==============================================================================
--- lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/MapTask.java Thu Apr 5 01:45:28 2007
@@ -393,7 +393,7 @@
Reducer combiner, OutputCollector combineCollector) throws IOException {
//combine the key/value obtained from the offset & indices arrays.
CombineValuesIterator values = new CombineValuesIterator(resultIter,
- comparator, keyClass, valClass, umbilical, job);
+ comparator, keyClass, valClass, job, reporter);
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector, reporter);
values.nextKey();
@@ -523,10 +523,9 @@
public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
WritableComparator comparator, Class keyClass,
- Class valClass, TaskUmbilicalProtocol umbilical,
- Configuration conf)
+ Class valClass, Configuration conf, Reporter reporter)
throws IOException {
- super(in, comparator, keyClass, valClass, umbilical, conf);
+ super(in, comparator, keyClass, valClass, conf, reporter);
}
}
}
Modified: lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=525761&r1=525760&r2=525761
==============================================================================
--- lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.12/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Apr 5 01:45:28 2007
@@ -26,6 +26,8 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -57,6 +59,7 @@
});
}
+ private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
private int numMaps;
private boolean sortComplete;
@@ -116,7 +119,6 @@
private Writable value; // current value
private boolean hasNext; // more w/ this key
private boolean more; // more in file
- private TaskUmbilicalProtocol umbilical;
private WritableComparator comparator;
private Class keyClass;
private Class valClass;
@@ -124,18 +126,19 @@
private DataOutputBuffer valOut = new DataOutputBuffer();
private DataInputBuffer valIn = new DataInputBuffer();
private DataInputBuffer keyIn = new DataInputBuffer();
+ protected Reporter reporter;
public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
WritableComparator comparator, Class keyClass,
- Class valClass, TaskUmbilicalProtocol umbilical,
- Configuration conf)
+ Class valClass, Configuration conf,
+ Reporter reporter)
throws IOException {
this.in = in;
- this.umbilical = umbilical;
this.conf = conf;
this.comparator = comparator;
this.keyClass = keyClass;
this.valClass = valClass;
+ this.reporter = reporter;
getNext();
}
@@ -144,13 +147,19 @@
public boolean hasNext() { return hasNext; }
public Object next() {
+ Object result = value; // save value
try {
- Object result = value; // save value
getNext(); // move to next
- return result; // return saved value
} catch (IOException e) {
throw new RuntimeException(e);
}
+ // ignore the error, since failures in progress shouldn't kill us
+ try {
+ reporter.progress();
+ } catch (IOException ie) {
+ LOG.debug("caught exception from progress", ie);
+ }
+ return result; // return saved value
}
public void remove() { throw new RuntimeException("not implemented"); }
@@ -200,14 +209,18 @@
private class ReduceValuesIterator extends ValuesIterator {
public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
WritableComparator comparator, Class keyClass,
- Class valClass, TaskUmbilicalProtocol umbilical,
- Configuration conf)
+ Class valClass,
+ Configuration conf, Reporter reporter)
throws IOException {
- super(in, comparator, keyClass, valClass, umbilical, conf);
+ super(in, comparator, keyClass, valClass, conf, reporter);
}
public void informReduceProgress() {
reducePhase.set(super.in.getProgress().get()); // update progress
- reportProgress(super.umbilical);
+ try {
+ reporter.progress();
+ } catch (IOException ie) {
+ LOG.debug("Exception caught from progress", ie);
+ }
}
}
@@ -306,7 +319,7 @@
Class keyClass = job.getMapOutputKeyClass();
Class valClass = job.getMapOutputValueClass();
ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator,
- keyClass, valClass, umbilical, job);
+ keyClass, valClass, job, reporter);
values.informReduceProgress();
while (values.more()) {
reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
|