From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r416447 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Date: Thu, 22 Jun 2006 19:14:58 -0000
Message-Id: <20060622191459.7E83E1A983A@eris.apache.org>

Author: cutting
Date: Thu Jun 22 12:14:58 2006
New Revision: 416447

URL: http://svn.apache.org/viewvc?rev=416447&view=rev
Log:
HADOOP-314. Remove the append phase when sorting. Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jun 22 12:14:58 2006
@@ -30,6 +30,14 @@
     single read failure will not alone cause failure of a task.
     (omalley via cutting)
 
+ 8. HADOOP-314. Remove the "append" phase when reducing. Map output
+    files are now directly passed to the sorter, without first
+    appending them into a single file. Now, the first third of reduce
+    progress is "copy" (transferring map output to reduce nodes), the
+    middle third is "sort" (sorting map output) and the last third is
+    "reduce" (generating output). Long-term, the "sort" phase will
+    also be removed. (omalley via cutting)
+
 
 Release 0.3.2 - 2006-06-09

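For illustration, here is a minimal sketch (not part of the committed patch) of the three-way progress split described in the CHANGES entry above. It uses the Progress.addPhase call that ReduceTask relies on below; the wrapper class and the assumption that org.apache.hadoop.util.Progress has a public no-argument constructor are illustrative only.

import org.apache.hadoop.util.Progress;

// Illustrative sketch: reduce progress is now reported as three equal phases.
// The former "append" phase between copy and sort no longer exists.
public class ReducePhasesSketch {
  public static void main(String[] args) {
    Progress reduceProgress = new Progress();
    reduceProgress.addPhase("copy");    // first third: fetch map output to the reduce node
    reduceProgress.addPhase("sort");    // middle third: sort the collected map output
    reduceProgress.addPhase("reduce");  // last third: run the reducer over the sorted records
  }
}
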
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Jun 22 12:14:58 2006
@@ -49,7 +49,6 @@
   public static class Writer {
     private FSDataOutputStream out;
     private DataOutputBuffer buffer = new DataOutputBuffer();
-    private FileSystem fs = null;
     private Path target = null;
 
     private Class keyClass;
@@ -95,7 +94,6 @@
     public Writer(FileSystem fs, Path name,
                   Class keyClass, Class valClass, boolean compress)
       throws IOException {
-      this.fs = fs;
       this.target = name;
       init(fs.create(target), keyClass, valClass, compress);
     }
@@ -205,7 +203,6 @@
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
     private DataInputBuffer inBuf = new DataInputBuffer();
-    private FileSystem fs = null;
 
     private byte[] version = new byte[VERSION.length];
 
@@ -239,7 +236,6 @@
     private Reader(FileSystem fs, Path name, int bufferSize,
                    Configuration conf) throws IOException {
-      this.fs = fs;
       this.file = name;
       this.in = fs.open(file, bufferSize);
       this.end = fs.getLength(file);
@@ -249,7 +245,6 @@
     private Reader(FileSystem fs, Path file, int bufferSize, long start,
                    long length, Configuration conf) throws IOException {
-      this.fs = fs;
       this.file = file;
       this.in = fs.open(file, bufferSize);
       this.conf = conf;
@@ -465,8 +460,7 @@
 
     private WritableComparator comparator;
 
-    private Path inFile;                        // when sorting
-    private Path[] inFiles;                     // when merging
+    private Path[] inFiles;                     // when merging or sorting
 
     private Path outFile;
 
@@ -508,16 +502,22 @@
     /** Get the total amount of buffer memory, in bytes.*/
     public int getMemory() { return memory; }
 
-    /** Perform a file sort.*/
-    public void sort(Path inFile, Path outFile) throws IOException {
+    /**
+     * Perform a file sort from a set of input files into an output file.
+     * @param inFiles the files to be sorted
+     * @param outFile the sorted output file
+     * @param deleteInput should the input files be deleted as they are read?
+     */
+    public void sort(Path[] inFiles, Path outFile,
+                     boolean deleteInput) throws IOException {
       if (fs.exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
 
-      this.inFile = inFile;
+      this.inFiles = inFiles;
       this.outFile = outFile;
 
-      int segments = sortPass();
+      int segments = sortPass(deleteInput);
       int pass = 1;
       while (segments > 1) {
         segments = mergePass(pass, segments <= factor);

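For illustration, a hedged usage sketch of the new multi-file entry point added above. The file paths are hypothetical, and the Sorter(fs, keyClass, valClass, conf) constructor and the IntWritable key/value classes are assumptions of this sketch, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

// Illustrative sketch: sort several SequenceFiles into one output file,
// deleting each input as soon as it has been fully read.
public class MultiFileSortSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(fs, IntWritable.class, IntWritable.class, conf);
    Path[] inputs = { new Path("part-0"), new Path("part-1") };  // hypothetical inputs
    sorter.sort(inputs, new Path("all.sorted"), true);           // true: delete inputs as read
  }
}

Existing callers are unaffected: the single-file sort(inFile, outFile) overload retained below simply wraps its argument in a one-element array and passes deleteInput=false.
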
@@ -525,11 +525,20 @@
       }
     }
 
-    private int sortPass() throws IOException {
+    /**
+     * The backwards compatible interface to sort.
+     * @param inFile the input file to sort
+     * @param outFile the sorted output file
+     */
+    public void sort(Path inFile, Path outFile) throws IOException {
+      sort(new Path[]{inFile}, outFile, false);
+    }
+
+    private int sortPass(boolean deleteInput) throws IOException {
       LOG.debug("running sort pass");
-      SortPass sortPass = new SortPass(this.conf); // make the SortPass
+      SortPass sortPass = new SortPass();          // make the SortPass
       try {
-        return sortPass.run();                     // run it
+        return sortPass.run(deleteInput);          // run it
       } finally {
         sortPass.close();                          // close it
       }
@@ -550,13 +559,15 @@
       private FSDataOutputStream out;
       private Path outName;
 
-      public SortPass(Configuration conf) throws IOException {
-        in = new Reader(fs, inFile, conf);
-      }
-
-      public int run() throws IOException {
+      public int run(boolean deleteInput) throws IOException {
         int segments = 0;
-        boolean atEof = false;
+        int currentFile = 0;
+        boolean atEof = currentFile >= inFiles.length;
+        boolean isCompressed = false;
+        if (!atEof) {
+          in = new Reader(fs, inFiles[currentFile], conf);
+          isCompressed = in.isCompressed();
+        }
         while (!atEof) {
           int count = 0;
           buffer.reset();
@@ -564,12 +575,21 @@
           int start = buffer.getLength();     // read an entry into buffer
           int keyLength = in.next(buffer);
-          int length = buffer.getLength() - start;
-
           if (keyLength == -1) {
-            atEof = true;
-            break;
+            in.close();
+            if (deleteInput) {
+              fs.delete(inFiles[currentFile]);
+            }
+            currentFile += 1;
+            atEof = currentFile >= inFiles.length;
+            if (!atEof) {
+              in = new Reader(fs, inFiles[currentFile], conf);
+            } else {
+              in = null;
+            }
+            continue;
           }
+          int length = buffer.getLength() - start;
 
           if (count == starts.length)
             grow();
@@ -586,15 +606,16 @@
           LOG.info("flushing segment " + segments);
           rawBuffer = buffer.getData();
           sort(count);
-          flush(count, segments==0 && atEof);
+          flush(count, isCompressed, segments==0 && atEof);
           segments++;
         }
         return segments;
       }
 
       public void close() throws IOException {
-        in.close();
-
+        if (in != null) {
+          in.close();
+        }
         if (out != null) {
           out.close();
         }
@@ -615,7 +636,8 @@
         return result;
       }
 
-      private void flush(int count, boolean done) throws IOException {
+      private void flush(int count, boolean isCompressed,
+                         boolean done) throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile.suffix(".0");
           out = fs.create(outName);
@@ -630,7 +652,7 @@
           out.writeLong(count);                   // write count
         }
 
-        Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
+        Writer writer = new Writer(out, keyClass, valClass, isCompressed);
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
         }
@@ -701,7 +723,6 @@
     }
 
     private class MergePass {
-      private int pass;
       private boolean last;
 
       private MergeQueue queue;
@@ -709,7 +730,6 @@
       private Path inName;
 
       public MergePass(int pass, boolean last) throws IOException {
-        this.pass = pass;
        this.last = last;
 
         this.queue =

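For illustration, a simplified, self-contained sketch (not part of the committed patch) of the read loop that SortPass.run now uses: walk the input files with one open reader at a time, optionally deleting each file as soon as it is exhausted. The in-memory buffering, sorting, and segment flushing done by the real code between reads are elided.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;

// Illustrative sketch of the multi-file read pattern used by SortPass.run above.
public class MultiInputScanSketch {
  static void scan(FileSystem fs, Path[] inFiles, boolean deleteInput, Configuration conf)
      throws IOException {
    DataOutputBuffer buffer = new DataOutputBuffer();
    int current = 0;
    SequenceFile.Reader in =
      (inFiles.length > 0) ? new SequenceFile.Reader(fs, inFiles[0], conf) : null;
    while (in != null) {
      int keyLength = in.next(buffer);          // raw read of one record into the buffer
      if (keyLength == -1) {                    // the current input is exhausted
        in.close();
        if (deleteInput) {
          fs.delete(inFiles[current]);          // reclaim local disk as soon as possible
        }
        current += 1;
        in = (current < inFiles.length)
            ? new SequenceFile.Reader(fs, inFiles[current], conf) : null;
        continue;
      }
      // ... the real SortPass records the entry's offset and key length here,
      // sorts the buffered batch in memory, and flushes it as a segment ...
      buffer.reset();
    }
  }
}
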
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=416447&r1=416446&r2=416447&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 22 12:14:58 2006
@@ -44,7 +44,6 @@
   { getProgress().setStatus("reduce"); }
 
   private Progress copyPhase = getProgress().addPhase("copy");
-  private Progress appendPhase = getProgress().addPhase("append");
   private Progress sortPhase = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
   private JobConf conf;
@@ -173,7 +172,6 @@
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    Class keyClass = job.getMapOutputKeyClass();
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
     reducer.configure(job);
@@ -182,44 +180,10 @@
     copyPhase.complete();                         // copy is already complete
 
     // open a file to collect map output
-    Path file = job.getLocalPath(getTaskId()+Path.SEPARATOR+"all.1");
-    SequenceFile.Writer writer =
-      new SequenceFile.Writer(lfs, file, keyClass, valueClass);
-    try {
-      // append all input files into a single input file
-      for (int i = 0; i < numMaps; i++) {
-        appendPhase.addPhase();                 // one per file
-      }
-
-      DataOutputBuffer buffer = new DataOutputBuffer();
-
-      for (int i = 0; i < numMaps; i++) {
-        Path partFile =
-          this.mapOutputFile.getInputFile(i, getTaskId());
-        float progPerByte = 1.0f / lfs.getLength(partFile);
-        Progress phase = appendPhase.phase();
-        phase.setStatus(partFile.toString());
-
-        SequenceFile.Reader in = new SequenceFile.Reader(lfs, partFile, job);
-        try {
-          int keyLen;
-          while((keyLen = in.next(buffer)) > 0) {
-            writer.append(buffer.getData(), 0, buffer.getLength(), keyLen);
-            phase.set(in.getPosition()*progPerByte);
-            reportProgress(umbilical);
-            buffer.reset();
-          }
-        } finally {
-          in.close();
-        }
-        phase.complete();
-      }
-
-    } finally {
-      writer.close();
+    Path[] mapFiles = new Path[numMaps];
+    for(int i=0; i < numMaps; i++) {
+      mapFiles[i] = mapOutputFile.getInputFile(i, getTaskId());
     }
-
-    appendPhase.complete();                     // append is complete
 
     // spawn a thread to give sort progress heartbeats
     Thread sortProgress = new Thread() {
@@ -251,8 +215,7 @@
       // sort the input file
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      sorter.sort(file, sortedFile);              // sort
-      lfs.delete(file);                           // remove unsorted
+      sorter.sort(mapFiles, sortedFile, true);    // sort
     } finally {
       sortComplete = true;
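
To summarize the reduce-side effect, a hedged sketch (the helper method and its signature are illustrative; the call itself mirrors the new line above): the per-map output files go straight to the sorter, which deletes each one as it is consumed, so no intermediate "all.1" file is ever written.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Illustrative sketch: the reduce task now sorts the map outputs directly.
public class ReduceSortSketch {
  static void sortMapOutputs(SequenceFile.Sorter sorter, Path[] mapFiles, Path sortedFile)
      throws IOException {
    // Previously: append all mapFiles into a single "all.1" file, sort that file,
    // and delete it afterwards.  Now the sorter reads the map outputs in place
    // and deletes each one as soon as it has been consumed.
    sorter.sort(mapFiles, sortedFile, true);
  }
}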