hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r437791 [1/2] - in /lucene/hadoop/trunk: ./ conf/ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/io/ src/test/org/apache/h...
Date: Mon, 28 Aug 2006 19:24:55 GMT
Author: cutting
Date: Mon Aug 28 12:24:54 2006
New Revision: 437791

URL: http://svn.apache.org/viewvc?rev=437791&view=rev
Log:
HADOOP-54.  Add block compression to SequenceFile.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestMapRed.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Aug 28 12:24:54 2006
@@ -74,6 +74,12 @@
 
 18. HADOOP-461.  Make Java 1.5 an explicit requirement.  (cutting)
 
+19. HADOOP-54.  Add block compression to SequenceFile.  One may now
+    specify that blocks of keys and values are compressed together,
+    improving compression for small keys and values.
+    SequenceFile.Writer's constructor is now deprecated and replaced
+    with a factory method.  (Arun C Murthy via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

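In code, the change replaces direct construction of SequenceFile.Writer with
the factory method added below; a minimal sketch of writing a block-compressed
file (the path and key/value types here are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/tmp/blocks.seq");

    // The factory hands back a Writer, RecordCompressWriter or
    // BlockCompressWriter depending on the requested CompressionType.
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, file, IntWritable.class, IntWritable.class,
        CompressionType.BLOCK);
    writer.append(new IntWritable(1), new IntWritable(1));
    writer.close();
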
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Mon Aug 28 12:24:54 2006
@@ -396,6 +396,37 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.seqfile.compress.blocksize</name>
+  <value>1000000</value>
+  <description>The minimum block size for compression in block-compressed 
+  				SequenceFiles.
+  </description>
+</property>
+
+<property>
+  <name>mapred.seqfile.lazydecompress</name>
+  <value>true</value>
+  <description>Should values of block-compressed SequenceFiles be decompressed
+  				only when necessary?
+  </description>
+</property>
+
+<property>
+  <name>mapred.seqfile.sorter.recordlimit</name>
+  <value>1000000</value>
+  <description>The limit on the number of records to be kept in memory in a spill 
+  				in SequenceFile.Sorter.
+  </description>
+</property>
+
+<property>
+  <name>mapred.seqfile.compression.type</name>
+  <value>NONE</value>
+  <description>The default compression type for SequenceFile.Writer.
+  </description>
+</property>
+
 <!-- ipc properties -->
 
 <property>

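All four properties are read through the job's Configuration; a minimal sketch
of overriding them per-job (values are illustrative; the compression-type
default is presumably consulted where writers are created without an explicit
type, e.g. by SequenceFileOutputFormat, which is modified in part 2 of this
commit):

    JobConf job = new JobConf();
    // Buffer roughly 4MB of key/value data before deflating a block.
    job.setInt("mapred.seqfile.compress.blocksize", 4000000);
    // Inflate value blocks only when getCurrentValue() is called.
    job.set("mapred.seqfile.lazydecompress", "true");
    // Cap the records held in memory per spill in SequenceFile.Sorter.
    job.setInt("mapred.seqfile.sorter.recordlimit", 1000000);
    job.set("mapred.seqfile.compression.type", "BLOCK");
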
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java Mon Aug 28 12:24:54 2006
@@ -31,6 +31,7 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -184,9 +185,10 @@
 
     for(int i=0; i < numMaps; ++i) {
       Path file = new Path(inDir, "part"+i);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys,
-                                file,
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys,
+                                jobConf, file,
                                 IntWritable.class, IntWritable.class,
+                                CompressionType.NONE,
                                 null);
       writer.append(new IntWritable(0), new IntWritable(filesPerMap));
       writer.close();

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java Mon Aug 28 12:24:54 2006
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.*;
 
 /**
@@ -125,8 +126,9 @@
         Path outDir = new Path(tmpDir, "out");
         Path outFile = new Path(outDir, "reduce-out");
         FileSystem fileSys = FileSystem.get(conf);
-        SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
-              LongWritable.class, LongWritable.class);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, 
+            outFile, LongWritable.class, LongWritable.class, 
+            CompressionType.NONE);
         writer.append(new LongWritable(numInside), new LongWritable(numOutside));
         writer.close();
       }
@@ -173,8 +175,8 @@
     
     for(int idx=0; idx < numMaps; ++idx) {
       Path file = new Path(inDir, "part"+idx);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
-              LongWritable.class, LongWritable.class);
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf, 
+          file, LongWritable.class, LongWritable.class, CompressionType.NONE);
       writer.append(new LongWritable(numPoints), new LongWritable(0));
       writer.close();
       System.out.println("Wrote input for Map #"+idx);

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Mon Aug 28 12:24:54 2006
@@ -31,6 +31,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -51,6 +52,7 @@
   
   public static class Map extends MapReduceBase implements Mapper {
     private FileSystem fileSys = null;
+    private JobConf jobConf = null;
     private long numBytesToWrite;
     private int minKeySize;
     private int keySizeRange;
@@ -75,9 +77,9 @@
                     Reporter reporter) throws IOException {
       String filename = ((Text) value).toString();
       SequenceFile.Writer writer = 
-        new SequenceFile.Writer(fileSys, new Path(filename), 
+        SequenceFile.createWriter(fileSys, jobConf, new Path(filename), 
                                 BytesWritable.class, BytesWritable.class,
-                                reporter);
+                                CompressionType.NONE, reporter);
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         int keyLength = minKeySize + 
@@ -104,6 +106,7 @@
      * the data.
      */
     public void configure(JobConf job) {
+      jobConf = job;
       try {
         fileSys = FileSystem.get(job);
       } catch (IOException e) {

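As the example-program changes above show, the migration pattern for map tasks
is to cache the JobConf in configure() so that map() can hand it to
SequenceFile.createWriter; in brief (a sketch only, the catch body is
illustrative):

    private FileSystem fileSys = null;
    private JobConf jobConf = null;

    public void configure(JobConf job) {
      jobConf = job;                  // kept for SequenceFile.createWriter
      try {
        fileSys = FileSystem.get(job);
      } catch (IOException e) {
        throw new RuntimeException("cannot get filesystem", e);
      }
    }
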
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Aug 28 12:24:54 2006
@@ -35,8 +35,9 @@
 
   private SequenceFile() {}                         // no public ctor
 
+  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
   private static byte[] VERSION = new byte[] {
-    (byte)'S', (byte)'E', (byte)'Q', 3
+    (byte)'S', (byte)'E', (byte)'Q', BLOCK_COMPRESS_VERSION
   };
 
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
@@ -46,27 +47,226 @@
   /** The number of bytes between sync points.*/
   public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
 
-  /** Write key/value pairs to a sequence-format file. */
-  public static class Writer {
-    private FSDataOutputStream out;
-    private DataOutputBuffer buffer = new DataOutputBuffer();
-    private Path target = null;
+  /** The type of compression.
+   * @see SequenceFile.Writer
+   */
+  public static enum CompressionType {
+    /** Do not compress records. */
+    NONE, 
+    /** Compress values only, each separately. */
+    RECORD,
+    /** Compress sequences of records together in blocks. */
+    BLOCK
+  }
+  
+  /**
+   * Construct the preferred type of SequenceFile Writer.
+   * @param fs The configured filesystem. 
+   * @param conf The configuration.
+   * @param name The name of the file. 
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer 
+  createWriter(FileSystem fs, Configuration conf, Path name, 
+      Class keyClass, Class valClass, CompressionType compressionType) 
+  throws IOException {
+    Writer writer = null;
+    
+    if (compressionType == CompressionType.NONE) {
+      writer = new Writer(fs, conf, name, keyClass, valClass);
+    } else if (compressionType == CompressionType.RECORD) {
+      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass);
+    } else if (compressionType == CompressionType.BLOCK){
+      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass);
+    }
+    
+    return writer;
+  }
+  
+  /**
+   * Construct the preferred type of SequenceFile Writer.
+   * @param fs The configured filesystem. 
+   * @param conf The configuration.
+   * @param name The name of the file. 
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param progress The Progressable object to track progress.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer
+  createWriter(FileSystem fs, Configuration conf, Path name, 
+      Class keyClass, Class valClass, CompressionType compressionType,
+      Progressable progress) throws IOException {
+    Writer writer = null;
+    
+    if (compressionType == CompressionType.NONE) {
+      writer = new Writer(fs, conf, name, keyClass, valClass, progress);
+    } else if (compressionType == CompressionType.RECORD) {
+      writer = new RecordCompressWriter(fs, conf, name, 
+          keyClass, valClass, progress);
+    } else if (compressionType == CompressionType.BLOCK){
+      writer = new BlockCompressWriter(fs, conf, name, 
+          keyClass, valClass, progress);
+    }
+    
+    return writer;
+  }
 
-    private Class keyClass;
-    private Class valClass;
+  /**
+   * Construct the preferred type of 'raw' SequenceFile Writer.
+   * @param out The stream on top which the writer is to be constructed.
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compress Compress data?
+   * @param blockCompress Compress blocks?
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  private static Writer
+  createWriter(FSDataOutputStream out, 
+      Class keyClass, Class valClass, boolean compress, boolean blockCompress)
+  throws IOException {
+    Writer writer = null;
+
+    if (!compress) {
+      writer = new Writer(out, keyClass, valClass);
+    } else if (compress && !blockCompress) {
+      writer = new RecordCompressWriter(out, keyClass, valClass);
+    } else {
+      writer = new BlockCompressWriter(out, keyClass, valClass);
+    }
+    
+    return writer;
+  }
 
-    private boolean deflateValues;
-    private Deflater deflater = new Deflater(Deflater.BEST_SPEED);
-    private DeflaterOutputStream deflateFilter =
+  /** The interface to 'raw' values of SequenceFiles. */
+  public static interface ValueBytes {
+
+    /** Writes the uncompressed bytes to the outStream.
+     * @param outStream : Stream to write uncompressed bytes into.
+     * @throws IOException
+     */
+    public void writeUncompressedBytes(DataOutputStream outStream)
+    throws IOException;
+
+    /** Write compressed bytes to outStream. 
+     * Note: this does NOT compress the bytes; they must already be compressed.
+     * @param outStream : Stream to write compressed bytes into.
+     */
+    public void writeCompressedBytes(DataOutputStream outStream) 
+    throws IllegalArgumentException, IOException;
+  }
+  
+  private static class UncompressedBytes implements ValueBytes {
+    private int dataSize;
+    private byte[] data;
+    
+    private UncompressedBytes() {
+      data = null;
+      dataSize = 0;
+    }
+    
+    private void reset(DataInputStream in, int length) throws IOException {
+      data = new byte[length];
+      dataSize = -1;
+      
+      in.readFully(data);
+      dataSize = data.length;
+    }
+    
+    public int getSize() {
+      return dataSize;
+    }
+    
+    public void writeUncompressedBytes(DataOutputStream outStream)
+    throws IOException {
+      outStream.write(data, 0, dataSize);
+    }
+
+    public void writeCompressedBytes(DataOutputStream outStream) 
+    throws IllegalArgumentException, IOException {
+      throw 
+      new IllegalArgumentException("UncompressedBytes cannot be compressed!");
+    }
+
+  } // UncompressedBytes
+  
+  private static class CompressedBytes implements ValueBytes {
+    private int dataSize;
+    private byte[] data;
+    private Inflater zlibInflater = null;
+
+    private CompressedBytes() {
+      data = null;
+      dataSize = 0;
+    }
+
+    private void reset(DataInputStream in, int length) throws IOException {
+      data = new byte[length];
+      dataSize = -1;
+
+      in.readFully(data);
+      dataSize = data.length;
+    }
+    
+    public int getSize() {
+      return dataSize;
+    }
+    
+    public void writeUncompressedBytes(DataOutputStream outStream)
+    throws IOException {
+      if (zlibInflater == null) {
+        zlibInflater = new Inflater();
+      } else {
+        zlibInflater.reset();
+      }
+      zlibInflater.setInput(data, 0, dataSize);
+
+      byte[] buffer = new byte[8192];
+      while (!zlibInflater.finished()) {
+        try {
+          int noDecompressedBytes = zlibInflater.inflate(buffer);
+          outStream.write(buffer, 0, noDecompressedBytes);
+        } catch (DataFormatException e) {
+          throw new IOException (e.toString());
+        }
+      }
+    }
+
+    public void writeCompressedBytes(DataOutputStream outStream) 
+    throws IllegalArgumentException, IOException {
+      outStream.write(data, 0, dataSize);
+    }
+
+  } // CompressedBytes
+  
+  /** Write key/value pairs to a sequence-format file. */
+  public static class Writer {
+    FSDataOutputStream out;
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    Path target = null;
+
+    Class keyClass;
+    Class valClass;
+
+    private boolean compress;
+    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    DeflaterOutputStream deflateFilter =
       new DeflaterOutputStream(buffer, deflater);
-    private DataOutputStream deflateOut =
+    DataOutputStream deflateOut =
       new DataOutputStream(new BufferedOutputStream(deflateFilter));
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
-    private long lastSyncPos;                     // position of last sync
-    private byte[] sync;                          // 16 random bytes
+    long lastSyncPos;                     // position of last sync
+    byte[] sync;                          // 16 random bytes
     {
       try {                                       // use hash of uid + host
         MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -77,19 +277,25 @@
       }
     }
 
-    /** @deprecated Call {@link #Writer(FileSystem,Path,Class,Class)}. */
+    /** @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class)}. */
     public Writer(FileSystem fs, String name, Class keyClass, Class valClass)
       throws IOException {
       this(fs, new Path(name), keyClass, valClass, false);
     }
 
+    /** No-arg constructor: needed during the transition! */
+    Writer()
+    {}
+    
     /** Create the named file. */
+    /** @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class)}. */
     public Writer(FileSystem fs, Path name, Class keyClass, Class valClass)
       throws IOException {
       this(fs, name, keyClass, valClass, false);
     }
     
     /** Create the named file with write-progress reporter. */
+    /** @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class,Progressable)}. */
     public Writer(FileSystem fs, Path name, Class keyClass, Class valClass,
             Progressable progress)
       throws IOException {
@@ -99,61 +305,100 @@
     /** Create the named file.
      * @param compress if true, values are compressed.
      */
+    /** @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class)}. */
     public Writer(FileSystem fs, Path name,
                   Class keyClass, Class valClass, boolean compress)
       throws IOException {
-      this.target = name;
-      init(fs.create(target), keyClass, valClass, compress);
+      init(name, fs.create(name), keyClass, valClass, compress);
+
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
     }
     
     /** Create the named file with write-progress reporter.
      * @param compress if true, values are compressed.
      */
+    /** @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class,Progressable)}. */
     public Writer(FileSystem fs, Path name,
                   Class keyClass, Class valClass, boolean compress,
                   Progressable progress)
       throws IOException {
-      this.target = name;
-      init(fs.create(target, progress), keyClass, valClass, compress);
+      init(name, fs.create(name, progress), keyClass, valClass, compress);
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
     }
     
+    /** Create the named file. */
+    public Writer(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass)
+      throws IOException {
+      this(fs, name, keyClass, valClass, false);
+    }
+    
+    /** Create the named file with write-progress reporter. */
+    public Writer(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass, Progressable progress)
+      throws IOException {
+      this(fs, name, keyClass, valClass, false, progress);
+    }
+
     /** Write to an arbitrary stream using a specified buffer size. */
-    private Writer(FSDataOutputStream out,
-                   Class keyClass, Class valClass, boolean compress)
+    private Writer(FSDataOutputStream out, Class keyClass, Class valClass)
       throws IOException {
-      init(out, keyClass, valClass, compress);
+      init(null, out, keyClass, valClass, false);
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+    }
+
+    /** Write the initial part of file header. */
+    void initializeFileHeader() 
+    throws IOException{
+      out.write(VERSION);
+    }
+
+    /** Write the final part of file header. */
+    void finalizeFileHeader() 
+    throws IOException{
+      out.write(sync);                       // write the sync bytes
+      out.flush();                           // flush header
     }
     
+    boolean isCompressed() { return compress; }
+    boolean isBlockCompressed() { return false; }
+    
     /** Write and flush the file header. */
-    private void init(FSDataOutputStream out,
+    void writeFileHeader() 
+    throws IOException {
+      Text.writeString(out, keyClass.getName());
+      Text.writeString(out, valClass.getName());
+      
+      out.writeBoolean(this.isCompressed());
+      out.writeBoolean(this.isBlockCompressed());
+    }
+
+    /** Initialize. */
+    void init(Path name, FSDataOutputStream out,
                       Class keyClass, Class valClass,
-                      boolean compress) throws IOException {
+                      boolean compress) 
+    throws IOException {
+      this.target = name;
       this.out = out;
-      this.out.write(VERSION);
-
       this.keyClass = keyClass;
       this.valClass = valClass;
-
-      this.deflateValues = compress;
-
-      new UTF8(WritableName.getName(keyClass)).write(this.out);
-      new UTF8(WritableName.getName(valClass)).write(this.out);
-
-      this.out.writeBoolean(deflateValues);
-
-      out.write(sync);                            // write the sync bytes
-
-      this.out.flush();                           // flush header
+      this.compress = compress;
     }
     
-
     /** Returns the class of keys in this file. */
     public Class getKeyClass() { return keyClass; }
 
     /** Returns the class of values in this file. */
     public Class getValueClass() { return valClass; }
 
-
     /** Close the file. */
     public synchronized void close() throws IOException {
       if (out != null) {
@@ -162,6 +407,16 @@
       }
     }
 
+    synchronized void checkAndWriteSync() throws IOException {
+      if (sync != null &&
+          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
+        lastSyncPos = out.getPos();               // update lastSyncPos
+        //LOG.info("sync@"+lastSyncPos);
+        out.writeInt(SYNC_ESCAPE);                // escape it
+        out.write(sync);                          // write sync
+      }
+    }
+
     /** Append a key/value pair. */
     public synchronized void append(Writable key, Writable val)
       throws IOException {
@@ -172,12 +427,14 @@
 
       buffer.reset();
 
+      // Append the 'key'
       key.write(buffer);
       int keyLength = buffer.getLength();
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
 
-      if (deflateValues) {
+      // Append the 'value'
+      if (compress) {
         deflater.reset();
         val.write(deflateOut);
         deflateOut.flush();
@@ -186,44 +443,330 @@
         val.write(buffer);
       }
 
-      append(buffer.getData(), 0, buffer.getLength(), keyLength);
+      // Write the record out
+      checkAndWriteSync();                                // sync
+      out.writeInt(buffer.getLength());                   // total record length
+      out.writeInt(keyLength);                            // key portion length
+      out.write(buffer.getData(), 0, buffer.getLength()); // data
     }
 
-    /** Append a key/value pair. */
+    /** 
+     * Append a key/value pair. 
+     * @deprecated Call {@link #appendRaw(byte[], int, int, SequenceFile.ValueBytes)}. 
+     */
     public synchronized void append(byte[] data, int start, int length,
                                     int keyLength) throws IOException {
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
-      if (sync != null &&
-          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
-        lastSyncPos = out.getPos();               // update lastSyncPos
-        //LOG.info("sync@"+lastSyncPos);
-        out.writeInt(SYNC_ESCAPE);                // escape it
-        out.write(sync);                          // write sync
-      }
-
+      checkAndWriteSync();                        // sync
       out.writeInt(length);                       // total record length
       out.writeInt(keyLength);                    // key portion length
       out.write(data, start, length);             // data
 
     }
+    
+    public synchronized void appendRaw(
+        byte[] keyData, int keyOffset, int keyLength, ValueBytes val) 
+    throws IOException {
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed: " + keyLength);
+
+      UncompressedBytes value = (UncompressedBytes)val;
+      int valLength = value.getSize();
+
+      checkAndWriteSync();
+      
+      out.writeInt(keyLength+valLength);          // total record length
+      out.writeInt(keyLength);                    // key portion length
+      out.write(keyData, keyOffset, keyLength);   // key
+      val.writeUncompressedBytes(out);            // value
+    }
 
     /** Returns the current length of the output file. */
     public synchronized long getLength() throws IOException {
       return out.getPos();
     }
 
-  }
+  } // class Writer
+
+  /** Write key/compressed-value pairs to a sequence-format file. */
+  static class RecordCompressWriter extends Writer {
+    
+    /** Create the named file. */
+    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass) throws IOException {
+      super.init(name, fs.create(name), keyClass, valClass, true);
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+    }
+    
+    /** Create the named file with write-progress reporter. */
+    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass, Progressable progress)
+    throws IOException {
+      super.init(name, fs.create(name, progress), keyClass, valClass, true);
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+    }
+    
+    /** Write to an arbitrary stream using a specified buffer size. */
+    private RecordCompressWriter(FSDataOutputStream out,
+                   Class keyClass, Class valClass)
+      throws IOException {
+      super.init(null, out, keyClass, valClass, true);
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+      
+    }
+
+    boolean isCompressed() { return true; }
+    boolean isBlockCompressed() { return false; }
+
+    /** Append a key/value pair. */
+    public synchronized void append(Writable key, Writable val)
+      throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+key+" is not "+keyClass);
+      if (val.getClass() != valClass)
+        throw new IOException("wrong value class: "+val+" is not "+valClass);
+
+      buffer.reset();
+
+      // Append the 'key'
+      key.write(buffer);
+      int keyLength = buffer.getLength();
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed: " + key);
+
+      // Compress 'value' and append it
+      deflater.reset();
+      val.write(deflateOut);
+      deflateOut.flush();
+      deflateFilter.finish();
+
+      // Write the record out
+      checkAndWriteSync();                                // sync
+      out.writeInt(buffer.getLength());                   // total record length
+      out.writeInt(keyLength);                            // key portion length
+      out.write(buffer.getData(), 0, buffer.getLength()); // data
+    }
+
+    /** Append a key/value pair. */
+    public synchronized void appendRaw(
+        byte[] keyData, int keyOffset, int keyLength,
+        ValueBytes val
+        ) throws IOException {
+
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed");
+
+      CompressedBytes value = (CompressedBytes)val;
+      int valLength = value.getSize();
+      
+      checkAndWriteSync();                        // sync
+      out.writeInt(keyLength+valLength);          // total record length
+      out.writeInt(keyLength);                    // key portion length
+      out.write(keyData, keyOffset, keyLength);   // 'key' data
+      val.writeCompressedBytes(out);              // 'value' data
+    }
+    
+  } // RecordCompressWriter
+
+  /** Write compressed key/value blocks to a sequence-format file. */
+  static class BlockCompressWriter extends Writer {
+    
+    private int noBufferedRecords = 0;
+    
+    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
+    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
+
+    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
+    private DataOutputBuffer valBuffer = new DataOutputBuffer();
+
+    private DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+    private Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    private DeflaterOutputStream deflateFilter = 
+      new DeflaterOutputStream(compressedDataBuffer, deflater);
+    private DataOutputStream deflateOut = 
+      new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
+    private int compressionBlockSize;
+    
+    /** Create the named file. */
+    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass) throws IOException {
+      super.init(name, fs.create(name), keyClass, valClass, true);
+      init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+    }
+    
+    /** Create the named file with write-progress reporter. */
+    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass, Progressable progress)
+    throws IOException {
+      super.init(name, fs.create(name, progress), keyClass, valClass, true);
+      init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+    }
+    
+    /** Write to an arbitrary stream using a specified buffer size. */
+    private BlockCompressWriter(FSDataOutputStream out,
+                   Class keyClass, Class valClass)
+      throws IOException {
+      super.init(null, out, keyClass, valClass, true);
+      init(1000000);
+      
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+    }
+
+    boolean isCompressed() { return true; }
+    boolean isBlockCompressed() { return true; }
+
+    /** Initialize */
+    void init(int compressionBlockSize) {
+      this.compressionBlockSize = compressionBlockSize;
+    }
+    
+    /** Workhorse to check and write out compressed data/lengths */
+    private synchronized 
+    void writeBuffer(DataOutputBuffer buffer) 
+    throws IOException {
+      deflater.reset();
+      compressedDataBuffer.reset();
+      deflateOut.write(buffer.getData(), 0, buffer.getLength());
+      deflateOut.flush();
+      deflateFilter.finish();
+      
+      WritableUtils.writeVInt(out, compressedDataBuffer.getLength());
+      out.write(compressedDataBuffer.getData(), 
+          0, compressedDataBuffer.getLength());
+    }
+    
+    /** Compress and flush contents to dfs */
+    private synchronized void writeBlock() throws IOException {
+      if (noBufferedRecords > 0) {
+        // Write 'sync' marker
+        if (sync != null) {
+          out.writeInt(SYNC_ESCAPE);
+          out.write(sync);
+        }
+        
+        // No. of records
+        WritableUtils.writeVInt(out, noBufferedRecords);
+        
+        // Write 'keys' and lengths
+        writeBuffer(keyLenBuffer);
+        writeBuffer(keyBuffer);
+        
+        // Write 'values' and lengths
+        writeBuffer(valLenBuffer);
+        writeBuffer(valBuffer);
+        
+        // Flush the file-stream
+        out.flush();
+        
+        // Reset internal states
+        keyLenBuffer.reset();
+        keyBuffer.reset();
+        valLenBuffer.reset();
+        valBuffer.reset();
+        noBufferedRecords = 0;
+      }
+      
+    }
+    
+    /** Close the file. */
+    public synchronized void close() throws IOException {
+      if (out != null) {
+        writeBlock();
+        out.close();
+        out = null;
+      }
+    }
+
+    /** Append a key/value pair. */
+    public synchronized void append(Writable key, Writable val)
+      throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+key+" is not "+keyClass);
+      if (val.getClass() != valClass)
+        throw new IOException("wrong value class: "+val+" is not "+valClass);
+
+      // Save key/value into respective buffers 
+      int oldKeyLength = keyBuffer.getLength();
+      key.write(keyBuffer);
+      int keyLength = keyBuffer.getLength() - oldKeyLength;
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed: " + key);
+      WritableUtils.writeVInt(keyLenBuffer, keyLength);
+
+      int oldValLength = valBuffer.getLength();
+      val.write(valBuffer);
+      int valLength = valBuffer.getLength() - oldValLength;
+      WritableUtils.writeVInt(valLenBuffer, valLength);
+      
+      // Added another key/value pair
+      ++noBufferedRecords;
+      
+      // Compress and flush?
+      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
+      if (currentBlockSize >= compressionBlockSize) {
+        writeBlock();
+      }
+    }
+    
+    /** Append a key/value pair. */
+    public synchronized void appendRaw(
+        byte[] keyData, int keyOffset, int keyLength,
+        ValueBytes val
+        ) throws IOException {
+      
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed");
 
-  /** Writes key/value pairs from a sequence-format file. */
+      UncompressedBytes value = (UncompressedBytes)val;
+      int valLength = value.getSize();
+      
+      // Save key/value data in relevant buffers
+      WritableUtils.writeVInt(keyLenBuffer, keyLength);
+      keyBuffer.write(keyData, keyOffset, keyLength);
+      WritableUtils.writeVInt(valLenBuffer, valLength);
+      val.writeUncompressedBytes(valBuffer);
+
+      // Added another key/value pair
+      ++noBufferedRecords;
+
+      // Compress and flush?
+      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
+      if (currentBlockSize >= compressionBlockSize) {
+        writeBlock();
+      }
+    }
+  
+  } // BlockCompressWriter
+  
+  /** Reads key/value pairs from a sequence-format file. */
   public static class Reader {
     private Path file;
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
-    private DataInputBuffer inBuf = new DataInputBuffer();
 
-    private byte[] version = new byte[VERSION.length];
+    private byte version;
 
     private Class keyClass;
     private Class valClass;
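
Before the Reader changes below, note the on-disk layout that each call to
BlockCompressWriter.writeBlock (above) produces; writeBuffer prefixes every
deflated buffer with its compressed length as a vint:

    writeInt(SYNC_ESCAPE)         // -1, escapes the sync marker
    write(sync)                   // the file's 16 random sync bytes
    writeVInt(noBufferedRecords)  // number of records in this block
    writeVInt(len), write(data)   // deflated key lengths (vints)
    writeVInt(len), write(data)   // deflated keys
    writeVInt(len), write(data)   // deflated value lengths (vints)
    writeVInt(len), write(data)   // deflated values
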
@@ -235,15 +778,33 @@
     private long end;
     private int keyLength;
 
-    private boolean inflateValues;
-    private Inflater inflater = new Inflater();
-    private InflaterInputStream inflateFilter =
-        new InflaterInputStream(inBuf, inflater);
-    private DataInputStream inflateIn =
-        new DataInputStream(new BufferedInputStream(inflateFilter));
+    private boolean decompress;
+    private boolean blockCompressed;
+    
     private Configuration conf;
 
-    /** @deprecated Call {@link #Reader(FileSystem,Path,Configuration)}.*/
+    private int noBufferedRecords = 0;
+    private boolean lazyDecompress = true;
+    private boolean valuesDecompressed = true;
+    
+    private int noBufferedKeys = 0;
+    private int noBufferedValues = 0;
+    
+    private DataInputBuffer keyLenBuffer = null;
+    private Inflater keyLenInflater = null;
+    private DataInputStream keyLenIn = null;
+    private DataInputBuffer keyBuffer = null;
+    private Inflater keyInflater = null;
+    private DataInputStream keyIn = null;
+
+    private DataInputBuffer valLenBuffer = null;
+    private Inflater valLenInflater = null;
+    private DataInputStream valLenIn = null;
+    private DataInputBuffer valBuffer = null;
+    private Inflater valInflater = null;
+    private DataInputStream valIn = null;
+
+    /** @deprecated Call {@link #Reader(FileSystem,Path,Configuration)}. */
     public Reader(FileSystem fs, String file, Configuration conf)
       throws IOException {
       this(fs, new Path(file), conf);
@@ -270,44 +831,87 @@
       this.in = fs.open(file, bufferSize);
       this.conf = conf;
       seek(start);
-      init();
-
       this.end = in.getPos() + length;
+      init();
     }
     
     private void init() throws IOException {
-      in.readFully(version);
+      byte[] versionBlock = new byte[VERSION.length];
+      in.readFully(versionBlock);
 
-      if ((version[0] != VERSION[0]) ||
-          (version[1] != VERSION[1]) ||
-          (version[2] != VERSION[2]))
+      if ((versionBlock[0] != VERSION[0]) ||
+          (versionBlock[1] != VERSION[1]) ||
+          (versionBlock[2] != VERSION[2]))
         throw new IOException(file + " not a SequenceFile");
 
-      if (version[3] > VERSION[3])
-        throw new VersionMismatchException(VERSION[3], version[3]);
+      // Set 'version'
+      version = versionBlock[3];
+      if (version > VERSION[3])
+        throw new VersionMismatchException(VERSION[3], version);
 
-      UTF8 className = new UTF8();
-      
-      className.readFields(in);                   // read key class name
-      this.keyClass = WritableName.getClass(className.toString());
-      
-      className.readFields(in);                   // read val class name
-      this.valClass = WritableName.getClass(className.toString());
+      if (version < BLOCK_COMPRESS_VERSION) {
+        UTF8 className = new UTF8();
+        
+        className.readFields(in);                   // read key class name
+        this.keyClass = WritableName.getClass(className.toString());
+        
+        className.readFields(in);                   // read val class name
+        this.valClass = WritableName.getClass(className.toString());
+      } else {
+        this.keyClass = WritableName.getClass(Text.readString(in));
+        this.valClass = WritableName.getClass(Text.readString(in));
+      }
 
-      if (version[3] > 2) {                       // if version > 2
-        this.inflateValues = in.readBoolean();    // is compressed?
+      if (version > 2) {                          // if version > 2
+        this.decompress = in.readBoolean();       // is compressed?
       }
 
-      if (version[3] > 1) {                       // if version > 1
+      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
+        this.blockCompressed = in.readBoolean();  // is block-compressed?
+      }
+      
+      if (version > 1) {                          // if version > 1
         in.readFully(sync);                       // read sync bytes
       }
+      
+      // Initialize
+      valBuffer = new DataInputBuffer();
+      if (decompress) {
+        valInflater = new Inflater();
+        valIn = new DataInputStream(new BufferedInputStream(
+            new InflaterInputStream(valBuffer, valInflater))
+        );
+      } else {
+        valIn = new DataInputStream(new BufferedInputStream(valBuffer));
+      }
+      
+      if (blockCompressed) {
+        keyLenBuffer = new DataInputBuffer();
+        keyBuffer = new DataInputBuffer();
+        valLenBuffer = new DataInputBuffer();
+        
+        keyLenInflater = new Inflater();
+        keyLenIn = new DataInputStream(new BufferedInputStream(
+            new InflaterInputStream(keyLenBuffer, keyLenInflater))
+        );
+        
+        keyInflater = new Inflater();
+        keyIn = new DataInputStream(new BufferedInputStream(
+            new InflaterInputStream(keyBuffer, keyInflater)));
+        
+        valLenInflater = new Inflater();
+        valLenIn = new DataInputStream(new BufferedInputStream(
+            new InflaterInputStream(valLenBuffer, valLenInflater))
+        );
+      }
+      
+
+      lazyDecompress = conf.getBoolean("mapred.seqfile.lazydecompress", true);
     }
     
     /** Close the file. */
     public synchronized void close() throws IOException {
       in.close();
-      inflateIn.close();
-      inflater.end();
     }
 
     /** Returns the class of keys in this file. */
@@ -317,49 +921,189 @@
     public Class getValueClass() { return valClass; }
 
     /** Returns true if values are compressed. */
-    public boolean isCompressed() { return inflateValues; }
+    public boolean isCompressed() { return decompress; }
+    
+    /** Returns true if records are block-compressed. */
+    public boolean isBlockCompressed() { return blockCompressed; }
+    
+    /** Read a compressed buffer */
+    private synchronized void readBuffer(
+        DataInputBuffer buffer, Inflater inflater, boolean castAway
+        ) throws IOException {
+      // Read data into a temporary buffer
+      DataOutputBuffer dataBuffer = new DataOutputBuffer();
+      int dataBufferLength = WritableUtils.readVInt(in);
+      dataBuffer.write(in, dataBufferLength);
+      
+      if (false == castAway) {
+        // Reset the inflater
+        inflater.reset();
+        
+        // Set up 'buffer' connected to the input-stream
+        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
+      }
+    }
+    
+    /** Read the next 'compressed' block */
+    private synchronized void readBlock() throws IOException {
+      // Check if we need to throw away a whole block of 
+      // 'values' due to 'lazy decompression' 
+      if (lazyDecompress && !valuesDecompressed) {
+        readBuffer(null, null, true);
+        readBuffer(null, null, true);
+      }
+      
+      // Reset internal states
+      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
+      valuesDecompressed = false;
+
+      //Process sync
+      if (sync != null) {
+        in.readInt();
+        in.readFully(syncCheck);                // read syncCheck
+        if (!Arrays.equals(sync, syncCheck))    // check it
+          throw new IOException("File is corrupt!");
+      }
+      syncSeen = true;
 
+      // Read number of records in this block
+      noBufferedRecords = WritableUtils.readVInt(in);
+      
+      // Read key lengths and keys
+      readBuffer(keyLenBuffer, keyLenInflater, false);
+      readBuffer(keyBuffer, keyInflater, false);
+      noBufferedKeys = noBufferedRecords;
+      
+      // Read value lengths and values
+      if (!lazyDecompress) {
+        readBuffer(valLenBuffer, valLenInflater, false);
+        readBuffer(valBuffer, valInflater, false);
+        noBufferedValues = noBufferedRecords;
+        valuesDecompressed = true;
+      }
+    }
+
+    /** 
+     * Position valLenIn/valIn to the 'value' 
+     * corresponding to the 'current' key 
+     */
+    private synchronized void seekToCurrentValue() throws IOException {
+      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+        if (decompress) {
+          valInflater.reset();
+        }
+      } else {
+        // Check if this is the first value in the 'block' to be read
+        if (lazyDecompress && !valuesDecompressed) {
+          // Read the value lengths and values
+          readBuffer(valLenBuffer, valLenInflater, false);
+          readBuffer(valBuffer, valInflater, false);
+          noBufferedValues = noBufferedRecords;
+          valuesDecompressed = true;
+        }
+        
+        // Calculate the no. of bytes to skip
+        // Note: 'current' key has already been read!
+        int skipValBytes = 0;
+        int currentKey = noBufferedKeys + 1;          
+        for (int i=noBufferedValues; i > currentKey; --i) {
+          skipValBytes += WritableUtils.readVInt(valLenIn);
+          --noBufferedValues;
+        }
+        
+        // Skip to the 'val' corresponding to 'current' key
+        if (skipValBytes > 0) {
+          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
+            throw new IOException("Failed to seek to " + currentKey + 
+                "(th) value!");
+          }
+        }
+      }
+    }
+
+    /**
+     * Get the 'value' corresponding to the last read 'key'.
+     * @param val : The 'value' to be read.
+     * @throws IOException
+     */
+    public synchronized void getCurrentValue(Writable val) 
+    throws IOException {
+      if (val instanceof Configurable) {
+        ((Configurable) val).setConf(this.conf);
+      }
+
+      // Position stream to 'current' value
+      seekToCurrentValue();
+
+      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+        val.readFields(valIn);
+        
+        if (valBuffer.getPosition() != valBuffer.getLength())
+          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
+              + " bytes, should read " +
+              (valBuffer.getLength()-keyLength));
+      } else {
+        // Get the value
+        int valLength = WritableUtils.readVInt(valLenIn);
+        val.readFields(valIn);
+        
+        // Read another compressed 'value'
+        --noBufferedValues;
+        
+        // Sanity check
+        if (valLength < 0) {
+          LOG.debug(val + " is a zero-length value");
+        }
+      }
+
+    }
+    
     /** Read the next key in the file into <code>key</code>, skipping its
      * value.  True if another entry exists, and false at end of file. */
     public synchronized boolean next(Writable key) throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key+" is not "+keyClass);
 
-      outBuf.reset();
-
-      keyLength = next(outBuf);
-      if (keyLength < 0)
-        return false;
-
-      inBuf.reset(outBuf.getData(), outBuf.getLength());
-
-      key.readFields(inBuf);
-      if (inBuf.getPosition() != keyLength)
-        throw new IOException(key + " read " + inBuf.getPosition()
-                              + " bytes, should read " + keyLength);
+      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+        outBuf.reset();
+        
+        keyLength = next(outBuf);
+        if (keyLength < 0)
+          return false;
+        
+        valBuffer.reset(outBuf.getData(), outBuf.getLength());
+        
+        key.readFields(valBuffer);
+        if (valBuffer.getPosition() != keyLength)
+          throw new IOException(key + " read " + valBuffer.getPosition()
+              + " bytes, should read " + keyLength);
+      } else {
+        //Reset syncSeen
+        syncSeen = false;
+        
+        if (noBufferedKeys == 0) {
+          try {
+            readBlock();
+          } catch (EOFException eof) {
+            return false;
+          }
+        }
+        
+        int keyLength = WritableUtils.readVInt(keyLenIn);
+        
+        // Sanity check
+        if (keyLength < 0) {
+          return false;
+        }
+        
+        //Read another compressed 'key'
+        key.readFields(keyIn);
+        --noBufferedKeys;
+      }
 
       return true;
     }
 
-    /** Read the current value in the buffer into <code>val</code>. */
-    public synchronized void getCurrentValue(Writable val)
-        throws IOException {
-        if(val instanceof Configurable) {
-            ((Configurable) val).setConf(this.conf);
-        }
-        if (inflateValues) {
-            inflater.reset();
-            val.readFields(inflateIn);
-        }  else {
-            val.readFields(inBuf);
-            if (inBuf.getPosition() != inBuf.getLength()) {
-                throw new IOException("value: read "+(inBuf.getPosition()-keyLength)
-                                      + " bytes, should read " +
-                                     (inBuf.getLength()-keyLength));
-            }
-        }        
-    }
-    
     /** Read the next key/value pair in the file into <code>key</code> and
      * <code>val</code>.  Returns true if such a pair exists and false when at
      * end of file */
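
Two read patterns fall out of the new Reader. With lazy decompression enabled
(mapred.seqfile.lazydecompress, default true), a block-compressed scan that
filters on keys inflates a block's values only if getCurrentValue() is
reached; a minimal sketch (file and types illustrative):

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    IntWritable key = new IntWritable();
    IntWritable val = new IntWritable();
    while (reader.next(key)) {          // keys decompressed block by block
      if (key.get() % 2 == 0) {         // filter on the key alone
        reader.getCurrentValue(val);    // value block inflated on demand
      }
    }
    reader.close();

The raw path added in the next hunk lets records be copied without
deserializing them, which is what the Sorter changes further below build on;
a sketch assuming the reader and writer use the same compression type:

    DataOutputBuffer rawKey = new DataOutputBuffer();
    SequenceFile.ValueBytes rawValue = reader.createValueBytes();
    while (reader.nextRaw(rawKey, rawValue) != -1) {
      writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawValue);
      rawKey.reset();                   // one key per iteration
    }
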
@@ -369,46 +1113,125 @@
         throw new IOException("wrong value class: "+val+" is not "+valClass);
 
       boolean more = next(key);
-
+      
       if (more) {
-          getCurrentValue(val);
+        getCurrentValue(val);
       }
 
       return more;
     }
     
+    private synchronized int checkAndReadSync(int length) 
+    throws IOException {
+      if (version > 1 && sync != null &&
+          length == SYNC_ESCAPE) {              // process a sync entry
+        //LOG.info("sync@"+in.getPos());
+        in.readFully(syncCheck);                // read syncCheck
+        if (!Arrays.equals(sync, syncCheck))    // check it
+          throw new IOException("File is corrupt!");
+        syncSeen = true;
+        length = in.readInt();                  // re-read length
+      } else {
+        syncSeen = false;
+      }
+      
+      return length;
+    }
+    
     /** Read the next key/value pair in the file into <code>buffer</code>.
      * Returns the length of the key read, or -1 if at end of file.  The length
      * of the value may be computed by calling buffer.getLength() before and
      * after calls to this method. */
+    /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
     public synchronized int next(DataOutputBuffer buffer) throws IOException {
+      // Unsupported for block-compressed sequence files
+      if (version >= BLOCK_COMPRESS_VERSION && blockCompressed) {
+        throw new IOException("Unsupported call for block-compressed" +
+            " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
+      }
       if (in.getPos() >= end)
         return -1;
 
       try {
-        int length = in.readInt();
-
-        if (version[3] > 1 && sync != null &&
-            length == SYNC_ESCAPE) {              // process a sync entry
-          //LOG.info("sync@"+in.getPos());
-          in.readFully(syncCheck);                // read syncCheck
-          if (!Arrays.equals(sync, syncCheck))    // check it
-            throw new IOException("File is corrupt!");
-          syncSeen = true;
-          length = in.readInt();                  // re-read length
-        } else {
-          syncSeen = false;
-        }
-        
+        int length = checkAndReadSync(in.readInt());
         int keyLength = in.readInt();
         buffer.write(in, length);
         return keyLength;
-
       } catch (ChecksumException e) {             // checksum failure
         handleChecksumException(e);
         return next(buffer);
       }
     }
+    
+    public ValueBytes createValueBytes() {
+      ValueBytes val = null;
+      if (!decompress || blockCompressed) {
+        val = new UncompressedBytes();
+      } else {
+        val = new CompressedBytes();
+      }
+      return val;
+    }
+
+    /**
+     * Read 'raw' records.
+     * @param key - The buffer into which the key is read
+     * @param val - The 'raw' value
+     * @return Returns the total record length
+     * @throws IOException
+     */
+    public int nextRaw(DataOutputBuffer key, ValueBytes val) 
+    throws IOException {
+      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+        if (in.getPos() >= end) 
+          return -1;
+
+        int length = checkAndReadSync(in.readInt());
+        int keyLength = in.readInt();
+        int valLength = length - keyLength;
+        key.write(in, keyLength);
+        if (decompress) {
+          CompressedBytes value = (CompressedBytes)val;
+          value.reset(in, valLength);
+        } else {
+          UncompressedBytes value = (UncompressedBytes)val;
+          value.reset(in, valLength);
+        }
+        
+        return length;
+      } else {
+        //Reset syncSeen
+        syncSeen = false;
+        
+        // Read 'key'
+        if (noBufferedKeys == 0) {
+          if (in.getPos() >= end) 
+            return -1;
+
+          try { 
+            readBlock();
+          } catch (EOFException eof) {
+            return -1;
+          }
+        }
+        int keyLength = WritableUtils.readVInt(keyLenIn);
+        if (keyLength < 0) {
+          throw new IOException("zero length key found!");
+        }
+        key.write(keyIn, keyLength);
+        --noBufferedKeys;
+        
+        // Read raw 'value'
+        seekToCurrentValue();
+        int valLength = WritableUtils.readVInt(valLenIn);
+        UncompressedBytes rawValue = (UncompressedBytes)val;
+        rawValue.reset(valIn, valLength);
+        --noBufferedValues;
+        
+        return (keyLength+valLength);
+      }
+      
+    }
 
     private void handleChecksumException(ChecksumException e)
       throws IOException {
@@ -498,7 +1321,8 @@
     }
 
     /** Sort and merge using an arbitrary {@link WritableComparator}. */
-    public Sorter(FileSystem fs, WritableComparator comparator, Class valClass, Configuration conf) {
+    public Sorter(FileSystem fs, WritableComparator comparator, Class valClass, 
+        Configuration conf) {
       this.fs = fs;
       this.comparator = comparator;
       this.keyClass = comparator.getKeyClass();
@@ -541,6 +1365,12 @@
         segments = mergePass(pass, segments <= factor);
         pass++;
       }
+      
+      // Clean up intermediate files
+      for (int i=0; i < pass; ++i) {
+        fs.delete(new Path(outFile.toString() + "." + i));
+        fs.delete(new Path(outFile.toString() + "." + i + ".index"));
+      }
     }
 
     /**
@@ -563,37 +1393,60 @@
     }
 
     private class SortPass {
-      private int limit = memory/4;
-      private DataOutputBuffer buffer = new DataOutputBuffer();
+      private int memoryLimit = memory/4;
+      private int recordLimit = 1000000;
+      
+      private DataOutputBuffer rawKeys = new DataOutputBuffer();
       private byte[] rawBuffer;
 
-      private int[] starts = new int[1024];
-      private int[] pointers = new int[starts.length];
-      private int[] pointersCopy = new int[starts.length];
-      private int[] keyLengths = new int[starts.length];
-      private int[] lengths = new int[starts.length];
+      private int[] keyOffsets = new int[1024];
+      private int[] pointers = new int[keyOffsets.length];
+      private int[] pointersCopy = new int[keyOffsets.length];
+      private int[] keyLengths = new int[keyOffsets.length];
+      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
       
-      private Reader in;
-      private FSDataOutputStream out;
+      private ArrayList segmentLengths = new ArrayList();
+      
+      private Reader in = null;
+      private FSDataOutputStream out = null;
+      private FSDataOutputStream indexOut = null;
       private Path outName;
 
       public int run(boolean deleteInput) throws IOException {
         int segments = 0;
         int currentFile = 0;
-        boolean atEof = currentFile >= inFiles.length;
+        boolean atEof = (currentFile >= inFiles.length);
         boolean isCompressed = false;
-        if (!atEof) {
-          in = new Reader(fs, inFiles[currentFile], conf);
-          isCompressed = in.isCompressed();
+        boolean isBlockCompressed = false;
+        segmentLengths.clear();
+        if (atEof) {
+          return 0;
+        }
+        
+        // Initialize
+        in = new Reader(fs, inFiles[currentFile], conf);
+        isCompressed = in.isCompressed();
+        isBlockCompressed = in.isBlockCompressed();
+        for (int i=0; i < rawValues.length; ++i) {
+          rawValues[i] = null;
         }
+        
         while (!atEof) {
           int count = 0;
-          buffer.reset();
-          while (!atEof && buffer.getLength() < limit) {
-
-            int start = buffer.getLength();       // read an entry into buffer
-            int keyLength = in.next(buffer);
-            if (keyLength == -1) {
+          int bytesProcessed = 0;
+          rawKeys.reset();
+          while (!atEof && 
+              bytesProcessed < memoryLimit && count < recordLimit) {
+
+            // Read a record into buffer
+            // Note: Attempt to re-use 'rawValue' as far as possible
+            int keyOffset = rawKeys.getLength();       
+            ValueBytes rawValue = 
+              (count == keyOffsets.length || rawValues[count] == null) ? 
+                  in.createValueBytes() : 
+                  rawValues[count];
+            int recordLength = in.nextRaw(rawKeys, rawValue);
+            if (recordLength == -1) {
               in.close();
               if (deleteInput) {
                 fs.delete(inFiles[currentFile]);
@@ -607,24 +1460,27 @@
               }
               continue;
             }
-            int length = buffer.getLength() - start;
+            int keyLength = rawKeys.getLength() - keyOffset;
 
-            if (count == starts.length)
+            if (count == keyOffsets.length)
               grow();
 
-            starts[count] = start;                // update pointers
+            keyOffsets[count] = keyOffset;                // update pointers
             pointers[count] = count;
-            lengths[count] = length;
             keyLengths[count] = keyLength;
+            rawValues[count] = rawValue;
 
+            bytesProcessed += recordLength; 
             count++;
           }
 
           // buffer is full -- sort & flush it
-          LOG.info("flushing segment " + segments);
-          rawBuffer = buffer.getData();
+          LOG.debug("flushing segment " + segments);
+          rawBuffer = rawKeys.getData();
           sort(count);
-          flush(count, isCompressed, segments==0 && atEof);
+          flush(count, bytesProcessed, isCompressed, isBlockCompressed, 
+              segments==0 && atEof);
           segments++;
         }
         return segments;
@@ -637,15 +1493,18 @@
         if (out != null) {
           out.close();
         }
+        if (indexOut != null) {
+          indexOut.close();
+        }
       }
 
       private void grow() {
-        int newLength = starts.length * 3 / 2;
-        starts = grow(starts, newLength);
+        int newLength = keyOffsets.length * 3 / 2;
+        keyOffsets = grow(keyOffsets, newLength);
         pointers = grow(pointers, newLength);
         pointersCopy = new int[newLength];
         keyLengths = grow(keyLengths, newLength);
-        lengths = grow(lengths, newLength);
+        rawValues = grow(rawValues, newLength);
       }
 
       private int[] grow(int[] old, int newLength) {
@@ -653,31 +1512,51 @@
         System.arraycopy(old, 0, result, 0, old.length);
         return result;
       }
+      
+      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
+        ValueBytes[] result = new ValueBytes[newLength];
+        System.arraycopy(old, 0, result, 0, old.length);
+        for (int i=old.length; i < newLength; ++i) {
+          result[i] = null;
+        }
+        return result;
+      }
 
-      private void flush(int count, boolean isCompressed, 
-                         boolean done) throws IOException {
+      private void flush(int count, int bytesProcessed, boolean isCompressed, 
+          boolean isBlockCompressed, boolean done) throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile.suffix(".0");
           out = fs.create(outName);
+          if (!done) {
+            indexOut = fs.create(outName.suffix(".index"));
+          }
         }
 
-        if (!done) {                              // an intermediate file
-
-          long length = buffer.getLength();       // compute its size
-          length += count*8;                      // allow for length/keyLength
-
-          out.writeLong(length);                  // write size
-          out.writeLong(count);                   // write count
-        }
-
-        Writer writer = new Writer(out, keyClass, valClass, isCompressed);
+        long segmentStart = out.getPos();
+        Writer writer = createWriter(out, keyClass, valClass, 
+            isCompressed, isBlockCompressed);
+        
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
         }
 
         for (int i = 0; i < count; i++) {         // write in sorted order
           int p = pointers[i];
-          writer.append(rawBuffer, starts[p], lengths[p], keyLengths[p]);
+          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
+        }
+        if (writer instanceof SequenceFile.BlockCompressWriter) {
+          SequenceFile.BlockCompressWriter bcWriter = 
+            (SequenceFile.BlockCompressWriter) writer;
+          bcWriter.writeBlock();
+        }
+        writer.out.flush();
+        
+        if (!done) {
+          // Save the segment length
+          WritableUtils.writeVLong(indexOut, segmentStart);
+          WritableUtils.writeVLong(indexOut, (writer.out.getPos()-segmentStart));
+          indexOut.flush();
         }
       }
 
@@ -687,8 +1566,8 @@
       }
 
       private int compare(int i, int j) {
-        return comparator.compare(rawBuffer, starts[i], keyLengths[i],
-                                  rawBuffer, starts[j], keyLengths[j]);
+        return comparator.compare(rawBuffer, keyOffsets[i], keyLengths[i],
+                                  rawBuffer, keyOffsets[j], keyLengths[j]);
       }
 
       private void mergeSort(int src[], int dest[], int low, int high) {
@@ -715,7 +1594,7 @@
         }
 
         // Merge sorted halves (now in src) into dest
-        for(int i = low, p = low, q = mid; i < high; i++) {
+        for (int i = low, p = low, q = mid; i < high; i++) {
           if (q>=high || p<mid && compare(src[p], src[q]) <= 0)
             dest[i] = src[p++];
           else
@@ -724,11 +1603,11 @@
       }
 
       private void swap(int x[], int a, int b) {
-	int t = x[a];
-	x[a] = x[b];
-	x[b] = t;
+        int t = x[a];
+        x[a] = x[b];
+        x[b] = t;
       }
-    }
+    } // SequenceFile.Sorter.SortPass
 
     private int mergePass(int pass, boolean last) throws IOException {
       LOG.debug("running merge pass=" + pass);
@@ -744,8 +1623,9 @@
       private boolean last;
 
       private MergeQueue queue;
-      private FSDataInputStream in;
+      private FSDataInputStream in = null;
       private Path inName;
+      private FSDataInputStream indexIn = null;
 
       public MergePass(int pass, boolean last) throws IOException {
         this.last = last;
@@ -755,6 +1635,7 @@
 
         this.inName = outFile.suffix("."+(pass-1));
         this.in = fs.open(inName);
+        this.indexIn = fs.open(inName.suffix(".index"));
       }
 
       public void close() throws IOException {
@@ -770,40 +1651,35 @@
 
         while (in.getPos() < end) {
           LOG.debug("merging segment " + segments);
-          long totalLength = 0;
-          long totalCount = 0;
+          long segmentStart = queue.out.getPos();
           while (in.getPos() < end && queue.size() < factor) {
-            long length = in.readLong();
-            long count = in.readLong();
-
-            totalLength += length;
-
-            totalCount+= count;
-
+            long segmentOffset = WritableUtils.readVLong(indexIn);
+            long segmentLength = WritableUtils.readVLong(indexIn);
             Reader reader = new Reader(fs, inName, memory/(factor+1),
-                                       in.getPos(), length, conf);
+                                        segmentOffset, segmentLength, conf);
             reader.sync = null;                   // disable sync on temp files
 
             MergeStream ms = new MergeStream(reader); // add segment to queue
             if (ms.next()) {
-              queue.add(ms);
+              queue.put(ms);
             }
             in.seek(reader.end);
           }
 
-          if (!last) {                             // intermediate file
-            queue.out.writeLong(totalLength);     // write size
-            queue.out.writeLong(totalCount);      // write count
-          }
-
           queue.merge();                          // do a merge
 
+          if (!last) {
+            WritableUtils.writeVLong(queue.indexOut, segmentStart);
+            WritableUtils.writeVLong(queue.indexOut, 
+                (queue.out.getPos() - segmentStart));
+          }
+          
           segments++;
         }
 
         return segments;
       }
-    }
+    } // SequenceFile.Sorter.MergePass
 
     /** Merge the provided files.*/
     public void merge(Path[] inFiles, Path outFile) throws IOException {
@@ -821,7 +1697,6 @@
       } finally {
         mergeFiles.close();                       // close it
       }
-
     }
 
     private class MergeFiles {
@@ -847,13 +1722,13 @@
 
         queue.merge();
       }
-    }
+    } // SequenceFile.Sorter.MergeFiles
 
     private class MergeStream {
       private Reader in;
 
-      private DataOutputBuffer buffer = new DataOutputBuffer();
-      private int keyLength;
+      private DataOutputBuffer rawKey = null;
+      private ValueBytes rawValue = null;
       
       public MergeStream(Reader reader) throws IOException {
         if (reader.keyClass != keyClass)
@@ -863,53 +1738,67 @@
           throw new IOException("wrong value class: "+reader.getValueClass()+
                                 " is not " + valClass);
         this.in = reader;
+        rawKey = new DataOutputBuffer();
+        rawValue = in.createValueBytes();
       }
 
       public boolean next() throws IOException {
-        buffer.reset();
-        keyLength = in.next(buffer);
-        return keyLength >= 0;
+        rawKey.reset();
+        int recordLength = 
+          in.nextRaw(rawKey, rawValue);
+        return (recordLength >= 0);
       }
-    }
+    } // SequenceFile.Sorter.MergeStream
 
     private class MergeQueue extends PriorityQueue {
+      private Path outName;
       private FSDataOutputStream out;
+      private FSDataOutputStream indexOut;
       private boolean done;
       private boolean compress;
+      private boolean blockCompress;
 
-      public void add(MergeStream stream) throws IOException {
+      public void put(MergeStream stream) throws IOException {
         if (size() == 0) {
           compress = stream.in.isCompressed();
-        } else if (compress != stream.in.isCompressed()) {
+          blockCompress = stream.in.isBlockCompressed();
+        } else if (compress != stream.in.isCompressed() || 
+            blockCompress != stream.in.isBlockCompressed()) {
           throw new IOException("All merged files must be compressed or not.");
-        }
-        put(stream);
+        } 
+        super.put(stream);
       }
 
       public MergeQueue(int size, Path outName, boolean done)
         throws IOException {
         initialize(size);
-        this.out = fs.create(outName, true, memory/(factor+1));
+        this.outName = outName;
+        this.out = fs.create(this.outName, true, memory/(factor+1));
+        if (!done) {
+          this.indexOut = fs.create(outName.suffix(".index"), true, 
+              memory/(factor+1));
+        }
         this.done = done;
       }
 
       protected boolean lessThan(Object a, Object b) {
         MergeStream msa = (MergeStream)a;
         MergeStream msb = (MergeStream)b;
-        return comparator.compare(msa.buffer.getData(), 0, msa.keyLength,
-                                  msb.buffer.getData(), 0, msb.keyLength) < 0;
+        return comparator.compare(msa.rawKey.getData(), 0, msa.rawKey.getLength(),
+            msb.rawKey.getData(), 0, msb.rawKey.getLength()) < 0;
       }
 
       public void merge() throws IOException {
-        Writer writer = new Writer(out, keyClass, valClass, compress);
+        Writer writer = createWriter(out, keyClass, valClass, 
+            compress, blockCompress);
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
         }
 
         while (size() != 0) {
           MergeStream ms = (MergeStream)top();
-          DataOutputBuffer buffer = ms.buffer;    // write top entry
-          writer.append(buffer.getData(), 0, buffer.getLength(), ms.keyLength);
+          writer.appendRaw(ms.rawKey.getData(), 0, ms.rawKey.getLength(), 
+              ms.rawValue);                       // write top entry
           
           if (ms.next()) {                        // has another entry
             adjustTop();
@@ -918,6 +1807,13 @@
             ms.in.close();
           }
         }
+
+        if (writer instanceof SequenceFile.BlockCompressWriter) {
+          SequenceFile.BlockCompressWriter bcWriter = 
+            (SequenceFile.BlockCompressWriter) writer;
+          bcWriter.writeBlock();
+        }
+        out.flush();
       }
 
       public void close() throws IOException {
@@ -926,8 +1822,13 @@
           ms.in.close();
         }
         out.close();                              // close output
+        if (indexOut != null) {
+          indexOut.close();
+        }
       }
-    }
-  }
+      
+    } // SequenceFile.Sorter.MergeQueue
+    
+  } // SequenceFile.Sorter
 
-}
+} // SequenceFile

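The write path is now driven by the SequenceFile.createWriter factory
together with the CompressionType enum (NONE, RECORD, BLOCK). A minimal
round-trip sketch, assuming a plain Configuration and FileSystem; the path
and the UTF8/LongWritable key/value choice are illustrative, not part of
the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.UTF8;

    public class BlockCompressedRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/tmp/example.bc.seq");   // illustrative path

        fs.delete(file);                               // start from a clean file
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file,
            UTF8.class, LongWritable.class, CompressionType.BLOCK);
        for (long i = 0; i < 1000; i++) {
          writer.append(new UTF8("key-" + i), new LongWritable(i));
        }
        writer.close();                                // flushes the final block

        // The reader works out record- vs. block-compression from the file
        // header, so no compression argument is needed on this side.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
        UTF8 key = new UTF8();
        LongWritable val = new LongWritable();
        while (reader.next(key)) {
          reader.getCurrentValue(val);  // value is only decompressed on demand
        }
        reader.close();
      }
    }
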
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Mon Aug 28 12:24:54 2006
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
 
@@ -38,11 +39,17 @@
 
     Path file = new Path(job.getOutputPath(), name);
 
-    final SequenceFile.Writer out =
-      new SequenceFile.Writer(fs, file,
+    // TODO: Figure out a way to deprecate 'mapred.output.compress'
+    final SequenceFile.Writer out = 
+      SequenceFile.createWriter(fs, job, file,
                               job.getOutputKeyClass(),
                               job.getOutputValueClass(),
-                              job.getBoolean("mapred.output.compress", false),
+                              job.getBoolean("mapred.output.compress", false) ? 
+                                  CompressionType.RECORD : 
+                                  CompressionType.valueOf(
+                                    job.get("mapred.seqfile.compression.type", 
+                                        "NONE")
+                                  ),
                               progress);
 
     return new RecordWriter() {

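On the output-format side the compression type is now pure configuration:
with mapred.output.compress set, the old per-record behaviour is kept;
otherwise mapred.seqfile.compression.type (NONE, RECORD or BLOCK) decides.
A sketch of requesting block-compressed job output; the helper class and
method name are illustrative:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    public class OutputCompressionConf {
      /** Illustrative helper: request block-compressed SequenceFile output. */
      static void setBlockCompressedOutput(JobConf job) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
        // Leave the legacy flag off so the new property is consulted...
        job.setBoolean("mapred.output.compress", false);
        // ...and pick one of NONE, RECORD or BLOCK.
        job.set("mapred.seqfile.compression.type", "BLOCK");
      }
    }
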
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java Mon Aug 28 12:24:54 2006
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 /**
@@ -122,8 +123,9 @@
       Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
       SequenceFile.Writer writer = null;
       try {
-        writer = new SequenceFile.Writer(fs, controlFile,
-                                         UTF8.class, LongWritable.class);
+        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
+                                         UTF8.class, LongWritable.class,
+                                         CompressionType.NONE);
         writer.append(new UTF8(name), new LongWritable(fileSize));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Mon Aug 28 12:24:54 2006
@@ -28,6 +28,7 @@
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 /**
@@ -89,7 +90,8 @@
 
     Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
     SequenceFile.Writer writer =
-      new SequenceFile.Writer(fs, inputFile, UTF8.class, LongWritable.class);
+      SequenceFile.createWriter(fs, fsConfig, inputFile, 
+          UTF8.class, LongWritable.class, CompressionType.NONE);
     
     try {
       nrFiles = 0;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Mon Aug 28 12:24:54 2006
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 /**
@@ -116,8 +117,9 @@
       Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
       SequenceFile.Writer writer = null;
       try {
-        writer = new SequenceFile.Writer(fs, controlFile,
-                                         UTF8.class, LongWritable.class);
+        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
+                                         UTF8.class, LongWritable.class,
+                                         CompressionType.NONE);
         writer.append(new UTF8(name), new LongWritable(fileSize));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Mon Aug 28 12:24:54 2006
@@ -25,6 +25,7 @@
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 public class TestFileSystem extends TestCase {
@@ -77,7 +78,8 @@
     Random random = new Random(seed);
 
     SequenceFile.Writer writer =
-      new SequenceFile.Writer(fs, controlFile, UTF8.class, LongWritable.class);
+      SequenceFile.createWriter(fs, conf, controlFile, 
+          UTF8.class, LongWritable.class, CompressionType.NONE);
 
     long totalSize = 0;
     long maxSize = ((megaBytes / numFiles) * 2) + 1;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Mon Aug 28 12:24:54 2006
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.*;
 
 
@@ -40,13 +41,19 @@
     int megabytes = 1;
     int factor = 5;
     Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq");
+    Path recordCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.rc.seq");
+    Path blockCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
  
     int seed = new Random().nextInt();
 
-    FileSystem fs = new LocalFileSystem(new Configuration());
+    FileSystem fs = new LocalFileSystem(conf);
     try {
         //LOG.setLevel(Level.FINE);
-        writeTest(fs, count, seed, file, false);
+
+        // SequenceFile.Writer
+        writeTest(fs, count, seed, file, CompressionType.NONE);
         readTest(fs, count, seed, file);
 
         sortTest(fs, count, megabytes, factor, false, file);
@@ -55,24 +62,63 @@
         sortTest(fs, count, megabytes, factor, true, file);
         checkSort(fs, count, seed, file);
 
-        mergeTest(fs, count, seed, file, false, factor, megabytes);
+        mergeTest(fs, count, seed, file, CompressionType.NONE, false, 
+            factor, megabytes);
         checkSort(fs, count, seed, file);
 
-        mergeTest(fs, count, seed, file, true, factor, megabytes);
+        mergeTest(fs, count, seed, file, CompressionType.NONE, true, 
+            factor, megabytes);
         checkSort(fs, count, seed, file);
-    } finally {
+        
+        // SequenceFile.RecordCompressWriter
+        writeTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD);
+        readTest(fs, count, seed, recordCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, false, recordCompressedFile);
+        checkSort(fs, count, seed, recordCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, true, recordCompressedFile);
+        checkSort(fs, count, seed, recordCompressedFile);
+
+        mergeTest(fs, count, seed, recordCompressedFile, 
+            CompressionType.RECORD, false, factor, megabytes);
+        checkSort(fs, count, seed, recordCompressedFile);
+
+        mergeTest(fs, count, seed, recordCompressedFile, 
+            CompressionType.RECORD, true, factor, megabytes);
+        checkSort(fs, count, seed, recordCompressedFile);
+        
+        // SequenceFile.BlockCompressWriter
+        writeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK);
+        readTest(fs, count, seed, blockCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, false, blockCompressedFile);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+        sortTest(fs, count, megabytes, factor, true, blockCompressedFile);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+        mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
+            false, factor, megabytes);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+        mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
+            true, factor, megabytes);
+        checkSort(fs, count, seed, blockCompressedFile);
+
+    } finally {
         fs.close();
     }
   }
 
-  private static void writeTest(FileSystem fs, int count, int seed,
-                                Path file, boolean compress)
+  private static void writeTest(FileSystem fs, int count, int seed, Path file, 
+      CompressionType compressionType)
     throws IOException {
     fs.delete(file);
     LOG.debug("creating with " + count + " records");
-    SequenceFile.Writer writer =
-      new SequenceFile.Writer(fs, file, RandomDatum.class, RandomDatum.class,
-                              compress);
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fs, conf, file, 
+          RandomDatum.class, RandomDatum.class, compressionType);
     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
     for (int i = 0; i < count; i++) {
       generator.next();
@@ -86,22 +132,39 @@
 
   private static void readTest(FileSystem fs, int count, int seed, Path file)
     throws IOException {
-    RandomDatum k = new RandomDatum();
-    RandomDatum v = new RandomDatum();
     LOG.debug("reading " + count + " records");
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+
+    RandomDatum k = new RandomDatum();
+    RandomDatum v = new RandomDatum();
+    DataOutputBuffer rawKey = new DataOutputBuffer();
+    SequenceFile.ValueBytes rawValue = reader.createValueBytes();
+    
     for (int i = 0; i < count; i++) {
       generator.next();
       RandomDatum key = generator.getKey();
       RandomDatum value = generator.getValue();
-      
-      reader.next(k, v);
-      
-      if (!k.equals(key))
-        throw new RuntimeException("wrong key at " + i);
-      if (!v.equals(value))
-        throw new RuntimeException("wrong value at " + i);
+
+      if ((i%5) == 0) {
+        // Testing 'raw' apis
+        rawKey.reset();
+        reader.nextRaw(rawKey, rawValue);
+      } else {
+        // Testing 'non-raw' apis 
+        if ((i%2) == 0) {
+          reader.next(k);
+          reader.getCurrentValue(v);
+        } else {
+          reader.next(k, v);
+        }
+
+        // Sanity check
+        if (!k.equals(key))
+          throw new RuntimeException("wrong key at " + i);
+        if (!v.equals(value))
+          throw new RuntimeException("wrong value at " + i);
+      }
     }
     reader.close();
   }
@@ -152,9 +215,9 @@
     LOG.debug("sucessfully checked " + count + " records");
   }
 
-  private static void mergeTest(FileSystem fs, int count, int seed, 
-                                Path file, boolean fast, int factor, 
-                                int megabytes)
+  private static void mergeTest(FileSystem fs, int count, int seed, Path file, 
+                                CompressionType compressionType,
+                                boolean fast, int factor, int megabytes)
     throws IOException {
 
     LOG.debug("creating "+factor+" files with "+count/factor+" records");
@@ -168,8 +231,8 @@
       sortedNames[i] = names[i].suffix(".sorted");
       fs.delete(names[i]);
       fs.delete(sortedNames[i]);
-      writers[i] =
-        new SequenceFile.Writer(fs, names[i], RandomDatum.class,RandomDatum.class);
+      writers[i] = SequenceFile.createWriter(fs, conf, names[i], 
+          RandomDatum.class, RandomDatum.class, compressionType);
     }
 
     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
@@ -215,21 +278,25 @@
     int megabytes = 1;
     int factor = 10;
     boolean create = true;
+    boolean rwonly = false;
     boolean check = false;
     boolean fast = false;
     boolean merge = false;
-    boolean compress = false;
+    String compressType = "NONE";
     Path file = null;
-    String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) [-count N] [-megabytes M] [-factor F] [-nocreate] [-check] [-fast] [-merge] [-compress] file";
-    
+
+    String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) " +
+        "[-count N] " + "[-check] [-compressType <NONE|RECORD|BLOCK>] " +
+        "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
+        " file";
     if (args.length == 0) {
         System.err.println(usage);
         System.exit(-1);
     }
-    int i = 0;
-    FileSystem fs = FileSystem.parseArgs(args, i, conf);      
+    
+    FileSystem fs = FileSystem.parseArgs(args, 0, conf);      
     try {
-      for (; i < args.length; i++) {       // parse command line
+      for (int i=0; i < args.length; ++i) {       // parse command line
           if (args[i] == null) {
               continue;
           } else if (args[i].equals("-count")) {
@@ -238,6 +305,8 @@
               megabytes = Integer.parseInt(args[++i]);
           } else if (args[i].equals("-factor")) {
               factor = Integer.parseInt(args[++i]);
+          } else if (args[i].equals("-rwonly")) {
+              rwonly = true;
           } else if (args[i].equals("-nocreate")) {
               create = false;
           } else if (args[i].equals("-check")) {
@@ -246,8 +315,8 @@
               fast = true;
           } else if (args[i].equals("-merge")) {
               merge = true;
-          } else if (args[i].equals("-compress")) {
-              compress = true;
+          } else if (args[i].equals("-compressType")) {
+              compressType = args[++i];
           } else {
               // file is required parameter
               file = new Path(args[i]);
@@ -257,23 +326,34 @@
         LOG.info("megabytes = " + megabytes);
         LOG.info("factor = " + factor);
         LOG.info("create = " + create);
+        LOG.info("rwonly = " + rwonly);
         LOG.info("check = " + check);
         LOG.info("fast = " + fast);
         LOG.info("merge = " + merge);
-        LOG.info("compress = " + compress);
+        LOG.info("compressType = " + compressType);
         LOG.info("file = " + file);
 
+        if (rwonly && (!create || merge || fast)) {
+          System.err.println(usage);
+          System.exit(-1);
+        }
+
         int seed = 0;
- 
-        if (create && !merge) {
-            writeTest(fs, count, seed, file, compress);
+        CompressionType compressionType = 
+          CompressionType.valueOf(compressType);
+
+        if (rwonly || (create && !merge)) {
+            writeTest(fs, count, seed, file, compressionType);
             readTest(fs, count, seed, file);
         }
 
-        if (merge) {
-            mergeTest(fs, count, seed, file, fast, factor, megabytes);
-        } else {
+        if (!rwonly) {
+          if (merge) {
+            mergeTest(fs, count, seed, file, compressionType, 
+                fast, factor, megabytes);
+          } else {
             sortTest(fs, count, megabytes, factor, fast, file);
+          }
         }
     
         if (check) {

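The reworked driver swaps the boolean -compress flag for -compressType and
adds a -rwonly mode that skips sorting and merging. An invocation along
these lines (the bin/hadoop class-name launcher is assumed here; it is not
part of this patch) exercises the block-compressed read/write paths against
the local file system:

    bin/hadoop org.apache.hadoop.io.TestSequenceFile \
        -local -count 10000 -compressType BLOCK -rwonly /tmp/test.bc.seq
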
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?rev=437791&r1=437790&r2=437791&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Mon Aug 28 12:24:54 2006
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /**
 * A Map-reduce program to estimate the value of Pi using Monte Carlo
@@ -119,8 +120,9 @@
         Path outDir = new Path(tmpDir, "out");
         Path outFile = new Path(outDir, "reduce-out");
         FileSystem fileSys = FileSystem.get(conf);
-        SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
-              IntWritable.class, IntWritable.class);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+            outFile, IntWritable.class, IntWritable.class, 
+            CompressionType.NONE);
         writer.append(new IntWritable(numInside), new IntWritable(numOutside));
         writer.close();
       }
@@ -169,8 +171,8 @@
     
     for(int idx=0; idx < numMaps; ++idx) {
       Path file = new Path(inDir, "part"+idx);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
-              IntWritable.class, IntWritable.class);
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf, 
+          file, IntWritable.class, IntWritable.class, CompressionType.NONE);
       writer.append(new IntWritable(numPoints), new IntWritable(0));
       writer.close();
     }

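The Sorter's public surface is unchanged by the new segment bookkeeping:
the intermediate .index side files are created and consumed internally. A
minimal merge sketch, assuming inputs that are already sorted and share one
compression type (MergeQueue.put rejects mixed inputs); the paths and the
IntWritable key class are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.WritableComparator;

    public class MergeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Already-sorted inputs with identical compression settings.
        Path[] inputs = { new Path("/tmp/part0.seq"),
                          new Path("/tmp/part1.seq") };
        Path merged = new Path("/tmp/merged.seq");

        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
            WritableComparator.get(IntWritable.class), IntWritable.class, conf);
        sorter.merge(inputs, merged);
      }
    }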

