hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r643195 - in /hadoop/core/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/util/
Date Mon, 31 Mar 2008 22:46:00 GMT
Author: omalley
Date: Mon Mar 31 15:45:48 2008
New Revision: 643195

URL: http://svn.apache.org/viewvc?rev=643195&view=rev
Log:
HADOOP-2919.  Reduce the number of memory copies done during the
 map output sorting. Contributed by cdouglas.

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSortable.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSorter.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/QuickSort.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestIndexedSort.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=643195&r1=643194&r2=643195&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar 31 15:45:48 2008
@@ -202,6 +202,14 @@
     tasks by replacing static arrays with lists of runnable tasks. 
     (Amar Kamat via omalley)
 
+    HADOOP-2919.  Reduce the number of memory copies done during the
+    map output sorting. Also adds two config variables:
+    io.sort.spill.percent - the percentage of io.sort.mb that should
+                            cause a spill (default 80%)
+    io.sort.record.percent - the percent of io.sort.mb that should
+                             hold key/value indexes (default 5%)
+    (cdouglas via omalley)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of
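
For jobs that need different thresholds, both new properties can be overridden
like any other configuration value. A minimal, hypothetical snippet (the
property names come from this patch; the values shown are illustrative only):

  import org.apache.hadoop.mapred.JobConf;

  public class SortTuningExample {
    public static void main(String[] args) {
      JobConf conf = new JobConf();
      // Let the collection buffer fill further before a background spill starts.
      conf.set("io.sort.spill.percent", "0.90");
      // Reserve a larger share of io.sort.mb for record accounting
      // (useful when records are very small).
      conf.set("io.sort.record.percent", "0.10");
      // io.sort.mb itself is unchanged by this patch; shown only for context.
      conf.setInt("io.sort.mb", 100);
    }
  }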

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=643195&r1=643194&r2=643195&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Mon Mar 31 15:45:48 2008
@@ -80,6 +80,24 @@
 </property>
 
 <property>
+  <name>io.sort.record.percent</name>
+  <value>0.05</value>
+  <description>The percentage of io.sort.mb dedicated to tracking record
+  boundaries. Let this value be r, io.sort.mb be x. The maximum number
+  of records collected before the collection thread must block is equal
+  to (r * x) / 4</description>
+</property>
+
+<property>
+  <name>io.sort.spill.percent</name>
+  <value>0.80</value>
+  <description>The soft limit in either the serialization buffer or the record
+  collection buffer. Once reached, a thread will begin to spill the contents
+  to disk in the background. Note that this does not imply any chunking of
+  data to the spill. A value less than 0.5 is not recommended.</description>
+</property>
+
+<property>
   <name>io.file.buffer.size</name>
   <value>4096</value>
   <description>The size of buffer for use in sequence files.
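
The sizing arithmetic implied by these descriptions matches the new
MapOutputBuffer constructor in MapTask.java further down. A rough worked
example using the default values (sortmb = 100, recper = 0.05,
spillper = 0.80 assumed), mirroring the constructor's calculations:

  public class SortBufferSizing {
    public static void main(String[] args) {
      final int sortmb = 100;       // io.sort.mb
      final float recper = 0.05f;   // io.sort.record.percent
      final float spillper = 0.80f; // io.sort.spill.percent

      int maxMemUsage = sortmb << 20;                   // 104,857,600 bytes total
      int recordCapacity = (int)(maxMemUsage * recper); // 5,242,880 bytes for record tracking
      recordCapacity += (recordCapacity >>> 2) % 4;     // alignment tweak from the patch (a no-op here)
      int kvbufferLen = maxMemUsage - recordCapacity;   // 99,614,720 bytes for serialized k/v data
      int kvcapacity = recordCapacity >>> 2;            // 1,310,720 records, i.e. (r * x) / 4

      System.out.println("serialization buffer : " + kvbufferLen + " bytes");
      System.out.println("records before block : " + kvcapacity);
      System.out.println("soft buffer limit    : " + (int)(kvbufferLen * spillper) + " bytes");
      System.out.println("soft record limit    : " + (int)(kvcapacity * spillper) + " records");
    }
  }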

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=643195&r1=643194&r2=643195&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Mar 31 15:45:48 2008
@@ -584,6 +584,11 @@
      */
     public void writeCompressedBytes(DataOutputStream outStream) 
       throws IllegalArgumentException, IOException;
+
+    /**
+     * Size of stored data.
+     */
+    public int getSize();
   }
   
   private static class UncompressedBytes implements ValueBytes {
@@ -1018,14 +1023,12 @@
       out.write(buffer.getData(), 0, buffer.getLength()); // data
     }
 
-    public synchronized void appendRaw(
-                                       byte[] keyData, int keyOffset, int keyLength, ValueBytes val) 
-      throws IOException {
+    public synchronized void appendRaw(byte[] keyData, int keyOffset,
+        int keyLength, ValueBytes val) throws IOException {
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + keyLength);
 
-      UncompressedBytes value = (UncompressedBytes)val;
-      int valLength = value.getSize();
+      int valLength = val.getSize();
 
       checkAndWriteSync();
       
@@ -1144,16 +1147,13 @@
     }
 
     /** Append a key/value pair. */
-    public synchronized void appendRaw(
-                                       byte[] keyData, int keyOffset, int keyLength,
-                                       ValueBytes val
-                                       ) throws IOException {
+    public synchronized void appendRaw(byte[] keyData, int keyOffset,
+        int keyLength, ValueBytes val) throws IOException {
 
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
-      CompressedBytes value = (CompressedBytes)val;
-      int valLength = value.getSize();
+      int valLength = val.getSize();
       
       checkAndWriteSync();                        // sync
       out.writeInt(keyLength+valLength);          // total record length
@@ -1333,16 +1333,13 @@
     }
     
     /** Append a key/value pair. */
-    public synchronized void appendRaw(
-                                       byte[] keyData, int keyOffset, int keyLength,
-                                       ValueBytes val
-                                       ) throws IOException {
+    public synchronized void appendRaw(byte[] keyData, int keyOffset,
+        int keyLength, ValueBytes val) throws IOException {
       
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
-      UncompressedBytes value = (UncompressedBytes)val;
-      int valLength = value.getSize();
+      int valLength = val.getSize();
       
       // Save key/value data in relevant buffers
       WritableUtils.writeVInt(keyLenBuffer, keyLength);
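
Promoting getSize() into the ValueBytes interface is what lets the appendRaw
implementations above drop their casts: any implementation that can report the
length of its stored bytes may now be appended raw. A minimal, hypothetical
implementation (not part of this patch; the patch's own MapTask.InMemValBytes
plays the same role over the sort buffer):

  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.io.SequenceFile.ValueBytes;

  class ByteRangeValueBytes implements ValueBytes {
    private final byte[] data;
    private final int offset;
    private final int length;

    ByteRangeValueBytes(byte[] data, int offset, int length) {
      this.data = data;
      this.offset = offset;
      this.length = length;
    }

    public int getSize() {                 // the method added by this patch
      return length;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
        throws IOException {
      outStream.write(data, offset, length);
    }

    public void writeCompressedBytes(DataOutputStream outStream)
        throws IOException {
      // This sketch holds already-final bytes, so both methods emit them
      // verbatim, much as InMemValBytes does for record-compressed values.
      outStream.write(data, offset, length);
    }
  }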

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?rev=643195&r1=643194&r2=643195&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Mon Mar 31 15:45:48 2008
@@ -222,7 +222,7 @@
       dataSize = length;
     }
             
-    public int getSize() throws IOException {
+    public int getSize() {
       return dataSize;
     }
             

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=643195&r1=643194&r2=643195&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Mar 31 15:45:48 2008
@@ -22,11 +22,14 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -35,22 +38,27 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.InputBuffer;
-import org.apache.hadoop.io.OutputBuffer;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Sorter;
-import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
 import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
+import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.IndexedSorter;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.mapred.Task.Counter.*;
@@ -252,8 +260,6 @@
     }
 
     public void flush() throws IOException {
-      // TODO Auto-generated method stub
-      
     }
 
     public void collect(K key, V value) throws IOException {
@@ -261,123 +267,152 @@
     }
     
   }
-  
-  class MapOutputBuffer implements MapOutputCollector {
 
+  class MapOutputBuffer implements MapOutputCollector, IndexedSortable {
     private final int partitions;
-    private Partitioner partitioner;
-    private JobConf job;
-    private Reporter reporter;
-
-    private OutputBuffer keyValBuffer; //the buffer where key/val will
-                                       //be stored before they are 
-                                       //passed on to the pending buffer
-    private OutputBuffer pendingKeyvalBuffer; // the key value buffer used
-                                              // while spilling
-    // a lock used for sync sort-spill with collect
-    private final Object pendingKeyvalBufferLock = new Object();
-    // since sort-spill and collect are done concurrently, exceptions are 
-    // passed through shared variable
-    private volatile IOException sortSpillException; 
-    private int maxBufferSize; //the max amount of in-memory space after which
-                               //we will spill the keyValBuffer to disk
-    private int numSpills; //maintains the no. of spills to disk done so far
-    
-    private FileSystem localFs;
-    private CompressionCodec codec;
-    private CompressionType compressionType;
-    private Class keyClass;
-    private Class valClass;
-    private RawComparator comparator;
-    private SerializationFactory serializationFactory;
-    private Serializer keySerializer;
-    private Serializer valSerializer;
-    private InputBuffer keyIn = new InputBuffer();
-    private InputBuffer valIn = new InputBuffer();
-    private Deserializer keyDeserializer;
-    private Deserializer valDeserializer;    
-    private BufferSorter []sortImpl;
-    private BufferSorter []pendingSortImpl; // sort impl for the pending buffer
-    private SequenceFile.Writer writer;
-    private FSDataOutputStream out;
-    private FSDataOutputStream indexOut;
-    private long segmentStart;
-    private Counters.Counter mapOutputByteCounter;
-    private Counters.Counter mapOutputRecordCounter;
-    private Counters.Counter combineInputCounter;
-    private Counters.Counter combineOutputCounter;
-    
+    private final Partitioner partitioner;
+    private final JobConf job;
+    private final Reporter reporter;
+    private final Class keyClass;
+    private final Class valClass;
+    private final RawComparator comparator;
+    private final SerializationFactory serializationFactory;
+    private final Serializer keySerializer;
+    private final Serializer valSerializer;
+    private final Class<? extends Reducer> combinerClass;
+    private final CombineOutputCollector combineCollector;
+    private final boolean compressMapOutput;
+    private final CompressionCodec codec;
+    private final CompressionType compressionType;
+
+    // used if compressMapOutput && compressionType == RECORD
+    // DataOutputBuffer req b/c compression codecs req contiguous buffer
+    private final DataOutputBuffer rawBuffer;
+    private final CompressionOutputStream deflateFilter;
+    private final DataOutputStream deflateStream;
+    private final Compressor compressor;
+
+    // k/v accounting
+    private volatile int kvstart = 0;  // marks beginning of spill
+    private volatile int kvend = 0;    // marks beginning of collectable
+    private int kvindex = 0;           // marks end of collected
+    private final int[] kvoffsets;     // indices into kvindices
+    private final int[] kvindices;     // partition, k/v offsets into kvbuffer
+    private volatile int bufstart = 0; // marks beginning of spill
+    private volatile int bufend = 0;   // marks beginning of collectable
+    private volatile int bufvoid = 0;  // marks the point where we should stop
+                                       // reading at the end of the buffer
+    private int bufindex = 0;          // marks end of collected
+    private int bufmark = 0;           // marks end of record
+    private byte[] kvbuffer;           // main output buffer
+    private static final int PARTITION = 0; // partition offset in acct
+    private static final int KEYSTART = 1;  // key offset in acct
+    private static final int VALSTART = 2;  // val offset in acct
+    private static final int ACCTSIZE = 3;  // total #fields in acct
+
+    // spill accounting
+    private volatile int numSpills = 0;
+    private volatile IOException sortSpillException = null;
+    private final int softRecordLimit;
+    private final int softBufferLimit;
+    private final Object spillLock = new Object();
+    private final QuickSort sorter = new QuickSort();
+    private final BlockingBuffer bb = new BlockingBuffer();
+
+    private final FileSystem localFs;
+
+    private final Counters.Counter mapOutputByteCounter;
+    private final Counters.Counter mapOutputRecordCounter;
+    private final Counters.Counter combineInputCounter;
+    private final Counters.Counter combineOutputCounter;
+
     @SuppressWarnings("unchecked")
-    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, 
+    public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                            Reporter reporter) throws IOException {
-      this.partitions = job.getNumReduceTasks();
-      this.partitioner = (Partitioner)ReflectionUtils.newInstance(
-                                                                  job.getPartitionerClass(), job);
-      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2;
-      this.sortSpillException = null;
-      keyValBuffer = new OutputBuffer();
-
       this.job = job;
       this.reporter = reporter;
-      this.comparator = job.getOutputKeyComparator();
-      this.keyClass = job.getMapOutputKeyClass();
-      this.valClass = job.getMapOutputValueClass();
-      this.serializationFactory = new SerializationFactory(conf);
-      this.keySerializer = serializationFactory.getSerializer(keyClass);
-      this.keySerializer.open(keyValBuffer);
-      this.valSerializer = serializationFactory.getSerializer(valClass);
-      this.valSerializer.open(keyValBuffer);
-      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-      this.keyDeserializer.open(keyIn);
-      this.valDeserializer = serializationFactory.getDeserializer(valClass);
-      this.valDeserializer.open(valIn);
-      this.localFs = FileSystem.getLocal(job);
-      this.codec = null;
-      this.compressionType = CompressionType.NONE;
-      if (job.getCompressMapOutput()) {
-        // find the kind of compression to do, defaulting to record
-        compressionType = job.getMapOutputCompressionType();
-
-        // find the right codec
-        Class codecClass = 
-          job.getMapOutputCompressorClass(DefaultCodec.class);
-        codec = (CompressionCodec) 
-          ReflectionUtils.newInstance(codecClass, job);
-      }
-      sortImpl = new BufferSorter[partitions];
+      localFs = FileSystem.getLocal(job);
+      partitions = job.getNumReduceTasks();
+      partitioner = (Partitioner)
+        ReflectionUtils.newInstance(job.getPartitionerClass(), job);
+      // sanity checks
+      final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
+      final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
+      final int sortmb = job.getInt("io.sort.mb", 100);
+      if (spillper > (float)1.0 || spillper < (float)0.0) {
+        throw new IOException("Invalid \"io.sort.spill.percent\": " + spillper);
+      }
+      if (recper > (float)1.0 || recper < (float)0.01) {
+        throw new IOException("Invalid \"io.sort.record.percent\": " + recper);
+      }
+      if ((sortmb & 0x7FF) != sortmb) {
+        throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
+      }
+      // buffers and accounting
+      int maxMemUsage = sortmb << 20;
+      int recordCapacity = (int)(maxMemUsage * recper);
+      recordCapacity += (recordCapacity >>> 2) % 4;
+      kvbuffer = new byte[maxMemUsage - recordCapacity];
+      bufvoid = kvbuffer.length;
+      int kvcapacity = recordCapacity >>> 2;
+      kvoffsets = new int[kvcapacity];
+      kvindices = new int[recordCapacity - kvcapacity];
+      softBufferLimit = (int)(kvbuffer.length * spillper);
+      softRecordLimit = (int)(kvoffsets.length * spillper);
+      // k/v serialization
+      comparator = job.getOutputKeyComparator();
+      keyClass = job.getMapOutputKeyClass();
+      valClass = job.getMapOutputValueClass();
+      serializationFactory = new SerializationFactory(job);
+      keySerializer = serializationFactory.getSerializer(keyClass);
+      keySerializer.open(bb);
+      valSerializer = serializationFactory.getSerializer(valClass);
+      valSerializer.open(bb);
+      // counters
       Counters counters = getCounters();
       mapOutputByteCounter = counters.findCounter(MAP_OUTPUT_BYTES);
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
       combineInputCounter = getCounters().findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
-      for (int i = 0; i < partitions; i++)
-        sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
-                                                                job.getClass("map.sort.class", MergeSorter.class,
-                                                                             BufferSorter.class), job);
-    }
-    
-    private void startPartition(int partNumber) throws IOException {
-      //We create the sort output as multiple sequence files within a spilled
-      //file. So we create a writer for each partition. 
-      segmentStart = out.getPos();
-      writer =
-        SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
-                                  job.getMapOutputValueClass(), compressionType, codec);
-    }
-    private void endPartition(int partNumber) throws IOException {
-      //Need to close the file, especially if block compression is in use
-      //We also update the index file to contain the part offsets per 
-      //spilled file
-      writer.close();
-      indexOut.writeLong(segmentStart);
-      //we also store 0 length key/val segments to make the merge phase easier.
-      indexOut.writeLong(out.getPos()-segmentStart);
+      // combiner and compression
+      compressMapOutput = job.getCompressMapOutput();
+      combinerClass = job.getCombinerClass();
+      if (compressMapOutput) {
+        compressionType = job.getMapOutputCompressionType();
+        Class<? extends CompressionCodec> codecClass =
+          job.getMapOutputCompressorClass(DefaultCodec.class);
+        codec = (CompressionCodec)
+          ReflectionUtils.newInstance(codecClass, job);
+        if (CompressionType.RECORD == compressionType
+            && null == combinerClass) {
+          compressor = codec.createCompressor();
+          rawBuffer = new DataOutputBuffer();
+          deflateFilter = codec.createOutputStream(rawBuffer, compressor);
+          deflateStream = new DataOutputStream(deflateFilter);
+          valSerializer.close();
+          valSerializer.open(deflateStream);
+        } else {
+          rawBuffer = null;
+          compressor = null;
+          deflateStream = null;
+          deflateFilter = null;
+        }
+      } else {
+        compressionType = CompressionType.NONE;
+        codec = null;
+        rawBuffer = null;
+        compressor = null;
+        deflateStream = null;
+        deflateFilter = null;
+      }
+      combineCollector = (null != combinerClass)
+        ? new CombineOutputCollector()
+        : null;
     }
-    
+
     @SuppressWarnings("unchecked")
-    public synchronized void collect(Object key,
-                                     Object value) throws IOException {
-      
+    public synchronized void collect(Object key, Object value)
+        throws IOException {
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
                               + keyClass.getName() + ", recieved "
@@ -388,199 +423,542 @@
                               + valClass.getName() + ", recieved "
                               + value.getClass().getName());
       }
-      
-      // check if the earlier sort-spill generated an exception
       if (sortSpillException != null) {
         throw sortSpillException;
       }
-      
-      if (keyValBuffer == null) {
-        keyValBuffer = new OutputBuffer();
-        keySerializer.open(keyValBuffer);
-        valSerializer.open(keyValBuffer);
-        sortImpl = new BufferSorter[partitions];
-        for (int i = 0; i < partitions; i++)
-          sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
-                            job.getClass("map.sort.class", MergeSorter.class, 
-                                         BufferSorter.class), job);
+      try {
+        int keystart = bufindex;
+        keySerializer.serialize(key);
+        if (bufindex < keystart) {
+          // wrapped the key; reset required
+          bb.reset();
+          keystart = 0;
+        }
+        int valstart = bufindex;
+        if (compressMapOutput && CompressionType.RECORD == compressionType
+            && null == combinerClass) {
+          // compress serialized value bytes
+          rawBuffer.reset();
+          deflateFilter.resetState();
+          valSerializer.serialize(value);
+          deflateStream.flush();
+          deflateFilter.finish();
+          bb.write(rawBuffer.getData(), 0, rawBuffer.getLength());
+          bb.markRecord();
+          mapOutputByteCounter.increment((valstart - keystart) +
+              compressor.getBytesRead());
+        } else {
+          // serialize value bytes into buffer
+          valSerializer.serialize(value);
+          int valend = bb.markRecord();
+          mapOutputByteCounter.increment(valend > keystart
+              ? valend - keystart
+              : (bufvoid - keystart) + valend);
+        }
+        int partition = partitioner.getPartition(key, value, partitions);
+
+        mapOutputRecordCounter.increment(1);
+
+        // update accounting info
+        int ind = kvindex * ACCTSIZE;
+        kvoffsets[kvindex] = ind;
+        kvindices[ind + PARTITION] = partition;
+        kvindices[ind + KEYSTART] = keystart;
+        kvindices[ind + VALSTART] = valstart;
+        kvindex = (kvindex + 1) % kvoffsets.length;
+      } catch (MapBufferTooSmallException e) {
+        LOG.debug("Record too large for in-memory buffer: " + e.getMessage());
+        spillSingleRecord(key, value);
+        mapOutputRecordCounter.increment(1);
+        return;
       }
-      
-      //dump the key/value to buffer
-      int keyOffset = keyValBuffer.getLength(); 
-      keySerializer.serialize(key);
-      int keyLength = keyValBuffer.getLength() - keyOffset;
-      valSerializer.serialize(value);
-      int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
-      int partNumber = partitioner.getPartition(key, value, partitions);
-      sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
-
-      mapOutputRecordCounter.increment(1);
-      mapOutputByteCounter.increment(keyValBuffer.getLength() - keyOffset);
-
-      //now check whether we need to spill to disk
-      long totalMem = 0;
-      for (int i = 0; i < partitions; i++)
-        totalMem += sortImpl[i].getMemoryUtilized();
-      totalMem += keyValBuffer.getLength();
-      if (totalMem  >= maxBufferSize) {
-        // check if the earlier spill is pending
-        synchronized (pendingKeyvalBufferLock) {
-          // check if the spill is over, there could be a case where the 
-          // sort-spill is yet to start and collect acquired the lock
-          while (pendingKeyvalBuffer != null) {
-            try {
-              // indicate that we are making progress
-              this.reporter.progress();
-              pendingKeyvalBufferLock.wait(); // wait for the pending spill to
-                                              // start and finish sort-spill
-            } catch (InterruptedException ie) {
-              LOG.warn("Buffer interrupted while waiting for the writer", ie);
+
+    }
+
+    /**
+     * Compare logical range, st i, j MOD offset capacity.
+     * Compare by partition, then by key.
+     * @see IndexedSortable#compare
+     */
+    public int compare(int i, int j) {
+      final int ii = kvoffsets[i % kvoffsets.length];
+      final int ij = kvoffsets[j % kvoffsets.length];
+      // sort by partition
+      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
+        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
+      }
+      // sort by key
+      return comparator.compare(kvbuffer,
+          kvindices[ii + KEYSTART],
+          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
+          kvbuffer,
+          kvindices[ij + KEYSTART],
+          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
+    }
+
+    /**
+     * Swap logical indices st i, j MOD offset capacity.
+     * @see IndexedSortable#swap
+     */
+    public void swap(int i, int j) {
+      i %= kvoffsets.length;
+      j %= kvoffsets.length;
+      int tmp = kvoffsets[i];
+      kvoffsets[i] = kvoffsets[j];
+      kvoffsets[j] = tmp;
+    }
+
+    /**
+     * Inner class managing the spill of serialized records to disk.
+     */
+    protected class BlockingBuffer extends DataOutputStream {
+
+      public BlockingBuffer() {
+        this(new Buffer());
+      }
+
+      private BlockingBuffer(OutputStream out) {
+        super(out);
+      }
+
+      /**
+       * Mark end of record. Note that this is required if the buffer is to
+       * cut the spill in the proper place.
+       */
+      public int markRecord() {
+        bufmark = bufindex;
+        return bufindex;
+      }
+
+      /**
+       * Set position from last mark to end of writable buffer, then rewrite
+       * the data between last mark and kvindex.
+       * This handles a special case where the key wraps around the buffer.
+       * If the key is to be passed to a RawComparator, then it must be
+       * contiguous in the buffer. This recopies the data in the buffer back
+       * into itself, but starting at the beginning of the buffer. Note that
+       * reset() should <b>only</b> be called immediately after detecting
+       * this condition. To call it at any other time is undefined and would
+       * likely result in data loss or corruption.
+       * @see #markRecord()
+       */
+      protected synchronized void reset() throws IOException {
+        // spillLock unnecessary; If spill wraps, then
+        // bufindex < bufstart < bufend so contention is impossible
+        // a stale value for bufstart does not affect correctness, since
+        // we can only get false negatives that force the more
+        // conservative path
+        int headbytelen = bufvoid - bufmark;
+        bufvoid = bufmark;
+        if (bufindex + headbytelen < bufstart) {
+          System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
+          System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
+          bufindex += headbytelen;
+        } else {
+          byte[] keytmp = new byte[bufindex];
+          System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
+          bufindex = 0;
+          out.write(kvbuffer, bufmark, headbytelen);
+          out.write(keytmp);
+        }
+      }
+    }
+
+    public class Buffer extends OutputStream {
+      private final byte[] scratch = new byte[1];
+
+      public synchronized void write(int v)
+          throws IOException {
+        scratch[0] = (byte)v;
+        write(scratch, 0, 1);
+      }
+
+      /**
+       * Attempt to write a sequence of bytes to the collection buffer.
+       * This method will block if the spill thread is running and it
+       * cannot write.
+       * @throws MapBufferTooSmallException if record is too large to
+       *    deserialize into the collection buffer.
+       */
+      public synchronized void write(byte b[], int off, int len)
+          throws IOException {
+        boolean kvfull = false;
+        boolean buffull = false;
+        boolean wrap = false;
+        synchronized(spillLock) {
+          do {
+            if (sortSpillException != null) {
+              throw (IOException)new IOException().initCause(
+                  sortSpillException);
             }
-          }
-          // prepare for spilling
-          pendingKeyvalBuffer = keyValBuffer;
-          pendingSortImpl = sortImpl;
-          keySerializer.close();
-          valSerializer.close();
-          keyValBuffer = null;
-          sortImpl = null;
+
+            // sufficient accounting space?
+            kvfull = (kvindex + 1) % kvoffsets.length == kvstart;
+            // sufficient buffer space?
+            if (bufstart <= bufend && bufend <= bufindex) {
+              buffull = bufindex + len > bufvoid;
+              wrap = (bufvoid - bufindex) + bufstart > len;
+            } else {
+              // bufindex <= bufstart <= bufend
+              // bufend <= bufindex <= bufstart
+              wrap = false;
+              buffull = bufindex + len > bufstart;
+            }
+
+            if (kvstart == kvend) {
+              // spill thread not running
+              if (kvend != kvindex) {
+                // we have records we can spill
+                final boolean kvsoftlimit = (kvindex > kvend)
+                  ? kvindex - kvend > softRecordLimit
+                  : kvend - kvindex < kvoffsets.length - softRecordLimit;
+                final boolean bufsoftlimit = (bufindex > bufend)
+                  ? bufindex - bufend > softBufferLimit
+                  : bufend - bufindex < bufvoid - softBufferLimit;
+                if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
+                  kvend = kvindex;
+                  bufend = bufmark;
+                  // TODO No need to recreate this thread every time
+                  SpillThread t = new SpillThread();
+                  t.setDaemon(true);
+                  t.setName("SpillThread");
+                  t.start();
+                }
+              } else if (buffull && !wrap) {
+                // We have no buffered records, and this record is too large
+                // to write into kvbuffer. We must spill it directly from
+                // collect
+                final int size = ((bufend <= bufindex)
+                  ? bufindex - bufend
+                  : (bufvoid - bufend) + bufindex) + len;
+                bufstart = bufend = bufindex = bufmark = 0;
+                kvstart = kvend = kvindex = 0;
+                bufvoid = kvbuffer.length;
+                throw new MapBufferTooSmallException(size + " bytes");
+              }
+            }
+
+            if (kvfull || (buffull && !wrap)) {
+              while (kvstart != kvend) {
+                reporter.progress();
+                try {
+                  spillLock.wait();
+                } catch (InterruptedException e) {
+                  throw (IOException)new IOException(
+                      "Buffer interrupted while waiting for the writer"
+                      ).initCause(e);
+                }
+              }
+            }
+          } while (kvfull || (buffull && !wrap));
+        }
+        // here, we know that we have sufficient space to write
+        if (buffull) {
+          final int gaplen = bufvoid - bufindex;
+          System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
+          len -= gaplen;
+          off += gaplen;
+          bufindex = 0;
         }
+        System.arraycopy(b, off, kvbuffer, bufindex, len);
+        bufindex += len;
+      }
+    }
 
-        // check if the earlier sort-spill thread generated an exception
-        if (sortSpillException != null) {
-          throw sortSpillException;
+    public synchronized void flush() throws IOException {
+      synchronized (spillLock) {
+        while (kvstart != kvend) {
+          try {
+            reporter.progress();
+            spillLock.wait();
+          } catch (InterruptedException e) {
+            throw (IOException)new IOException(
+                "Buffer interrupted while waiting for the writer"
+                ).initCause(e);
+          }
         }
-        
-        // Start the sort-spill thread. While the sort and spill takes place 
-        // using the pending variables, the output collector can collect the 
-        // key-value without getting blocked. Thus making key-value collection 
-        // and sort-spill concurrent.
-        Thread bufferWriter = new Thread() {
-          public void run() {
-            synchronized (pendingKeyvalBufferLock) {
-              sortAndSpillToDisk();
+      }
+      if (sortSpillException != null) {
+        throw (IOException)new IOException().initCause(sortSpillException);
+      }
+      if (kvend != kvindex) {
+        kvend = kvindex;
+        bufend = bufmark;
+        sortAndSpill();
+      }
+      // release sort buffer before the merge
+      kvbuffer = null;
+      mergeParts();
+    }
+
+    public void close() { }
+
+    protected class SpillThread extends Thread {
+
+      public void run() {
+        try {
+          sortAndSpill();
+        } catch (IOException e) {
+          sortSpillException = e;
+        } finally {
+          synchronized(spillLock) {
+            if (bufend < bufindex && bufindex < bufstart) {
+              bufvoid = kvbuffer.length;
             }
+            kvstart = kvend;
+            bufstart = bufend;
+            spillLock.notify();
           }
-        };
-        bufferWriter.setDaemon(true); // to make sure that the buffer writer 
-                                      // gets killed if collector is killed.
-        bufferWriter.setName("SortSpillThread");
-        bufferWriter.start();
+        }
       }
     }
-    
-    //sort, combine and spill to disk
-    private void sortAndSpillToDisk() {
+
+    private void sortAndSpill() throws IOException {
+      //approximate the length of the output file to be the length of the
+      //buffer + header lengths for the partitions
+      long size = (bufend > bufstart
+          ? bufend - bufstart
+          : (bufvoid - bufend) + bufstart) +
+                  partitions * APPROX_HEADER_LENGTH;
+      FSDataOutputStream out = null;
+      FSDataOutputStream indexOut = null;
       try {
-        //approximate the length of the output file to be the length of the
-        //buffer + header lengths for the partitions
-        long size = pendingKeyvalBuffer.getLength() + 
-                    partitions * APPROX_HEADER_LENGTH;
-        Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
+        // create spill file
+        Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(),
                                       numSpills, size);
-        //we just create the FSDataOutputStream object here.
         out = localFs.create(filename);
+        // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
                              getTaskId(), numSpills, partitions * 16);
         indexOut = localFs.create(indexFilename);
-        LOG.debug("opened "+
-                  mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
-          
-        //invoke the sort
-        for (int i = 0; i < partitions; i++) {
-          pendingSortImpl[i].setInputBuffer(pendingKeyvalBuffer);
-          pendingSortImpl[i].setProgressable(reporter);
-          RawKeyValueIterator rIter = pendingSortImpl[i].sort();
-          
-          startPartition(i);
-          if (rIter != null) {
-            //invoke the combiner if one is defined
-            if (job.getCombinerClass() != null) {
-              //we instantiate and close the combiner for each partition. This
-              //is required for streaming where the combiner runs as a separate
-              //process and we want to make sure that the combiner process has
-              //got all the input key/val, processed, and output the result 
-              //key/vals before we write the partition header in the output file
-              Reducer combiner = (Reducer)ReflectionUtils.newInstance(
-                                                                      job.getCombinerClass(), job);
-              // make collector
-              OutputCollector combineCollector = new OutputCollector() {
-                  public void collect(Object key, Object value)
-                    throws IOException {
-                    synchronized (this) {
-                      writer.append(key, value);
-                    }
-                  }
-                };
-              combineAndSpill(rIter, combiner, combineCollector);
-              combiner.close();
+        final int endPosition = (kvend > kvstart)
+          ? kvend
+          : kvoffsets.length + kvend;
+        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
+        int spindex = kvstart;
+        InMemValBytes vbytes = new InMemValBytes();
+        for (int i = 0; i < partitions; ++i) {
+          SequenceFile.Writer writer = null;
+          try {
+            long segmentStart = out.getPos();
+            writer = SequenceFile.createWriter(job, out,
+                keyClass, valClass, compressionType, codec);
+            if (null == combinerClass) {
+              // spill directly
+              while (spindex < endPosition &&
+                  kvindices[kvoffsets[spindex % kvoffsets.length]
+                            + PARTITION] == i) {
+                final int kvoff = kvoffsets[spindex % kvoffsets.length];
+                getVBytesForOffset(kvoff, vbytes);
+                writer.appendRaw(kvbuffer, kvindices[kvoff + KEYSTART],
+                    kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART],
+                    vbytes);
+                ++spindex;
+              }
+            } else {
+              int spstart = spindex;
+              while (spindex < endPosition &&
+                  kvindices[kvoffsets[spindex % kvoffsets.length]
+                            + PARTITION] == i) {
+                ++spindex;
+              }
+              // Note: we would like to avoid the combiner if we've fewer
+              // than some threshold of records for a partition, but we left
+              // our records uncompressed for the combiner. We accept the trip
+              // through the combiner to effect the compression for now;
+              // to remedy this would require us to observe the compression
+              // strategy here as we do in collect
+              if (spstart != spindex) {
+                Reducer combiner =
+                  (Reducer)ReflectionUtils.newInstance(combinerClass, job);
+                combineCollector.setWriter(writer);
+                combineAndSpill(spstart, spindex, combiner, combineCollector);
+                // combineAndSpill closes combiner
+              }
             }
-            else //just spill the sorted data
-              spill(rIter);
+            // we need to close the writer to flush buffered data, obtaining
+            // the correct offset
+            writer.close();
+            writer = null;
+            indexOut.writeLong(segmentStart);
+            indexOut.writeLong(out.getPos() - segmentStart);
+          } finally {
+            if (null != writer) writer.close();
           }
-          endPartition(i);
         }
-        numSpills++;
-        out.close();
-        indexOut.close();
-      } catch (IOException ioe) {
-        sortSpillException = ioe;
-      } finally { // make sure that the collector never waits indefinitely
-        pendingKeyvalBuffer = null;
-        for (int i = 0; i < partitions; i++) {
-          pendingSortImpl[i].close();
-        }
-        pendingKeyvalBufferLock.notify();
+        ++numSpills;
+      } finally {
+        if (out != null) out.close();
+        if (indexOut != null) indexOut.close();
       }
     }
-    
+
+    /**
+     * Handles the degenerate case where serialization fails to fit in
+     * the in-memory buffer, so we must spill the record from collect
+     * directly to a spill file. Consider this "losing".
+     */
     @SuppressWarnings("unchecked")
-    private void combineAndSpill(RawKeyValueIterator resultIter, 
-                                 Reducer combiner, OutputCollector combineCollector) throws IOException {
-      //combine the key/value obtained from the offset & indices arrays.
-      CombineValuesIterator values = new CombineValuesIterator(resultIter,
-                                                               comparator, keyClass, valClass, job, reporter);
-      while (values.more()) {
-        combiner.reduce(values.getKey(), values, combineCollector, reporter);
-        values.nextKey();
-        combineOutputCounter.increment(1);
-        // indicate we're making progress
-        reporter.progress();
+    private void spillSingleRecord(final Object key, final Object value) 
+        throws IOException {
+      long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
+      FSDataOutputStream out = null;
+      FSDataOutputStream indexOut = null;
+      final int partition = partitioner.getPartition(key, value, partitions);
+      try {
+        // create spill file
+        Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(),
+                                      numSpills, size);
+        out = localFs.create(filename);
+        // create spill index
+        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+                             getTaskId(), numSpills, partitions * 16);
+        indexOut = localFs.create(indexFilename);
+        // we don't run the combiner for a single record
+        for (int i = 0; i < partitions; ++i) {
+          SequenceFile.Writer writer = null;
+          try {
+            long segmentStart = out.getPos();
+            writer = SequenceFile.createWriter(job, out,
+                keyClass, valClass, compressionType, codec);
+            if (i == partition) {
+              final long recordStart = out.getPos();
+              writer.append(key, value);
+              // Note that our map byte count will not be accurate with
+              // compression
+              mapOutputByteCounter.increment(out.getPos() - recordStart);
+            }
+            writer.close();
+            indexOut.writeLong(segmentStart);
+            indexOut.writeLong(out.getPos() - segmentStart);
+          } catch (IOException e) {
+            if (null != writer) writer.close();
+            throw e;
+          }
+        }
+        ++numSpills;
+      } finally {
+        if (out != null) out.close();
+        if (indexOut != null) indexOut.close();
       }
     }
-    
+
+    /**
+     * Given an offset, populate vbytes with the associated set of
+     * deserialized value bytes. Should only be called during a spill.
+     */
+    private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
+      final int nextindex = kvoff / ACCTSIZE == kvend - 1
+        ? bufend
+        : kvindices[kvoff + ACCTSIZE + KEYSTART];
+      int vallen = (nextindex > kvindices[kvoff + VALSTART])
+        ? nextindex - kvindices[kvoff + VALSTART]
+        : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
+      vbytes.reset(kvindices[kvoff + VALSTART], vallen);
+    }
+
     @SuppressWarnings("unchecked")
-    private void spill(RawKeyValueIterator resultIter) throws IOException {
+    private void combineAndSpill(int start, int end, Reducer combiner,
+        OutputCollector combineCollector) throws IOException {
       try {
-        // indicate progress, since constructor may take a while (because of 
-        // user code) 
-        reporter.progress();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-
-      Object key = null;
-      Object value = null;
-      DataOutputBuffer valOut = new DataOutputBuffer();
-      while (resultIter.next()) {
-        keyIn.reset(resultIter.getKey().getData(), 
-                    resultIter.getKey().getLength());
-        key = keyDeserializer.deserialize(key);
-        valOut.reset();
-        (resultIter.getValue()).writeUncompressedBytes(valOut);
-        valIn.reset(valOut.getData(), valOut.getLength());
-        value = valDeserializer.deserialize(value);
-        writer.append(key, value);
-        reporter.progress();
+        CombineValuesIterator values = new CombineValuesIterator(
+            new MRResultIterator(start, end), comparator, keyClass, valClass,
+            job, reporter);
+        while (values.more()) {
+          combiner.reduce(values.getKey(), values, combineCollector, reporter);
+          values.nextKey();
+          combineOutputCounter.increment(1);
+          // indicate we're making progress
+          reporter.progress();
+        }
+      } finally {
+        combiner.close();
       }
     }
-    
+
+    /**
+     * Inner class wrapping valuebytes, used for appendRaw.
+     */
+    protected class InMemValBytes implements ValueBytes {
+      private int start;
+      private int len;
+      public void reset(int start, int len) {
+        this.start = start;
+        this.len = len;
+      }
+      public int getSize() {
+        return len;
+      }
+      public void writeUncompressedBytes(DataOutputStream outStream)
+          throws IOException {
+        if (start + len > bufvoid) {
+          final int taillen = bufvoid - start;
+          outStream.write(kvbuffer, start, taillen);
+          outStream.write(kvbuffer, 0, len - taillen);
+          return;
+        }
+        outStream.write(kvbuffer, start, len);
+      }
+      public void writeCompressedBytes(DataOutputStream outStream)
+          throws IOException {
+        // If writing record-compressed data, kvbuffer vals rec-compressed
+        // and may be written directly. Note: not contiguous
+        writeUncompressedBytes(outStream);
+      }
+    }
+
+    protected class MRResultIterator implements RawKeyValueIterator {
+      private final DataOutputBuffer keybuf = new DataOutputBuffer();
+      private final InMemValBytes vbytes = new InMemValBytes();
+      private final int end;
+      private int current;
+      public MRResultIterator(int start, int end) {
+        this.end = end;
+        current = start - 1;
+      }
+      public boolean next() throws IOException {
+        return ++current < end;
+      }
+      public DataOutputBuffer getKey() throws IOException {
+        final int kvoff = kvoffsets[current % kvoffsets.length];
+        keybuf.reset();
+        keybuf.write(kvbuffer, kvindices[kvoff + KEYSTART],
+            kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
+        return keybuf;
+      }
+      public ValueBytes getValue() throws IOException {
+        getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes);
+        return vbytes;
+      }
+      public Progress getProgress() {
+        return null;
+      }
+      public void close() { }
+    }
+
+    private class CombineValuesIterator<KEY,VALUE>
+        extends ValuesIterator<KEY,VALUE> {
+
+      public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+          RawComparator<KEY> comparator, Class<KEY> keyClass,
+          Class<VALUE> valClass, Configuration conf, Reporter reporter)
+          throws IOException {
+        super(in, comparator, keyClass, valClass, conf, reporter);
+      }
+
+      public VALUE next() {
+        combineInputCounter.increment(1);
+        return super.next();
+      }
+    }
+
     private void mergeParts() throws IOException {
       // get the approximate size of the final output/index files
       long finalOutFileSize = 0;
       long finalIndexFileSize = 0;
       Path [] filename = new Path[numSpills];
       Path [] indexFileName = new Path[numSpills];
+      FileSystem localFs = FileSystem.getLocal(job);
       
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
@@ -676,71 +1054,32 @@
         }
       }
     }
-    
-    public void close() throws IOException {
-      //empty for now
+
+  }
+
+  /**
+   * OutputCollector for the combiner.
+   */
+  private static class CombineOutputCollector implements OutputCollector {
+    private SequenceFile.Writer writer;
+    public synchronized void setWriter(SequenceFile.Writer writer) {
+      this.writer = writer;
     }
-    
-    private class CombineValuesIterator<KEY,VALUE> 
-            extends ValuesIterator<KEY,VALUE> {
-        
-      public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
-                                   RawComparator<KEY> comparator, 
-                                   Class<KEY> keyClass,
-                                   Class<VALUE> valClass, Configuration conf, 
-                                   Reporter reporter) 
+    public synchronized void collect(Object key, Object value)
         throws IOException {
-        super(in, comparator, keyClass, valClass, conf, reporter);
-      }
-      
-      public VALUE next() {
-        combineInputCounter.increment(1);
-        return super.next();
-      }
+        writer.append(key, value);
     }
+  }
 
-    public synchronized void flush() throws IOException 
-    {
-      //check whether the length of the key/value buffer is 0. If not, then
-      //we need to spill that to disk. Note that we reset the key/val buffer
-      //upon each spill (so a length > 0 means that we have not spilled yet)
-      
-      // check if the earlier spill is pending
-      synchronized (pendingKeyvalBufferLock) {
-        // this could mean that either the sort-spill is over or is yet to 
-        // start so make sure that the earlier sort-spill is over.
-        while (pendingKeyvalBuffer != null) {
-          try {
-            // indicate that we are making progress
-            this.reporter.progress();
-            pendingKeyvalBufferLock.wait();
-          } catch (InterruptedException ie) {
-            LOG.info("Buffer interrupted while for the pending spill", ie);
-          }
-        }
-      }
-      
-      // check if the earlier sort-spill thread generated an exception
-      if (sortSpillException != null) {
-        throw sortSpillException;
-      }
-      
-      if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
-        // prepare for next spill
-        synchronized (pendingKeyvalBufferLock) {
-          pendingKeyvalBuffer = keyValBuffer;
-          pendingSortImpl = sortImpl;
-          keyValBuffer = null;
-          sortImpl = null;
-          sortAndSpillToDisk();
-        }
-      }  
-      
-      // check if the last sort-spill thread generated an exception
-      if (sortSpillException != null) {
-        throw sortSpillException;
-      }
-      mergeParts();
+  /**
+   * Exception indicating that the allocated sort buffer is insufficient
+   * to hold the current record.
+   */
+  @SuppressWarnings("serial")
+  private static class MapBufferTooSmallException extends IOException {
+    public MapBufferTooSmallException(String s) {
+      super(s);
     }
   }
+
 }
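
The heart of this change is that key/value bytes are serialized once into
kvbuffer and never move again; the sort permutes only the small kvoffsets
index, and the spill reads the data out in sorted order via appendRaw. A
simplified, self-contained sketch of that idea follows, using the same
PARTITION/KEYSTART/VALSTART/ACCTSIZE layout as the patch but omitting the
circular buffer, spill thread, combiner, and compression; a raw byte
comparison stands in for the job's output key comparator:

  import org.apache.hadoop.io.WritableComparator;
  import org.apache.hadoop.util.IndexedSortable;
  import org.apache.hadoop.util.QuickSort;

  // Simplified model of MapOutputBuffer accounting: records live in one flat
  // byte buffer; only the int[] metadata is rearranged by the sort.
  public class MetadataSortSketch implements IndexedSortable {
    private static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, ACCTSIZE = 3;

    private final byte[] kvbuffer = new byte[1 << 16]; // serialized key/value bytes
    private final int[] kvoffsets = new int[16];       // one entry per record
    private final int[] kvindices = new int[16 * ACCTSIZE];
    private int bufindex = 0;                           // end of collected data
    private int kvindex = 0;                            // number of records collected

    void collect(int partition, byte[] key, byte[] val) {
      int keystart = bufindex;
      System.arraycopy(key, 0, kvbuffer, bufindex, key.length);
      bufindex += key.length;
      int valstart = bufindex;
      System.arraycopy(val, 0, kvbuffer, bufindex, val.length);
      bufindex += val.length;
      int ind = kvindex * ACCTSIZE;                     // update accounting info
      kvoffsets[kvindex] = ind;
      kvindices[ind + PARTITION] = partition;
      kvindices[ind + KEYSTART] = keystart;
      kvindices[ind + VALSTART] = valstart;
      ++kvindex;
    }

    // Order by partition, then by raw key bytes, without touching the data.
    public int compare(int i, int j) {
      final int ii = kvoffsets[i];
      final int ij = kvoffsets[j];
      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
      }
      return WritableComparator.compareBytes(
          kvbuffer, kvindices[ii + KEYSTART],
          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
          kvbuffer, kvindices[ij + KEYSTART],
          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
    }

    // Swapping records swaps only their index entries, not their bytes.
    public void swap(int i, int j) {
      int tmp = kvoffsets[i];
      kvoffsets[i] = kvoffsets[j];
      kvoffsets[j] = tmp;
    }

    public static void main(String[] args) {
      MetadataSortSketch b = new MetadataSortSketch();
      b.collect(1, "banana".getBytes(), "2".getBytes());
      b.collect(0, "apple".getBytes(), "1".getBytes());
      b.collect(0, "cherry".getBytes(), "3".getBytes());
      new QuickSort().sort(b, 0, b.kvindex);  // no byte in kvbuffer is copied
    }
  }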

Added: hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSortable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSortable.java?rev=643195&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSortable.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSortable.java Mon Mar 31 15:45:48 2008
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * Interface for collections capable of being sorted by {@link IndexedSorter}
+ * algorithms.
+ */
+public interface IndexedSortable {
+
+  /**
+   * Compare items at the given addresses consistent with the semantics of
+   * {@link java.util.Comparator#compare}.
+   */
+  int compare(int i, int j);
+
+  /**
+   * Swap items at the given addresses.
+   */
+  void swap(int i, int j);
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSorter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSorter.java?rev=643195&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSorter.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/IndexedSorter.java Mon Mar 31 15:45:48 2008
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * Interface for sort algorithms accepting {@link IndexedSortable} items.
+ *
+ * A sort algorithm implementing this interface may only
+ * {@link IndexedSortable#compare} and {@link IndexedSortable#swap} items
+ * for a range of indices to effect a sort across that range.
+ */
+public interface IndexedSorter {
+
+  /**
+   * Sort the items accessed through the given IndexedSortable over the given
+   * range of logical indices. From the perspective of the sort algorithm,
+   * each index between l (inclusive) and r (exclusive) is an addressable
+   * entry.
+   * @see IndexedSortable#compare
+   * @see IndexedSortable#swap
+   */
+  void sort(IndexedSortable s, int l, int r);
+}
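
A caller drives an IndexedSorter over the logical range it wants ordered; the
lower bound is inclusive and the upper bound exclusive. A short usage sketch,
reusing the hypothetical IntArraySortable from above together with the
QuickSort implementation added by this patch:

    int[] data = { 42, 7, 19, 7, 3 };
    IntArraySortable sortable = new IntArraySortable(data);
    IndexedSorter sorter = new QuickSort();
    sorter.sort(sortable, 0, data.length);  // l inclusive, r exclusive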

Added: hadoop/core/trunk/src/java/org/apache/hadoop/util/QuickSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/QuickSort.java?rev=643195&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/QuickSort.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/QuickSort.java Mon Mar 31 15:45:48 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * An implementation of the core algorithm of QuickSort.
+ * See "Median-of-Three Partitioning" in Sedgewick book.
+ */
+public class QuickSort implements IndexedSorter {
+
+  public QuickSort() { }
+
+  private void fix(IndexedSortable s, int p, int r) {
+    if (s.compare(p, r) > 0) {
+      s.swap(p, r);
+    }
+  }
+
+  public void sort(IndexedSortable s, int p, int r) {
+    sort(s, p, r, null);
+  }
+
+  /**
+   * Same as {@link #sort}, but indicate that we're making progress after
+   * each partition.
+   */
+  public void sort(IndexedSortable s, int p, int r, Progressable rep) {
+    if (null != rep) {
+      rep.progress();
+    }
+    if (r-p < 13) {
+      for (int i = p; i < r; ++i) {
+        for (int j = i; j > p; --j) {
+          if (s.compare(j-1, j) > 0) {
+            s.swap(j, j-1);
+          }
+        }
+      }
+      return;
+    }
+
+    // select median-of-three pivot and move it into the first position
+    fix(s, (p+r) >>> 1, p);
+    fix(s, (p+r) >>> 1, r - 1);
+    fix(s, p, r-1);
+
+    // Divide
+    int x = p;
+    int i = p;
+    int j = r;
+    while(true) {
+      while (++i < r && s.compare(i, x) < 0) { } // advance left index past elements less than the pivot
+      while (--j > x && s.compare(x, j) < 0) { } // retreat right index past elements greater than the pivot
+      if (i < j) s.swap(i, j);
+      else break;
+    }
+    // swap pivot into position
+    s.swap(x, i - 1);
+
+    // Conquer
+    // Recurse on smaller interval first to keep stack shallow
+    if (i - p - 1 < r - i) {
+      sort(s, p, i - 1, rep);
+      sort(s, i, r, rep);
+    } else {
+      sort(s, i, r, rep);
+      sort(s, p, i - 1, rep);
+    }
+  }
+
+}
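
The four-argument sort takes a Progressable and invokes progress() once per
recursive call, so a long-running sort can keep signalling liveness. A
hypothetical sketch of supplying one (a real task would pass the framework's
reporter rather than this throwaway counter):

    final int[] calls = { 0 };
    Progressable rep = new Progressable() {
      public void progress() {
        ++calls[0];  // one call per partition of the range being sorted
      }
    };
    new QuickSort().sort(sortable, 0, data.length, rep);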

Added: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestIndexedSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestIndexedSort.java?rev=643195&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestIndexedSort.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestIndexedSort.java Mon Mar 31 15:45:48 2008
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+
+public class TestIndexedSort extends TestCase {
+
+  private static class SampleSortable implements IndexedSortable {
+    private int[] valindex;
+    private int[] valindirect;
+    private int[] values;
+
+    public SampleSortable() {
+      this(50);
+    }
+
+    public SampleSortable(int j) {
+      Random r = new Random();
+      values = new int[j];
+      valindex = new int[j];
+      valindirect = new int[j];
+      for (int i = 0; i < j; ++i) {
+        valindex[i] = valindirect[i] = i;
+        values[i] = r.nextInt(1000);
+      }
+    }
+
+    public SampleSortable(int[] values) {
+      this.values = values;
+      valindex = new int[values.length];
+      valindirect = new int[values.length];
+      for (int i = 0; i < values.length; ++i) {
+        valindex[i] = valindirect[i] = i;
+      }
+    }
+
+    public int compare(int i, int j) {
+      // values are non-negative, so the subtraction below cannot overflow
+      return
+        values[valindirect[valindex[i]]] - values[valindirect[valindex[j]]];
+    }
+
+    public void swap(int i, int j) {
+      int tmp = valindex[i];
+      valindex[i] = valindex[j];
+      valindex[j] = tmp;
+    }
+
+    public int[] getSorted() {
+      int[] ret = new int[values.length];
+      for (int i = 0; i < ret.length; ++i) {
+        ret[i] = values[valindirect[valindex[i]]];
+      }
+      return ret;
+    }
+
+    public int[] getValues() {
+      int[] ret = new int[values.length];
+      System.arraycopy(values, 0, ret, 0, values.length);
+      return ret;
+    }
+
+  }
+
+  private static class WritableSortable implements IndexedSortable {
+
+    private static Random r = new Random();
+    private final int eob;
+    private final int[] indices;
+    private final int[] offsets;
+    private final byte[] bytes;
+    private final WritableComparator comparator;
+    private final String[] check;
+
+    public WritableSortable() throws IOException {
+      this(100);
+    }
+
+    public WritableSortable(int j) throws IOException {
+      Text t = new Text();
+      StringBuffer sb = new StringBuffer();
+      indices = new int[j];
+      offsets = new int[j];
+      check = new String[j];
+      DataOutputBuffer dob = new DataOutputBuffer();
+      for (int i = 0; i < j; ++i) {
+        indices[i] = i;
+        offsets[i] = dob.getLength();
+        genRandom(t, r.nextInt(15) + 1, sb);
+        t.write(dob);
+        check[i] = t.toString();
+      }
+      eob = dob.getLength();
+      bytes = dob.getData();
+      comparator = WritableComparator.get(Text.class);
+    }
+
+    private static void genRandom(Text t, int len, StringBuffer sb) {
+      sb.setLength(0);
+      for (int i = 0; i < len; ++i) {
+        sb.append(Integer.toString(r.nextInt(26) + 10, 36));
+      }
+      t.set(sb.toString());
+    }
+
+    public int compare(int i, int j) {
+      final int ii = indices[i];
+      final int ij = indices[j];
+      return comparator.compare(bytes, offsets[ii],
+        ((ii + 1 == indices.length) ? eob : offsets[ii + 1]) - offsets[ii],
+        bytes, offsets[ij],
+        ((ij + 1 == indices.length) ? eob : offsets[ij + 1]) - offsets[ij]);
+    }
+
+    public void swap(int i, int j) {
+      int tmp = indices[i];
+      indices[i] = indices[j];
+      indices[j] = tmp;
+    }
+
+    public String[] getValues() {
+      return check;
+    }
+
+    public String[] getSorted() throws IOException {
+      String[] ret = new String[indices.length];
+      Text t = new Text();
+      DataInputBuffer dib = new DataInputBuffer();
+      for (int i = 0; i < ret.length; ++i) {
+        int ii = indices[i];
+        dib.reset(bytes, offsets[ii],
+          ((ii + 1 == indices.length) ? eob : offsets[ii + 1]) - offsets[ii]);
+        t.readFields(dib);
+        ret[i] = t.toString();
+      }
+      return ret;
+    }
+
+  }
+
+  public void testAllEqual() throws Exception {
+    final int SAMPLE = 50;
+    int[] values = new int[SAMPLE];
+    Arrays.fill(values, 10);
+    SampleSortable s = new SampleSortable(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testSorted() throws Exception {
+    final int SAMPLE = 50;
+    int[] values = new int[SAMPLE];
+    Random r = new Random();
+    for (int i = 0; i < SAMPLE; ++i) {
+      values[i] = r.nextInt(100);
+    }
+    Arrays.sort(values);
+    SampleSortable s = new SampleSortable(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testSingleRecord() throws Exception {
+    final int SAMPLE = 1;
+    SampleSortable s = new SampleSortable(SAMPLE);
+    int[] values = s.getValues();
+    Arrays.sort(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testQuickSort() throws Exception {
+    final int SAMPLE = 10000;
+    SampleSortable s = new SampleSortable(SAMPLE);
+    int[] values = s.getValues();
+    Arrays.sort(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testWritable() throws Exception {
+    final int SAMPLE = 1000;
+    WritableSortable s = new WritableSortable(SAMPLE);
+    String[] values = s.getValues();
+    Arrays.sort(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    String[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+}


