Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 76751 invoked from network); 3 Jun 2008 04:19:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 3 Jun 2008 04:19:51 -0000 Received: (qmail 14426 invoked by uid 500); 3 Jun 2008 04:19:53 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 14399 invoked by uid 500); 3 Jun 2008 04:19:53 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 14390 invoked by uid 99); 3 Jun 2008 04:19:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Jun 2008 21:19:53 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Jun 2008 04:19:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C92AA2388A06; Mon, 2 Jun 2008 21:19:26 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r662634 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java Date: Tue, 03 Jun 2008 04:19:26 -0000 To: core-commits@hadoop.apache.org From: cdouglas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080603041926.C92AA2388A06@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cdouglas Date: Mon Jun 2 21:19:25 2008 New Revision: 662634 URL: http://svn.apache.org/viewvc?rev=662634&view=rev Log: HADOOP-3443. Avoid copying map output across partitions when renaming a single spill. Contributed by Owen O'Malley. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662634&r1=662633&r2=662634&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Mon Jun 2 21:19:25 2008 @@ -391,6 +391,9 @@ HADOOP-3475. Fix MapTask to correctly size the accounting allocation of io.sort.mb. (cdouglas) + HADOOP-3443. Avoid copying map output across partitions when renaming a + single spill. (omalley via cdouglas) + Release 0.17.0 - 2008-05-18 INCOMPATIBLE CHANGES Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=662634&r1=662633&r2=662634&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jun 2 21:19:25 2008 @@ -35,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -58,7 +57,6 @@ import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.mapred.ReduceTask.ValuesIterator; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.QuickSort; @@ -174,7 +172,7 @@ public float getProgress() throws IOException { return rawIn.getProgress(); } - }; + } @Override @SuppressWarnings("unchecked") @@ -366,6 +364,7 @@ if ((sortmb & 0x7FF) != sortmb) { throw new IOException("Invalid \"io.sort.mb\": " + sortmb); } + LOG.info("io.sort.mb = " + sortmb); // buffers and accounting int maxMemUsage = sortmb << 20; int recordCapacity = (int)(maxMemUsage * recper); @@ -377,6 +376,8 @@ kvindices = new int[recordCapacity * ACCTSIZE]; softBufferLimit = (int)(kvbuffer.length * spillper); softRecordLimit = (int)(kvoffsets.length * spillper); + LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length); + LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length); // k/v serialization comparator = job.getOutputKeyComparator(); keyClass = job.getMapOutputKeyClass(); @@ -643,6 +644,12 @@ ? bufindex - bufend > softBufferLimit : bufend - bufindex < bufvoid - softBufferLimit; if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) { + LOG.info("Spilling map output: buffer full = " + bufsoftlimit+ + " and record full = " + kvsoftlimit); + LOG.info("bufindex = " + bufindex + "; bufend = " + bufend + + "; bufvoid = " + bufvoid); + LOG.info("kvindex = " + kvindex + "; kvend = " + kvend + + "; length = " + kvoffsets.length); kvend = kvindex; bufend = bufmark; // TODO No need to recreate this thread every time @@ -693,6 +700,7 @@ } public synchronized void flush() throws IOException { + LOG.info("Starting flush of map output"); synchronized (spillLock) { while (kvstart != kvend) { try { @@ -815,6 +823,7 @@ } } ++numSpills; + LOG.info("Finished spill " + numSpills); } finally { if (out != null) out.close(); if (indexOut != null) indexOut.close(); @@ -976,7 +985,15 @@ for(int i = 0; i < numSpills; i++) { filename[i] = mapOutputFile.getSpillFile(getTaskID(), i); indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i); - finalOutFileSize += localFs.getLength(filename[i]); + finalOutFileSize += localFs.getFileStatus(filename[i]).getLen(); + } + + if (numSpills == 1) { //the spill is the final output + localFs.rename(filename[0], + new Path(filename[0].getParent(), "file.out")); + localFs.rename(indexFileName[0], + new Path(indexFileName[0].getParent(),"file.out.index")); + return; } //make correction in the length to include the sequence file header //lengths for each partition @@ -989,12 +1006,6 @@ Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite( getTaskID(), finalIndexFileSize); - if (numSpills == 1) { //the spill is the final output - localFs.rename(filename[0], finalOutputFile); - localFs.rename(indexFileName[0], finalIndexFile); - return; - } - //The output stream for the final single output file FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 4096);