Subject: svn commit: r474922 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Date: Tue, 14 Nov 2006 19:09:17 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org

Author: cutting
Date: Tue Nov 14 11:09:16 2006
New Revision: 474922

URL: http://svn.apache.org/viewvc?view=rev&rev=474922
Log:
HADOOP-613.  Perform final merge while reducing.  Contributed by Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 14 11:09:16 2006
@@ -56,6 +56,10 @@
 17. HADOOP-705.  Fix a bug in the JobTracker when failed jobs were not
     completely cleaned up.  (Mahadev Konar via cutting)
 
+18. HADOOP-613.  Perform final merge while reducing.  This removes one
+    sort pass over the data and should consequently significantly
+    decrease overall processing time.  (Devaraj Das via cutting)
+
 Release 0.8.0 - 2006-11-03
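
The shape of the change, sketched as a before/after of the reduce-side call
pattern. This is an illustration assembled from the diffs below, not part of
the commit; the identifiers (sorter, rIter, mapFiles, tempDir) are the ones
the ReduceTask.java hunks use:

    // Before: sort all map outputs to one file, then re-open and scan it.
    //   sorter.sort(mapFiles, sortedFile, !conf.getKeepFailedTaskFiles());
    //   SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);

    // After: the final merge pass is exposed as an iterator, so the reduce
    // phase consumes merged records directly and the extra pass disappears.
    SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(lfs, comparator, valueClass, job);
    SequenceFile.Sorter.RawKeyValueIterator rIter =
      sorter.sortAndIterate(mapFiles, tempDir, !conf.getKeepFailedTaskFiles());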
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Nov 14 11:09:16 2006
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Support for flat files of binary key/value pairs. */
@@ -1538,6 +1539,34 @@
     }
   }
 
+    /**
+     * Perform a file sort from a set of input files and return an iterator.
+     * @param inFiles the files to be sorted
+     * @param tempDir the directory where temp files are created during sort
+     * @param deleteInput should the input files be deleted as they are read?
+     * @return iterator the RawKeyValueIterator
+     */
+    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
+                                              boolean deleteInput) throws IOException {
+      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
+      if (fs.exists(outFile)) {
+        throw new IOException("already exists: " + outFile);
+      }
+      this.inFiles = inFiles;
+      //outFile will basically be used as prefix for temp files in the cases
+      //where sort outputs multiple sorted segments. For the single segment
+      //case, the outputFile itself will contain the sorted data for that
+      //segment
+      this.outFile = outFile;
+
+      int segments = sortPass(deleteInput);
+      if (segments > 1)
+        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"));
+      else if (segments == 1)
+        return merge(new Path[]{outFile}, true);
+      else return null;
+    }
+
     /**
      * The backwards compatible interface to sort.
      * @param inFile the input file to sort
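
sortAndIterate() returns null when there is no input, an iterator over the
lone sorted segment when sortPass() produces exactly one, and a merge
iterator otherwise. A minimal consumption sketch; the loop shape follows the
RawKeyValueIterator interface further down in this file, and the
DataOutputBuffer/ValueBytes return types are inferred from the
ReduceTask.java changes below, so treat them as assumptions:

    SequenceFile.Sorter.RawKeyValueIterator iter =
      sorter.sortAndIterate(inFiles, tempDir, false);    // false: keep the inputs
    if (iter != null) {                                  // null means no input records
      while (iter.next()) {
        DataOutputBuffer rawKey = iter.getKey();          // serialized key bytes
        SequenceFile.ValueBytes rawVal = iter.getValue(); // raw value bytes
        // deserialize or forward rawKey/rawVal here
        float done = iter.getProgress().get();            // 0.0f - 1.0f during final merge
      }
      iter.close();
    }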
@@ -1799,6 +1828,10 @@
      * @throws IOException
      */
     void close() throws IOException;
+    /** Gets the Progress object; this has a float (0.0 - 1.0)
+     * indicating the bytes processed by the iterator so far
+     */
+    Progress getProgress();
   }
 
   /**
@@ -1941,6 +1974,9 @@
     private boolean blockCompress;
     private DataOutputBuffer rawKey = new DataOutputBuffer();
     private ValueBytes rawValue;
+    private long totalBytesProcessed;
+    private float progPerByte;
+    private Progress mergeProgress = new Progress();
 
     //a TreeMap used to store the segments sorted by size (segment offset and
     //segment path name is used to break ties between segments of same sizes)
@@ -1992,7 +2028,7 @@
       //load the raw value. Re-use the existing rawValue buffer
       if(rawValue == null)
         rawValue = ms.in.createValueBytes();
-      ms.nextRawValue(rawValue);
+      int valLength = ms.nextRawValue(rawValue);
 
       if (ms.nextRawKey()) {
         adjustTop();
@@ -2000,9 +2036,17 @@
         pop();
         ms.cleanup();
       }
+      if (progPerByte > 0) {
+        totalBytesProcessed += rawKey.getLength() + valLength;
+        mergeProgress.set(totalBytesProcessed * progPerByte);
+      }
       return true;
     }
 
+    public Progress getProgress() {
+      return mergeProgress;
+    }
+
     /** This is the single level merge that is called multiple times
      * depending on the factor size and the number of segments
      * @return RawKeyValueIterator
@@ -2029,6 +2073,13 @@
       //if we have lesser number of segments remaining, then just return the
       //iterator, else do another single level merge
       if (numSegments <= factor) {
+        //calculate the length of the remaining segments. Required for
+        //calculating the merge progress
+        long totalBytes = 0;
+        for (int i = 0; i < numSegments; i++)
+          totalBytes += mStream[i].segmentLength;
+        if (totalBytes != 0) //being paranoid
+          progPerByte = 1.0f / (float)totalBytes;
         return this;
       } else {
         //we want to spread the creation of temp files on multiple disks if
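
The bookkeeping added above is plain normalization: merge progress is
totalBytesProcessed / totalBytes, with the division hoisted out of the
per-record path by precomputing the reciprocal once per final merge. A
standalone illustration with made-up lengths (not commit code):

    long totalBytes = 1000000L;                      // sum of remaining segment lengths
    float progPerByte = 1.0f / (float)totalBytes;    // computed once, before merging
    long totalBytesProcessed = 0;

    // inside each next(): one add and one multiply, no per-record division
    totalBytesProcessed += 64 + 256;                      // raw key + raw value bytes
    float progress = totalBytesProcessed * progPerByte;  // 320/1000000 = 3.2e-4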
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 14 11:09:16 2006
@@ -122,23 +122,28 @@
 
   /** Iterates values while keys match in sorted input. */
   private class ValuesIterator implements Iterator {
-    private SequenceFile.Reader in;             // input file
+    private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
     private WritableComparable key;             // current key
     private Writable value;                     // current value
     private boolean hasNext;                    // more w/ this key
     private boolean more;                       // more in file
-    private float progPerByte;
     private TaskUmbilicalProtocol umbilical;
     private WritableComparator comparator;
-
-    public ValuesIterator (SequenceFile.Reader in, long length,
-                           WritableComparator comparator,
-                           TaskUmbilicalProtocol umbilical)
+    private Class keyClass;
+    private Class valClass;
+    private DataOutputBuffer valOut = new DataOutputBuffer();
+    private DataInputBuffer valIn = new DataInputBuffer();
+    private DataInputBuffer keyIn = new DataInputBuffer();
+
+    public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
+                           WritableComparator comparator, Class keyClass,
+                           Class valClass, TaskUmbilicalProtocol umbilical)
       throws IOException {
       this.in = in;
-      this.progPerByte = 1.0f / (float)length;
       this.umbilical = umbilical;
       this.comparator = comparator;
+      this.keyClass = keyClass;
+      this.valClass = valClass;
       getNext();
     }
 
@@ -173,18 +178,26 @@
     public WritableComparable getKey() { return key; }
 
     private void getNext() throws IOException {
-      reducePhase.set(in.getPosition()*progPerByte); // update progress
+      reducePhase.set(in.getProgress().get()); // update progress
       reportProgress(umbilical);
 
       Writable lastKey = key;                     // save previous key
      try {
-        key = (WritableComparable)in.getKeyClass().newInstance();
-        value = (Writable)in.getValueClass().newInstance();
+        key = (WritableComparable)keyClass.newInstance();
+        value = (Writable)valClass.newInstance();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      more = in.next(key, value);
+      more = in.next();
       if (more) {
+        //de-serialize the raw key/value
+        keyIn.reset(in.getKey().getData(), in.getKey().getLength());
+        key.readFields(keyIn);
+        valOut.reset();
+        (in.getValue()).writeUncompressedBytes(valOut);
+        valIn.reset(valOut.getData(), valOut.getLength());
+        value.readFields(valIn);
+
         if (lastKey == null) {
           hasNext = true;
         } else {
@@ -233,10 +246,12 @@
       };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
-    Path sortedFile = job.getLocalPath(getTaskId()+Path.SEPARATOR+"all.2");
+    Path tempDir = job.getLocalPath(getTaskId());
 
     WritableComparator comparator = job.getOutputKeyComparator();
-
+
+    SequenceFile.Sorter.RawKeyValueIterator rIter;
+
    try {
      setPhase(TaskStatus.Phase.SORT) ;
      sortProgress.start();
@@ -244,7 +259,8 @@
       // sort the input file
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      sorter.sort(mapFiles, sortedFile, !conf.getKeepFailedTaskFiles()); // sort
+      rIter = sorter.sortAndIterate(mapFiles, tempDir,
+                                    !conf.getKeepFailedTaskFiles()); // sort
 
     } finally {
       sortComplete = true;
@@ -269,11 +285,11 @@
       };
 
     // apply reduce function
-    SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
-    long length = lfs.getLength(sortedFile);
     try {
-      ValuesIterator values = new ValuesIterator(in, length, comparator,
-                                                 umbilical);
+      Class keyClass = job.getMapOutputKeyClass();
+      Class valClass = job.getMapOutputValueClass();
+      ValuesIterator values = new ValuesIterator(rIter, comparator, keyClass,
+                                                 valClass, umbilical);
       while (values.more()) {
         myMetrics.reduceInput();
         reducer.reduce(values.getKey(), values, collector, reporter);
@@ -282,8 +298,6 @@
 
     } finally {
       reducer.close();
-      in.close();
-      lfs.delete(sortedFile);                   // remove sorted
       out.close(reporter);
     }
     done(umbilical);
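
A closing note on the ValuesIterator rewrite: the raw iterator hands back
serialized bytes, so getNext() now performs the Writable round-trip itself
through three reusable buffers instead of letting SequenceFile.Reader
deserialize. Isolated, and assuming uncompressed values (which is what
writeUncompressedBytes implies), the step is:

    // key: raw bytes -> Writable, via a reusable DataInputBuffer
    keyIn.reset(in.getKey().getData(), in.getKey().getLength());
    key.readFields(keyIn);

    // value: ValueBytes -> DataOutputBuffer -> DataInputBuffer -> Writable
    valOut.reset();
    in.getValue().writeUncompressedBytes(valOut);
    valIn.reset(valOut.getData(), valOut.getLength());
    value.readFields(valIn);

Reusing keyIn/valOut/valIn across records avoids per-record buffer
allocation; only the key and value Writables themselves are created per
call, via newInstance().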