From: cdouglas@apache.org
To: core-commits@hadoop.apache.org
Subject: svn commit: r779807 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java
Date: Fri, 29 May 2009 00:58:44 -0000
Message-Id: <20090529005844.41A28238886C@eris.apache.org>

Author: cdouglas
Date: Fri May 29 00:58:43 2009
New Revision: 779807

URL: http://svn.apache.org/viewvc?rev=779807&view=rev
Log:
HADOOP-5664. Change map serialization so a lock is obtained only where
contention is possible, rather than for each write.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779807&r1=779806&r2=779807&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 29 00:58:43 2009
@@ -401,6 +401,9 @@
     HADOOP-5620. Add an option to DistCp for preserving modification and access
     times. (Rodrigo Schmidt via szetszwo)
 
+    HADOOP-5664. Change map serialization so a lock is obtained only where
+    contention is possible, rather than for each write. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a
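
In rough outline, the MapTask.java change below replaces the unconditional
spillLock acquisition in MapOutputBuffer.collect() and Buffer.write() with a
countdown (recordRemaining / bufferRemaining): the lock is only taken when the
countdown indicates that a soft limit or buffer exhaustion may have been
reached. A minimal sketch of that pattern follows; the names are illustrative
placeholders, not the actual MapTask classes or fields.

    // Sketch of the countdown-based lock elision pattern; illustrative only.
    import java.util.concurrent.locks.ReentrantLock;

    class CountdownCollector {
      private final ReentrantLock spillLock = new ReentrantLock();
      private final int softLimit = 1 << 16;  // example spill threshold
      private int remaining = softLimit;      // records until the next locked check

      void collect(Object record) {
        // Fast path: one decrement and branch, no lock taken.
        if (--remaining <= 0) {
          // Slow path: only here can the spill thread actually contend,
          // so shared state is inspected and updated under the lock.
          spillLock.lock();
          try {
            maybeStartSpillOrBlock();           // placeholder for the locked work
            remaining = recomputeRemaining();   // distance to the nearest limit
          } finally {
            spillLock.unlock();
          }
        }
        append(record);
      }

      private void maybeStartSpillOrBlock() { /* trigger or wait for a spill */ }
      private int recomputeRemaining() { return softLimit; }
      private void append(Object record) { /* serialize into the local buffer */ }
    }

Per record, the cost drops from a lock acquisition to a decrement and a
branch; any decision that depends on the spill thread's progress is still
made under spillLock, which is also where the patch recomputes
recordRemaining and bufferRemaining.
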
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=779807&r1=779806&r2=779807&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri May 29 00:58:43 2009
@@ -592,8 +592,8 @@
 
   }
 
-  class MapOutputBuffer
-    implements MapOutputCollector, IndexedSortable {
+  class MapOutputBuffer
+    implements MapOutputCollector, IndexedSortable {
    private final int partitions;
    private final Partitioner partitioner;
    private final JobConf job;
@@ -635,6 +635,8 @@
    private volatile Throwable sortSpillException = null;
    private final int softRecordLimit;
    private final int softBufferLimit;
+    private int recordRemaining;
+    private int bufferRemaining;
    private final int minSpillsForCombine;
    private final IndexedSorter sorter;
    private final ReentrantLock spillLock = new ReentrantLock();
@@ -682,8 +684,8 @@
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
      }
-      sorter = ReflectionUtils.newInstance(
-            job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
+      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
+            QuickSort.class, IndexedSorter.class), job);
      LOG.info("io.sort.mb = " + sortmb);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
@@ -696,6 +698,8 @@
      kvindices = new int[recordCapacity * ACCTSIZE];
      softBufferLimit = (int)(kvbuffer.length * spillper);
      softRecordLimit = (int)(kvoffsets.length * spillper);
+      recordRemaining = softRecordLimit;
+      bufferRemaining = softBufferLimit;
      LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
      LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
      // k/v serialization
@@ -763,38 +767,52 @@
                                     + value.getClass().getName());
      }
      final int kvnext = (kvindex + 1) % kvoffsets.length;
-      spillLock.lock();
-      try {
-        boolean kvfull;
-        do {
-          if (sortSpillException != null) {
-            throw (IOException)new IOException("Spill failed"
-                ).initCause(sortSpillException);
-          }
-          // sufficient acct space
-          kvfull = kvnext == kvstart;
-          final boolean kvsoftlimit = ((kvnext > kvend)
-              ? kvnext - kvend > softRecordLimit
-              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
-          if (kvstart == kvend && kvsoftlimit) {
-            LOG.info("Spilling map output: record full = " + kvsoftlimit);
-            startSpill();
-          }
-          if (kvfull) {
-            try {
-              while (kvstart != kvend) {
-                reporter.progress();
-                spillDone.await();
-              }
-            } catch (InterruptedException e) {
-              throw (IOException)new IOException(
-                  "Collector interrupted while waiting for the writer"
-                  ).initCause(e);
-            }
-          }
-        } while (kvfull);
-      } finally {
-        spillLock.unlock();
+      if (--recordRemaining <= 0) {
+        // Possible for check to remain < zero, if soft limit remains
+        // in force but unsatisfiable because spill is in progress
+        spillLock.lock();
+        try {
+          boolean kvfull;
+          do {
+            if (sortSpillException != null) {
+              throw (IOException)new IOException("Spill failed"
+                  ).initCause(sortSpillException);
+            }
+            // sufficient acct space
+            kvfull = kvnext == kvstart;
+            final boolean kvsoftlimit = ((kvnext > kvend)
+                ? kvnext - kvend > softRecordLimit
+                : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+            if (kvstart == kvend && kvsoftlimit) {
+              LOG.info("Spilling map output: record full = " + kvfull);
+              startSpill();
+            }
+            if (kvfull) {
+              try {
+                while (kvstart != kvend) {
+                  reporter.progress();
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
+                throw (IOException)new IOException(
+                    "Collector interrupted while waiting for the writer"
+                    ).initCause(e);
+              }
+            }
+          } while (kvfull);
+          final int softOff = kvend + softRecordLimit;
+          recordRemaining = Math.min(
+              // out of acct space
+              (kvnext < kvstart
+               ? kvstart - kvnext
+               : kvoffsets.length - kvnext + kvstart),
+              // soft limit
+              (kvend < kvnext
+               ? softOff - kvnext
+               : kvnext + (softOff - kvoffsets.length)));
+        } finally {
+          spillLock.unlock();
+        }
      }
 
      try {
@@ -905,7 +923,7 @@
       * likely result in data loss or corruption.
       * @see #markRecord()
       */
-      protected synchronized void reset() throws IOException {
+      protected void reset() throws IOException {
        // spillLock unnecessary; If spill wraps, then
        // bufindex < bufstart < bufend so contention is impossible
        // a stale value for bufstart does not affect correctness, since
@@ -931,7 +949,7 @@
      private final byte[] scratch = new byte[1];
 
      @Override
-      public synchronized void write(int v)
+      public void write(int v)
          throws IOException {
        scratch[0] = (byte)v;
        write(scratch, 0, 1);
@@ -945,69 +963,86 @@
       * deserialize into the collection buffer.
       */
      @Override
-      public synchronized void write(byte b[], int off, int len)
+      public void write(byte b[], int off, int len)
          throws IOException {
        boolean buffull = false;
        boolean wrap = false;
-        spillLock.lock();
-        try {
-          do {
-            if (sortSpillException != null) {
-              throw (IOException)new IOException("Spill failed"
-                  ).initCause(sortSpillException);
-            }
-
-            // sufficient buffer space?
-            if (bufstart <= bufend && bufend <= bufindex) {
-              buffull = bufindex + len > bufvoid;
-              wrap = (bufvoid - bufindex) + bufstart > len;
-            } else {
-              // bufindex <= bufstart <= bufend
-              // bufend <= bufindex <= bufstart
-              wrap = false;
-              buffull = bufindex + len > bufstart;
-            }
-
-            if (kvstart == kvend) {
-              // spill thread not running
-              if (kvend != kvindex) {
-                // we have records we can spill
-                final boolean bufsoftlimit = (bufindex > bufend)
-                  ? bufindex - bufend > softBufferLimit
-                  : bufend - bufindex < bufvoid - softBufferLimit;
-                if (bufsoftlimit || (buffull && !wrap)) {
-                  LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
-                  startSpill();
-                }
-              } else if (buffull && !wrap) {
-                // We have no buffered records, and this record is too large
-                // to write into kvbuffer. We must spill it directly from
-                // collect
-                final int size = ((bufend <= bufindex)
-                      ? bufindex - bufend
-                      : (bufvoid - bufend) + bufindex) + len;
-                bufstart = bufend = bufindex = bufmark = 0;
-                kvstart = kvend = kvindex = 0;
-                bufvoid = kvbuffer.length;
-                throw new MapBufferTooSmallException(size + " bytes");
-              }
-            }
-
-            if (buffull && !wrap) {
-              try {
-                while (kvstart != kvend) {
-                  reporter.progress();
-                  spillDone.await();
-                }
-              } catch (InterruptedException e) {
-                throw (IOException)new IOException(
-                    "Buffer interrupted while waiting for the writer"
-                    ).initCause(e);
-              }
-            }
-          } while (buffull && !wrap);
-        } finally {
-          spillLock.unlock();
+        bufferRemaining -= len;
+        if (bufferRemaining <= 0) {
+          // writing these bytes could exhaust available buffer space
+          // check if spill or blocking is necessary
+          spillLock.lock();
+          try {
+            do {
+              if (sortSpillException != null) {
+                throw (IOException)new IOException("Spill failed"
+                    ).initCause(sortSpillException);
+              }
+
+              // sufficient buffer space?
+              if (bufstart <= bufend && bufend <= bufindex) {
+                buffull = bufindex + len > bufvoid;
+                wrap = (bufvoid - bufindex) + bufstart > len;
+              } else {
+                // bufindex <= bufstart <= bufend
+                // bufend <= bufindex <= bufstart
+                wrap = false;
+                buffull = bufindex + len > bufstart;
+              }
+
+              if (kvstart == kvend) {
+                // spill thread not running
+                if (kvend != kvindex) {
+                  // we have records we can spill
+                  final boolean bufsoftlimit = (bufindex > bufend)
+                    ? bufindex - bufend > softBufferLimit
+                    : bufend - bufindex < bufvoid - softBufferLimit;
+                  if (bufsoftlimit || (buffull && !wrap)) {
+                    LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
+                    startSpill();
+                  }
+                } else if (buffull && !wrap) {
+                  // We have no buffered records, and this record is too large
+                  // to write into kvbuffer. We must spill it directly from
+                  // collect
+                  final int size = ((bufend <= bufindex)
+                        ? bufindex - bufend
+                        : (bufvoid - bufend) + bufindex) + len;
+                  bufstart = bufend = bufindex = bufmark = 0;
+                  kvstart = kvend = kvindex = 0;
+                  bufvoid = kvbuffer.length;
+                  throw new MapBufferTooSmallException(size + " bytes");
+                }
+              }
+
+              if (buffull && !wrap) {
+                try {
+                  while (kvstart != kvend) {
+                    reporter.progress();
+                    spillDone.await();
+                  }
+                } catch (InterruptedException e) {
+                  throw (IOException)new IOException(
+                      "Buffer interrupted while waiting for the writer"
+                      ).initCause(e);
+                }
+              }
+            } while (buffull && !wrap);
+            final int softOff = bufend + softBufferLimit;
+            bufferRemaining = Math.min(
+                // out of buffer space
+                (bufindex < bufstart
+                 ? bufstart - bufindex
+                 : kvbuffer.length - bufindex + bufstart),
+                // soft limit
+                (bufend < bufindex
+                 ? softOff - bufindex
+                 : bufindex + (softOff - kvbuffer.length)));
+          } finally {
+            spillLock.unlock();
+          }
+        } else {
+          buffull = bufindex + len > bufvoid;
        }
        // here, we know that we have sufficient space to write
        if (buffull) {
@@ -1019,11 +1054,12 @@
          }
          System.arraycopy(b, off, kvbuffer, bufindex, len);
          bufindex += len;
+          bufferRemaining -= len;
        }
      }
 
-    public synchronized void flush() throws IOException, ClassNotFoundException,
-                                            InterruptedException {
+    public void flush() throws IOException, ClassNotFoundException,
+                               InterruptedException {
      LOG.info("Starting flush of map output");
      spillLock.lock();
      try {
@@ -1103,7 +1139,7 @@
      }
    }
 
-    private synchronized void startSpill() {
+    private void startSpill() {
      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
               "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
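
For readers of the hunks above, the Math.min recomputation picks whichever
limit the writer will hit first in the circular buffer: the hard limit
(colliding with data still being spilled) or the soft spill-trigger
threshold. The following is a simplified, illustrative sketch of that idea,
not the exact arithmetic or field names from the patch.

    // Illustrative sketch only -- not code from the patch. It shows the
    // general shape of the "bytes remaining before the next locked check"
    // computation for a circular buffer with an in-progress spill region.
    class RemainingSketch {
      /**
       * @param cap        buffer capacity
       * @param writePos   next write offset
       * @param spillStart start of the region reserved by an in-progress spill
       * @param spillEnd   end of that region; the soft limit is measured from here
       * @param softLimit  spill-trigger threshold in bytes
       */
      static int remainingBeforeCheck(int cap, int writePos,
                                      int spillStart, int spillEnd, int softLimit) {
        // bytes the writer may append before colliding with un-spilled data
        final int hardRemaining = (writePos < spillStart)
            ? spillStart - writePos
            : cap - writePos + spillStart;
        // bytes the writer may append before crossing the soft (spill) threshold
        final int softBoundary = (spillEnd + softLimit) % cap;
        final int softRemaining = (writePos <= softBoundary)
            ? softBoundary - writePos
            : cap - writePos + softBoundary;
        // take another locked look at whichever limit comes first
        return Math.min(hardRemaining, softRemaining);
      }
    }

Because the counter is only a hint, an early or late expiry merely costs an
extra locked check; correctness still rests on the tests performed while
spillLock is held.
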