Author: cutting
Date: Tue Nov 14 11:09:16 2006
New Revision: 474922
URL: http://svn.apache.org/viewvc?view=rev&rev=474922
Log:
HADOOP-613. Perform final merge while reducing. Contributed by Devaraj.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 14 11:09:16 2006
@@ -56,6 +56,10 @@
17. HADOOP-705. Fix a bug in the JobTracker when failed jobs were
not completely cleaned up. (Mahadev Konar via cutting)
+18. HADOOP-613. Perform final merge while reducing. This removes one
+ sort pass over the data and should therefore significantly
+ decrease overall processing time. (Devaraj Das via cutting)
+
Release 0.8.0 - 2006-11-03
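The heart of this change: previously the reduce task sorted and fully
merged map outputs into a single on-disk file, then re-read that file to
feed the reducer; now the last merge level is handed to the reducer as an
iterator, eliminating one full pass over the data. A minimal sketch of
the two flows, using the setup from ReduceTask below (sortedFile, tempDir
and deleteInput stand in for the actual arguments):

    SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(lfs, comparator, valueClass, job);

    // Old flow: merge everything to disk, then read it back.
    //   sorter.sort(mapFiles, sortedFile, deleteInput);
    //   SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);

    // New flow: expose the final merge as an iterator and reduce from it.
    SequenceFile.Sorter.RawKeyValueIterator rIter =
        sorter.sortAndIterate(mapFiles, tempDir, deleteInput);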
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Nov 14 11:09:16 2006
@@ -33,6 +33,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
/** Support for flat files of binary key/value pairs. */
@@ -1538,6 +1539,34 @@
}
}
+ /**
+ * Perform a file sort from a set of input files and return an iterator.
+ * @param inFiles the files to be sorted
+ * @param tempDir the directory where temp files are created during sort
+ * @param deleteInput should the input files be deleted as they are read?
+ * @return a RawKeyValueIterator over the sorted key/value pairs
+ */
+ public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
+ boolean deleteInput) throws IOException {
+ Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
+ if (fs.exists(outFile)) {
+ throw new IOException("already exists: " + outFile);
+ }
+ this.inFiles = inFiles;
+ //outFile is used as the prefix for temp files when the sort produces
+ //multiple sorted segments. In the single-segment case, outFile itself
+ //contains the sorted data for that segment
+ this.outFile = outFile;
+
+ int segments = sortPass(deleteInput);
+ if (segments > 1)
+ return merge(outFile.suffix(".0"), outFile.suffix(".0.index"));
+ else if (segments == 1)
+ return merge(new Path[]{outFile}, true);
+ else return null;
+ }
+
/**
* The backwards compatible interface to sort.
* @param inFile the input file to sort
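For orientation, a hedged sketch of driving the returned iterator; the
next()/getKey()/getValue()/close() calls match the RawKeyValueIterator
interface in this file, while the surrounding setup (fs, comparator,
valClass, conf, inFiles, tempDir) is assumed:

    SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(fs, comparator, valClass, conf);
    SequenceFile.Sorter.RawKeyValueIterator iter =
        sorter.sortAndIterate(inFiles, tempDir, false);
    // sortAndIterate returns null when there were zero segments to merge
    while (iter != null && iter.next()) {
      DataOutputBuffer rawKey = iter.getKey();          // serialized key bytes
      SequenceFile.ValueBytes rawVal = iter.getValue(); // serialized value
      // ... consume or deserialize the raw bytes ...
    }
    if (iter != null) iter.close();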
@@ -1799,6 +1828,10 @@
* @throws IOException
*/
void close() throws IOException;
+ /** Gets the Progress object; this has a float (0.0 - 1.0)
+ * indicating the fraction of bytes processed by the iterator so far
+ */
+ Progress getProgress();
}
/**
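getProgress() hands back Hadoop's org.apache.hadoop.util.Progress, whose
set(float) stores a 0.0 - 1.0 fraction and get() reads it back; the merge
updates it as bytes are consumed (see the next() changes below), and
ReduceTask polls it to drive reduce-phase progress. A minimal sketch of
polling it, assuming an iterator obtained as above:

    Progress progress = iter.getProgress();
    while (iter.next()) {
      float done = progress.get(); // fraction of remaining-segment bytes read
      // e.g. report 'done' to the task's progress reporter
    }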
@@ -1941,6 +1974,9 @@
private boolean blockCompress;
private DataOutputBuffer rawKey = new DataOutputBuffer();
private ValueBytes rawValue;
+ private long totalBytesProcessed;
+ private float progPerByte;
+ private Progress mergeProgress = new Progress();
//a TreeMap used to store the segments sorted by size (segment offset and
//segment path name are used to break ties between segments of the same size)
@@ -1992,7 +2028,7 @@
//load the raw value. Re-use the existing rawValue buffer
if(rawValue == null)
rawValue = ms.in.createValueBytes();
- ms.nextRawValue(rawValue);
+ int valLength = ms.nextRawValue(rawValue);
if (ms.nextRawKey()) {
adjustTop();
@@ -2000,9 +2036,17 @@
pop();
ms.cleanup();
}
+ if (progPerByte > 0) {
+ totalBytesProcessed += rawKey.getLength() + valLength;
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ }
return true;
}
+ public Progress getProgress() {
+ return mergeProgress;
+ }
+
/** This is the single level merge that is called multiple times
* depending on the factor size and the number of segments
* @return RawKeyValueIterator
@@ -2029,6 +2073,13 @@
//if fewer segments than the merge factor remain, just return the
//iterator; otherwise do another single-level merge
if (numSegments <= factor) {
+ //calculate the length of the remaining segments. Required for
+ //calculating the merge progress
+ long totalBytes = 0;
+ for (int i = 0; i < numSegments; i++)
+ totalBytes += mStream[i].segmentLength;
+ if (totalBytes != 0) //being paranoid
+ progPerByte = 1.0f / (float)totalBytes;
return this;
} else {
//we want to spread the creation of temp files on multiple disks if
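The progress arithmetic: once the merge is down to its final level,
progPerByte = 1 / (total bytes in the remaining segments), and every
next() adds (key length + value length) * progPerByte to mergeProgress.
A worked example with illustrative numbers:

    // totalBytes  = 2,000,000      (sum of remaining segment lengths)
    // progPerByte = 1.0f / 2,000,000
    // after next() has consumed records totalling 500,000 bytes:
    // mergeProgress = 500,000 * progPerByte = 0.25   (25% merged)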
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 14 11:09:16 2006
@@ -122,23 +122,28 @@
/** Iterates values while keys match in sorted input. */
private class ValuesIterator implements Iterator {
- private SequenceFile.Reader in; // input file
+ private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
private WritableComparable key; // current key
private Writable value; // current value
private boolean hasNext; // more w/ this key
private boolean more; // more in file
- private float progPerByte;
private TaskUmbilicalProtocol umbilical;
private WritableComparator comparator;
-
- public ValuesIterator (SequenceFile.Reader in, long length,
- WritableComparator comparator,
- TaskUmbilicalProtocol umbilical)
+ private Class keyClass;
+ private Class valClass;
+ private DataOutputBuffer valOut = new DataOutputBuffer();
+ private DataInputBuffer valIn = new DataInputBuffer();
+ private DataInputBuffer keyIn = new DataInputBuffer();
+
+ public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
+ WritableComparator comparator, Class keyClass,
+ Class valClass, TaskUmbilicalProtocol umbilical)
throws IOException {
this.in = in;
- this.progPerByte = 1.0f / (float)length;
this.umbilical = umbilical;
this.comparator = comparator;
+ this.keyClass = keyClass;
+ this.valClass = valClass;
getNext();
}
@@ -173,18 +178,26 @@
public WritableComparable getKey() { return key; }
private void getNext() throws IOException {
- reducePhase.set(in.getPosition()*progPerByte); // update progress
+ reducePhase.set(in.getProgress().get()); // update progress
reportProgress(umbilical);
Writable lastKey = key; // save previous key
try {
- key = (WritableComparable)in.getKeyClass().newInstance();
- value = (Writable)in.getValueClass().newInstance();
+ key = (WritableComparable)keyClass.newInstance();
+ value = (Writable)valClass.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
- more = in.next(key, value);
+ more = in.next();
if (more) {
+ //de-serialize the raw key/value
+ keyIn.reset(in.getKey().getData(), in.getKey().getLength());
+ key.readFields(keyIn);
+ valOut.reset();
+ (in.getValue()).writeUncompressedBytes(valOut);
+ valIn.reset(valOut.getData(), valOut.getLength());
+ value.readFields(valIn);
+
if (lastKey == null) {
hasNext = true;
} else {
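The deserialization above round-trips raw bytes through in-memory
buffers: key bytes are read straight out of the iterator's buffer, while
the value is first written out uncompressed and then read back in. A
condensed sketch of the same pattern, assuming key and value are freshly
instantiated Writables as in getNext():

    keyIn.reset(in.getKey().getData(), in.getKey().getLength());
    key.readFields(keyIn);                        // materialize the key

    valOut.reset();
    in.getValue().writeUncompressedBytes(valOut); // decompresses if needed
    valIn.reset(valOut.getData(), valOut.getLength());
    value.readFields(valIn);                      // materialize the value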
@@ -233,10 +246,12 @@
};
sortProgress.setName("Sort progress reporter for task "+getTaskId());
- Path sortedFile = job.getLocalPath(getTaskId()+Path.SEPARATOR+"all.2");
+ Path tempDir = job.getLocalPath(getTaskId());
WritableComparator comparator = job.getOutputKeyComparator();
-
+
+ SequenceFile.Sorter.RawKeyValueIterator rIter;
+
try {
setPhase(TaskStatus.Phase.SORT) ;
sortProgress.start();
@@ -244,7 +259,8 @@
// sort the input file
SequenceFile.Sorter sorter =
new SequenceFile.Sorter(lfs, comparator, valueClass, job);
- sorter.sort(mapFiles, sortedFile, !conf.getKeepFailedTaskFiles()); // sort
+ rIter = sorter.sortAndIterate(mapFiles, tempDir,
+ !conf.getKeepFailedTaskFiles()); // sort
} finally {
sortComplete = true;
@@ -269,11 +285,11 @@
};
// apply reduce function
- SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
- long length = lfs.getLength(sortedFile);
try {
- ValuesIterator values = new ValuesIterator(in, length, comparator,
- umbilical);
+ Class keyClass = job.getMapOutputKeyClass();
+ Class valClass = job.getMapOutputValueClass();
+ ValuesIterator values = new ValuesIterator(rIter, comparator, keyClass,
+ valClass, umbilical);
while (values.more()) {
myMetrics.reduceInput();
reducer.reduce(values.getKey(), values, collector, reporter);
@@ -282,8 +298,6 @@
} finally {
reducer.close();
- in.close();
- lfs.delete(sortedFile); // remove sorted
out.close(reporter);
}
done(umbilical);
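Putting the pieces together, the reduce side now runs: sort the map
outputs, take the final-merge iterator, wrap it in a ValuesIterator, and
feed the reducer directly. A condensed, hedged restatement of the code
above (the loop's key-advance step is elided by the hunk boundary):

    SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(lfs, comparator, valueClass, job);
    SequenceFile.Sorter.RawKeyValueIterator rIter =
        sorter.sortAndIterate(mapFiles, tempDir,
                              !conf.getKeepFailedTaskFiles());
    ValuesIterator values = new ValuesIterator(rIter, comparator,
        job.getMapOutputKeyClass(), job.getMapOutputValueClass(), umbilical);
    while (values.more()) {
      reducer.reduce(values.getKey(), values, collector, reporter);
      // ... advance to the next key group ...
    }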