hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1002937 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/security/
Date: Thu, 30 Sep 2010 02:59:32 GMT
Author: omalley
Date: Thu Sep 30 02:59:32 2010
New Revision: 1002937

URL: http://svn.apache.org/viewvc?rev=1002937&view=rev
Log:
HADOOP-6856. Simplify constructors for SequenceFile, and MapFile. (omalley)
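
For reference, the option-based factory introduced by this patch replaces the long constructor overloads. A rough usage sketch for the writer side (the example class, path, and key/value types are placeholders, not part of the patch):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;

  public class SeqFileWriteExample {                     // hypothetical example class
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      Path file = new Path("/tmp/example.seq");          // placeholder path
      // file(), keyClass() and valueClass() are the new Writer options below.
      SequenceFile.Writer writer =
        SequenceFile.createWriter(conf,
                                  SequenceFile.Writer.file(file),
                                  SequenceFile.Writer.keyClass(Text.class),
                                  SequenceFile.Writer.valueClass(IntWritable.class));
      writer.append(new Text("key"), new IntWritable(1));
      writer.close();
    }
  }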

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsShell.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/ArrayFile.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/BloomMapFile.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/MapFile.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/SetFile.java
    hadoop/common/trunk/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Sep 30 02:59:32 2010
@@ -137,6 +137,8 @@ Trunk (unreleased changes)
     HADOOP-6965. Introduces checks for whether the original tgt is valid 
     in the reloginFromKeytab method.
 
+    HADOOP-6856. Simplify constructors for SequenceFile, and MapFile. (omalley)
+
   OPTIMIZATIONS
 
     HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsShell.java?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsShell.java Thu Sep 30 02:59:32 2010
@@ -372,7 +372,8 @@ public class FsShell extends Configured 
     public TextRecordInputStream(FileStatus f) throws IOException {
       final Path fpath = f.getPath();
       final Configuration lconf = getConf();
-      r = new SequenceFile.Reader(fpath.getFileSystem(lconf), fpath, lconf);
+      r = new SequenceFile.Reader(lconf, 
+                                  SequenceFile.Reader.file(fpath));
       key = ReflectionUtils.newInstance(
           r.getKeyClass().asSubclass(WritableComparable.class), lconf);
       val = ReflectionUtils.newInstance(
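
The reader side follows the same pattern as the FsShell change above; reading the pairs back looks roughly like this (a sketch with a placeholder path and types, not part of the patch):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;

  public class SeqFileReadExample {                      // hypothetical example class
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      Path file = new Path("/tmp/example.seq");          // placeholder path
      // Reader.file() is the same option FsShell now passes.
      SequenceFile.Reader reader =
        new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
      Text key = new Text();
      IntWritable value = new IntWritable();
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
      reader.close();
    }
  }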

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/ArrayFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/ArrayFile.java?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/ArrayFile.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/ArrayFile.java Thu Sep 30 02:59:32 2010
@@ -42,7 +42,8 @@ public class ArrayFile extends MapFile {
     public Writer(Configuration conf, FileSystem fs,
                   String file, Class<? extends Writable> valClass)
       throws IOException {
-      super(conf, fs, file, LongWritable.class, valClass);
+      super(conf, new Path(file), keyClass(LongWritable.class), 
+            valueClass(valClass));
     }
 
     /** Create the named file for values of the named class. */
@@ -50,7 +51,11 @@ public class ArrayFile extends MapFile {
                   String file, Class<? extends Writable> valClass,
                   CompressionType compress, Progressable progress)
       throws IOException {
-      super(conf, fs, file, LongWritable.class, valClass, compress, progress);
+      super(conf, new Path(file), 
+            keyClass(LongWritable.class), 
+            valueClass(valClass), 
+            compressionType(compress), 
+            progressable(progress));
     }
 
     /** Append a value to the file. */
@@ -65,8 +70,9 @@ public class ArrayFile extends MapFile {
     private LongWritable key = new LongWritable();
 
     /** Construct an array reader for the named file.*/
-    public Reader(FileSystem fs, String file, Configuration conf) throws IOException {
-      super(fs, file, conf);
+    public Reader(FileSystem fs, String file, 
+                  Configuration conf) throws IOException {
+      super(new Path(file), conf);
     }
 
     /** Positions the reader before its <code>n</code>th value. */

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/BloomMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/BloomMapFile.java?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/BloomMapFile.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/BloomMapFile.java Thu Sep 30 02:59:32 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Options;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.bloom.DynamicBloomFilter;
 import org.apache.hadoop.util.bloom.Filter;
@@ -82,78 +83,80 @@ public class BloomMapFile {
     private FileSystem fs;
     private Path dir;
     
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         Class<? extends WritableComparable> keyClass,
         Class<? extends Writable> valClass, CompressionType compress,
         CompressionCodec codec, Progressable progress) throws IOException {
-      super(conf, fs, dirName, keyClass, valClass, compress, codec, progress);
-      this.fs = fs;
-      this.dir = new Path(dirName);
-      initBloomFilter(conf);
+      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
+           compressionType(compress), compressionCodec(codec), 
+           progressable(progress));
     }
 
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         Class<? extends WritableComparable> keyClass,
         Class valClass, CompressionType compress,
         Progressable progress) throws IOException {
-      super(conf, fs, dirName, keyClass, valClass, compress, progress);
-      this.fs = fs;
-      this.dir = new Path(dirName);
-      initBloomFilter(conf);
+      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
+           compressionType(compress), progressable(progress));
     }
 
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         Class<? extends WritableComparable> keyClass,
         Class valClass, CompressionType compress)
         throws IOException {
-      super(conf, fs, dirName, keyClass, valClass, compress);
-      this.fs = fs;
-      this.dir = new Path(dirName);
-      initBloomFilter(conf);
+      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
+           compressionType(compress));
     }
 
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass,
         CompressionType compress, CompressionCodec codec, Progressable progress)
         throws IOException {
-      super(conf, fs, dirName, comparator, valClass, compress, codec, progress);
-      this.fs = fs;
-      this.dir = new Path(dirName);
-      initBloomFilter(conf);
+      this(conf, new Path(dirName), comparator(comparator), 
+           valueClass(valClass), compressionType(compress), 
+           compressionCodec(codec), progressable(progress));
     }
 
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass,
         CompressionType compress, Progressable progress) throws IOException {
-      super(conf, fs, dirName, comparator, valClass, compress, progress);
-      this.fs = fs;
-      this.dir = new Path(dirName);
-      initBloomFilter(conf);
+      this(conf, new Path(dirName), comparator(comparator), 
+           valueClass(valClass), compressionType(compress), 
+           progressable(progress));
     }
 
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass, CompressionType compress)
         throws IOException {
-      super(conf, fs, dirName, comparator, valClass, compress);
-      this.fs = fs;
-      this.dir = new Path(dirName);
-      initBloomFilter(conf);
+      this(conf, new Path(dirName), comparator(comparator), 
+           valueClass(valClass), compressionType(compress));
     }
 
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
         WritableComparator comparator, Class valClass) throws IOException {
-      super(conf, fs, dirName, comparator, valClass);
-      this.fs = fs;
-      this.dir = new Path(dirName);
-      initBloomFilter(conf);
+      this(conf, new Path(dirName), comparator(comparator), 
+           valueClass(valClass));
     }
 
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
-        Class<? extends WritableComparable> keyClass,
-        Class valClass) throws IOException {
-      super(conf, fs, dirName, keyClass, valClass);
-      this.fs = fs;
-      this.dir = new Path(dirName);
+                  Class<? extends WritableComparable> keyClass,
+                  Class valClass) throws IOException {
+      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
+    }
+
+    public Writer(Configuration conf, Path dir, 
+                  SequenceFile.Writer.Option... options) throws IOException {
+      super(conf, dir, options);
+      this.fs = dir.getFileSystem(conf);
+      this.dir = dir;
       initBloomFilter(conf);
     }
 
@@ -197,27 +200,34 @@ public class BloomMapFile {
     private DataOutputBuffer buf = new DataOutputBuffer();
     private Key bloomKey = new Key();
 
+    public Reader(Path dir, Configuration conf,
+                  SequenceFile.Reader.Option... options) throws IOException {
+      super(dir, conf, options);
+      initBloomFilter(dir, conf);
+    }
+
+    @Deprecated
     public Reader(FileSystem fs, String dirName, Configuration conf)
         throws IOException {
-      super(fs, dirName, conf);
-      initBloomFilter(fs, dirName, conf);
+      this(new Path(dirName), conf);
     }
 
+    @Deprecated
     public Reader(FileSystem fs, String dirName, WritableComparator comparator,
         Configuration conf, boolean open) throws IOException {
-      super(fs, dirName, comparator, conf, open);
-      initBloomFilter(fs, dirName, conf);
+      this(new Path(dirName), conf, comparator(comparator));
     }
 
+    @Deprecated
     public Reader(FileSystem fs, String dirName, WritableComparator comparator,
         Configuration conf) throws IOException {
-      super(fs, dirName, comparator, conf);
-      initBloomFilter(fs, dirName, conf);
+      this(new Path(dirName), conf, comparator(comparator));
     }
     
-    private void initBloomFilter(FileSystem fs, String dirName,
-        Configuration conf) {
+    private void initBloomFilter(Path dirName, 
+                                 Configuration conf) {
       try {
+        FileSystem fs = dirName.getFileSystem(conf);
         DataInputStream in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
         bloomFilter = new DynamicBloomFilter();
         bloomFilter.readFields(in);
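
With the new Path-based constructors above, a BloomMapFile can be created and probed along these lines (a sketch; the example class, directory, and types are placeholders, not part of the patch):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.BloomMapFile;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.MapFile;
  import org.apache.hadoop.io.Text;

  public class BloomMapFileExample {                     // hypothetical example class
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      Path dir = new Path("/tmp/bloommap");              // placeholder directory
      BloomMapFile.Writer writer =
        new BloomMapFile.Writer(conf, dir,
                                MapFile.Writer.keyClass(Text.class),
                                MapFile.Writer.valueClass(IntWritable.class));
      writer.append(new Text("a"), new IntWritable(1));
      writer.close();

      BloomMapFile.Reader reader = new BloomMapFile.Reader(dir, conf);
      IntWritable value = new IntWritable();
      // get() consults the bloom filter before touching the index/data files.
      reader.get(new Text("a"), value);
      reader.close();
    }
  }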

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/MapFile.java?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/MapFile.java Thu Sep 30 02:59:32 2010
@@ -24,6 +24,7 @@ import java.io.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Options;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -31,6 +32,8 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 
@@ -91,94 +94,194 @@ public class MapFile {
     private long lastIndexKeyCount = Long.MIN_VALUE;
 
 
-    /** Create the named map for keys of the named class. */
+    /** Create the named map for keys of the named class. 
+     * @deprecated Use Writer(Configuration, Path, Option...) instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
-                  Class<? extends WritableComparable> keyClass, Class valClass)
-      throws IOException {
-      this(conf, fs, dirName,
-           WritableComparator.get(keyClass), valClass,
-           SequenceFile.getCompressionType(conf));
+                  Class<? extends WritableComparable> keyClass, 
+                  Class valClass) throws IOException {
+      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
     }
 
-    /** Create the named map for keys of the named class. */
+    /** Create the named map for keys of the named class. 
+     * @deprecated Use Writer(Configuration, Path, Option...) instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass, Class valClass,
-                  CompressionType compress, Progressable progress)
-      throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
-           compress, progress);
+                  CompressionType compress, 
+                  Progressable progress) throws IOException {
+      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
+           compressionType(compress), progressable(progress));
     }
 
-    /** Create the named map for keys of the named class. */
+    /** Create the named map for keys of the named class. 
+     * @deprecated Use Writer(Configuration, Path, Option...) instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass, Class valClass,
                   CompressionType compress, CompressionCodec codec,
-                  Progressable progress)
-      throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
-           compress, codec, progress);
+                  Progressable progress) throws IOException {
+      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
+           compressionType(compress), compressionCodec(codec), 
+           progressable(progress));
     }
 
-    /** Create the named map for keys of the named class. */
+    /** Create the named map for keys of the named class. 
+     * @deprecated Use Writer(Configuration, Path, Option...) instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class<? extends WritableComparable> keyClass, Class valClass,
-                  CompressionType compress)
-      throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
+                  CompressionType compress) throws IOException {
+      this(conf, new Path(dirName), keyClass(keyClass),
+           valueClass(valClass), compressionType(compress));
     }
 
-    /** Create the named map using the named key comparator. */
+    /** Create the named map using the named key comparator. 
+     * @deprecated Use Writer(Configuration, Path, Option...) instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
-                  WritableComparator comparator, Class valClass)
-      throws IOException {
-      this(conf, fs, dirName, comparator, valClass,
-           SequenceFile.getCompressionType(conf));
+                  WritableComparator comparator, Class valClass
+                  ) throws IOException {
+      this(conf, new Path(dirName), comparator(comparator), 
+           valueClass(valClass));
     }
-    /** Create the named map using the named key comparator. */
+
+    /** Create the named map using the named key comparator. 
+     * @deprecated Use Writer(Configuration, Path, Option...) instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
-                  SequenceFile.CompressionType compress)
-      throws IOException {
-      this(conf, fs, dirName, comparator, valClass, compress, null);
+                  SequenceFile.CompressionType compress) throws IOException {
+      this(conf, new Path(dirName), comparator(comparator),
+           valueClass(valClass), compressionType(compress));
     }
-    /** Create the named map using the named key comparator. */
+
+    /** Create the named map using the named key comparator. 
+     * @deprecated Use Writer(Configuration, Path, Option...)} instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
                   SequenceFile.CompressionType compress,
-                  Progressable progress)
-      throws IOException {
-      this(conf, fs, dirName, comparator, valClass, 
-           compress, new DefaultCodec(), progress);
+                  Progressable progress) throws IOException {
+      this(conf, new Path(dirName), comparator(comparator),
+           valueClass(valClass), compressionType(compress),
+           progressable(progress));
     }
-    /** Create the named map using the named key comparator. */
+
+    /** Create the named map using the named key comparator. 
+     * @deprecated Use Writer(Configuration, Path, Option...) instead.
+     */
+    @Deprecated
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator, Class valClass,
                   SequenceFile.CompressionType compress, CompressionCodec codec,
-                  Progressable progress)
-      throws IOException {
+                  Progressable progress) throws IOException {
+      this(conf, new Path(dirName), comparator(comparator),
+           valueClass(valClass), compressionType(compress),
+           compressionCodec(codec), progressable(progress));
+    }
+    
+    // our options are a superset of sequence file writer options
+    public static interface Option extends SequenceFile.Writer.Option { }
+    
+    private static class KeyClassOption extends Options.ClassOption
+                                        implements Option {
+      KeyClassOption(Class<?> value) {
+        super(value);
+      }
+    }
+    
+    private static class ComparatorOption implements Option {
+      private final WritableComparator value;
+      ComparatorOption(WritableComparator value) {
+        this.value = value;
+      }
+      WritableComparator getValue() {
+        return value;
+      }
+    }
+
+    public static Option keyClass(Class<? extends WritableComparable> value) {
+      return new KeyClassOption(value);
+    }
+    
+    public static Option comparator(WritableComparator value) {
+      return new ComparatorOption(value);
+    }
 
+    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
+      return SequenceFile.Writer.valueClass(value);
+    }
+    
+    public static 
+    SequenceFile.Writer.Option compressionType(CompressionType value) {
+      return SequenceFile.Writer.compressionType(value);
+    }
+
+    public static 
+    SequenceFile.Writer.Option compressionCodec(CompressionCodec value) {
+      return SequenceFile.Writer.compressionCodec(value);
+    }
+
+    public static SequenceFile.Writer.Option progressable(Progressable value) {
+      return SequenceFile.Writer.progressable(value);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Writer(Configuration conf, 
+                  Path dirName,
+                  SequenceFile.Writer.Option... opts
+                  ) throws IOException {
+      KeyClassOption keyClassOption = 
+        Options.getOption(KeyClassOption.class, opts);
+      ComparatorOption comparatorOption =
+        Options.getOption(ComparatorOption.class, opts);
+      if ((keyClassOption == null) == (comparatorOption == null)) {
+        throw new IllegalArgumentException("key class or comparator option "
+                                           + "must be set");
+      }
       this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
 
-      this.comparator = comparator;
+      Class<? extends WritableComparable> keyClass;
+      if (keyClassOption == null) {
+        this.comparator = comparatorOption.getValue();
+        keyClass = comparator.getKeyClass();
+      } else {
+        keyClass= 
+          (Class<? extends WritableComparable>) keyClassOption.getValue();
+        this.comparator = WritableComparator.get(keyClass);
+      }
       this.lastKey = comparator.newKey();
+      FileSystem fs = dirName.getFileSystem(conf);
 
-      Path dir = new Path(dirName);
-      if (!fs.mkdirs(dir)) {
-        throw new IOException("Mkdirs failed to create directory " + dir.toString());
+      if (!fs.mkdirs(dirName)) {
+        throw new IOException("Mkdirs failed to create directory " + dirName);
       }
-      Path dataFile = new Path(dir, DATA_FILE_NAME);
-      Path indexFile = new Path(dir, INDEX_FILE_NAME);
+      Path dataFile = new Path(dirName, DATA_FILE_NAME);
+      Path indexFile = new Path(dirName, INDEX_FILE_NAME);
 
-      Class keyClass = comparator.getKeyClass();
-      this.data =
-        SequenceFile.createWriter
-        (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
-      this.index =
-        SequenceFile.createWriter
-        (fs, conf, indexFile, keyClass, LongWritable.class,
-         CompressionType.BLOCK, progress);
+      SequenceFile.Writer.Option[] dataOptions =
+        Options.prependOptions(opts, 
+                               SequenceFile.Writer.file(dataFile),
+                               SequenceFile.Writer.keyClass(keyClass));
+      this.data = SequenceFile.createWriter(conf, dataOptions);
+
+      SequenceFile.Writer.Option[] indexOptions =
+        Options.prependOptions(opts, 
+                               SequenceFile.Writer.file(indexFile),
+                               SequenceFile.Writer.keyClass(keyClass),
+                               SequenceFile.Writer.valueClass(LongWritable.class),
+                               SequenceFile.Writer.compressionType(CompressionType.BLOCK));
+      this.index = SequenceFile.createWriter(conf, indexOptions);      
     }
-    
+
     /** The number of entries that are added before an index entry is added.*/
     public int getIndexInterval() { return indexInterval; }
 
@@ -269,58 +372,86 @@ public class MapFile {
     /** Returns the class of values in this file. */
     public Class<?> getValueClass() { return data.getValueClass(); }
 
-    /** Construct a map reader for the named map.*/
-    public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
-      this(fs, dirName, null, conf);
-      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
+    public static interface Option extends SequenceFile.Reader.Option {}
+    
+    public static Option comparator(WritableComparator value) {
+      return new ComparatorOption(value);
     }
 
-    /** Construct a map reader for the named map using the named comparator.*/
-    public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
-      throws IOException {
-      this(fs, dirName, comparator, conf, true);
+    static class ComparatorOption implements Option {
+      private final WritableComparator value;
+      ComparatorOption(WritableComparator value) {
+        this.value = value;
+      }
+      WritableComparator getValue() {
+        return value;
+      }
     }
-    
-    /**
-     * Hook to allow subclasses to defer opening streams until further
-     * initialization is complete.
-     * @see #createDataFileReader(FileSystem, Path, Configuration)
+
+    public Reader(Path dir, Configuration conf,
+                  SequenceFile.Reader.Option... opts) throws IOException {
+      ComparatorOption comparatorOption = 
+        Options.getOption(ComparatorOption.class, opts);
+      WritableComparator comparator =
+        comparatorOption == null ? null : comparatorOption.getValue();
+      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
+      open(dir, comparator, conf, opts);
+    }
+ 
+    /** Construct a map reader for the named map.
+     * @deprecated
      */
-    protected Reader(FileSystem fs, String dirName,
-        WritableComparator comparator, Configuration conf, boolean open)
-      throws IOException {
-      
-      if (open) {
-        open(fs, dirName, comparator, conf);
-      }
+    @Deprecated
+    public Reader(FileSystem fs, String dirName, 
+                  Configuration conf) throws IOException {
+      this(new Path(dirName), conf);
+    }
+
+    /** Construct a map reader for the named map using the named comparator.
+     * @deprecated
+     */
+    @Deprecated
+    public Reader(FileSystem fs, String dirName, WritableComparator comparator, 
+                  Configuration conf) throws IOException {
+      this(new Path(dirName), conf, comparator(comparator));
     }
     
-    protected synchronized void open(FileSystem fs, String dirName,
-        WritableComparator comparator, Configuration conf) throws IOException {
-      Path dir = new Path(dirName);
+    protected synchronized void open(Path dir,
+                                     WritableComparator comparator,
+                                     Configuration conf, 
+                                     SequenceFile.Reader.Option... options
+                                     ) throws IOException {
       Path dataFile = new Path(dir, DATA_FILE_NAME);
       Path indexFile = new Path(dir, INDEX_FILE_NAME);
 
       // open the data
-      this.data = createDataFileReader(fs, dataFile, conf);
+      this.data = createDataFileReader(dataFile, conf, options);
       this.firstPosition = data.getPosition();
 
       if (comparator == null)
-        this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
+        this.comparator = 
+          WritableComparator.get(data.getKeyClass().
+                                   asSubclass(WritableComparable.class));
       else
         this.comparator = comparator;
 
       // open the index
-      this.index = new SequenceFile.Reader(fs, indexFile, conf);
+      SequenceFile.Reader.Option[] indexOptions =
+        Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
+      this.index = new SequenceFile.Reader(conf, indexOptions);
     }
 
     /**
      * Override this method to specialize the type of
      * {@link SequenceFile.Reader} returned.
      */
-    protected SequenceFile.Reader createDataFileReader(FileSystem fs,
-        Path dataFile, Configuration conf) throws IOException {
-      return new SequenceFile.Reader(fs, dataFile,  conf);
+    protected SequenceFile.Reader 
+      createDataFileReader(Path dataFile, Configuration conf,
+                           SequenceFile.Reader.Option... options
+                           ) throws IOException {
+      SequenceFile.Reader.Option[] newOptions =
+        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
+      return new SequenceFile.Reader(conf, newOptions);
     }
 
     private void readIndex() throws IOException {
@@ -650,7 +781,8 @@ public class MapFile {
       // no fixing needed
       return -1;
     }
-    SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);
+    SequenceFile.Reader dataReader = 
+      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
     if (!dataReader.getKeyClass().equals(keyClass)) {
       throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
                           ", got " + dataReader.getKeyClass().getName());
@@ -663,7 +795,14 @@ public class MapFile {
     Writable key = ReflectionUtils.newInstance(keyClass, conf);
     Writable value = ReflectionUtils.newInstance(valueClass, conf);
     SequenceFile.Writer indexWriter = null;
-    if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
+    if (!dryrun) {
+      indexWriter = 
+        SequenceFile.createWriter(conf, 
+                                  SequenceFile.Writer.file(index), 
+                                  SequenceFile.Writer.keyClass(keyClass), 
+                                  SequenceFile.Writer.valueClass
+                                    (LongWritable.class));
+    }
     try {
       long pos = 0L;
       LongWritable position = new LongWritable();
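
For a plain MapFile, the new Writer constructor requires exactly one of the keyClass or comparator options; the remaining options are passed through to the underlying SequenceFile writers. A rough sketch with placeholder names (not part of the patch):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.MapFile;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;

  public class MapFileWriteExample {                     // hypothetical example class
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      Path dir = new Path("/tmp/mapfile");               // placeholder directory
      MapFile.Writer writer =
        new MapFile.Writer(conf, dir,
                           MapFile.Writer.keyClass(Text.class),
                           MapFile.Writer.valueClass(Text.class),
                           MapFile.Writer.compressionType(
                               SequenceFile.CompressionType.BLOCK));
      writer.append(new Text("key"), new Text("value"));
      writer.close();
    }
  }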

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Sep 30 02:59:32 2010
@@ -23,6 +23,7 @@ import java.util.*;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import org.apache.commons.logging.*;
+import org.apache.hadoop.util.Options;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -226,24 +227,50 @@ public class SequenceFile {
    * @param job the job config to look in
    * @return the kind of compression to use
    */
-  @Deprecated
-  static public CompressionType getCompressionType(Configuration job) {
+  static public CompressionType getDefaultCompressionType(Configuration job) {
     String name = job.get("io.seqfile.compression.type");
     return name == null ? CompressionType.RECORD : 
       CompressionType.valueOf(name);
   }
   
   /**
-   * Set the compression type for sequence files.
+   * Set the default compression type for sequence files.
    * @param job the configuration to modify
    * @param val the new compression type (none, block, record)
    */
-  @Deprecated
-  static public void setCompressionType(Configuration job, 
-                                        CompressionType val) {
+  static public void setDefaultCompressionType(Configuration job, 
+                                               CompressionType val) {
     job.set("io.seqfile.compression.type", val.toString());
   }
-    
+
+  /**
+   * Create a new Writer with the given options.
+   * @param conf the configuration to use
+   * @param opts the options to create the file with
+   * @return a new Writer
+   * @throws IOException
+   */
+  public static Writer createWriter(Configuration conf, Writer.Option... opts
+                                    ) throws IOException {
+    Writer.CompressionTypeOption compressionOption = 
+      Options.getOption(Writer.CompressionTypeOption.class, opts);
+    CompressionType kind;
+    if (compressionOption != null) {
+      kind = compressionOption.getValue();
+    } else {
+      kind = getDefaultCompressionType(conf);
+    }
+    switch (kind) {
+    default:
+    case NONE:
+      return new Writer(conf, kind, opts);
+    case RECORD:
+      return new RecordCompressWriter(conf, kind, opts);
+    case BLOCK:
+      return new BlockCompressWriter(conf, kind, opts);
+    }
+  }
+
   /**
    * Construct the preferred type of SequenceFile Writer.
    * @param fs The configured filesystem. 
@@ -253,13 +280,15 @@ public class SequenceFile {
    * @param valClass The 'value' type.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer 
     createWriter(FileSystem fs, Configuration conf, Path name, 
-                 Class keyClass, Class valClass) 
-    throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass,
-                        getCompressionType(conf));
+                 Class keyClass, Class valClass) throws IOException {
+    return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass));
   }
   
   /**
@@ -272,15 +301,17 @@ public class SequenceFile {
    * @param compressionType The compression type.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer 
     createWriter(FileSystem fs, Configuration conf, Path name, 
-                 Class keyClass, Class valClass, CompressionType compressionType) 
-    throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass,
-            fs.getConf().getInt("io.file.buffer.size", 4096),
-            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
-            compressionType, new DefaultCodec(), null, new Metadata());
+                 Class keyClass, Class valClass, 
+                 CompressionType compressionType) throws IOException {
+    return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass), 
+                        Writer.compressionType(compressionType));
   }
   
   /**
@@ -294,15 +325,18 @@ public class SequenceFile {
    * @param progress The Progressable object to track progress.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass, CompressionType compressionType,
                  Progressable progress) throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass,
-            fs.getConf().getInt("io.file.buffer.size", 4096),
-            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
-            compressionType, new DefaultCodec(), progress, new Metadata());
+    return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass), 
+                        Writer.compressionType(compressionType),
+                        Writer.progressable(progress));
   }
 
   /**
@@ -316,16 +350,18 @@ public class SequenceFile {
    * @param codec The compression codec.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer 
     createWriter(FileSystem fs, Configuration conf, Path name, 
-                 Class keyClass, Class valClass, 
-                 CompressionType compressionType, CompressionCodec codec) 
-    throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass,
-            fs.getConf().getInt("io.file.buffer.size", 4096),
-            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
-            compressionType, codec, null, new Metadata());
+                 Class keyClass, Class valClass, CompressionType compressionType, 
+                 CompressionCodec codec) throws IOException {
+    return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass), 
+                        Writer.compressionType(compressionType),
+                        Writer.compressionCodec(codec));
   }
   
   /**
@@ -341,16 +377,21 @@ public class SequenceFile {
    * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass, 
                  CompressionType compressionType, CompressionCodec codec,
                  Progressable progress, Metadata metadata) throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass,
-            fs.getConf().getInt("io.file.buffer.size", 4096),
-            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
-            compressionType, codec, progress, metadata);
+    return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass),
+                        Writer.compressionType(compressionType),
+                        Writer.compressionCodec(codec),
+                        Writer.progressable(progress),
+                        Writer.metadata(metadata));
   }
 
   /**
@@ -369,37 +410,25 @@ public class SequenceFile {
    * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass, int bufferSize,
                  short replication, long blockSize,
                  CompressionType compressionType, CompressionCodec codec,
                  Progressable progress, Metadata metadata) throws IOException {
-    if ((codec instanceof GzipCodec) &&
-        !NativeCodeLoader.isNativeCodeLoaded() &&
-        !ZlibFactory.isNativeZlibLoaded(conf)) {
-      throw new IllegalArgumentException("SequenceFile doesn't work with " +
-                                         "GzipCodec without native-hadoop code!");
-    }
-
-    Writer writer = null;
-
-    if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass,
-                          bufferSize, replication, blockSize,
-                          progress, metadata);
-    } else if (compressionType == CompressionType.RECORD) {
-      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
-                                        bufferSize, replication, blockSize,
-                                        codec, progress, metadata);
-    } else if (compressionType == CompressionType.BLOCK){
-      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
-                                       bufferSize, replication, blockSize,
-                                       codec, progress, metadata);
-    }
-
-    return writer;
+    return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass), 
+                        Writer.bufferSize(bufferSize), 
+                        Writer.replication(replication),
+                        Writer.blockSize(blockSize),
+                        Writer.compressionType(compressionType),
+                        Writer.compressionCodec(codec),
+                        Writer.progressable(progress),
+                        Writer.metadata(metadata));
   }
 
   /**
@@ -414,95 +443,21 @@ public class SequenceFile {
    * @param progress The Progressable object to track progress.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass, 
                  CompressionType compressionType, CompressionCodec codec,
                  Progressable progress) throws IOException {
-    Writer writer = createWriter(fs, conf, name, keyClass, valClass, 
-                                 compressionType, codec, progress, new Metadata());
-    return writer;
-  }
-
-  /**
-   * Construct the preferred type of 'raw' SequenceFile Writer.
-   * @param out The stream on top which the writer is to be constructed.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compress Compress data?
-   * @param blockCompress Compress blocks?
-   * @param metadata The metadata of the file.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   */
-  private static Writer
-    createWriter(Configuration conf, FSDataOutputStream out, 
-                 Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-                 CompressionCodec codec, Metadata metadata)
-    throws IOException {
-    if (codec != null && (codec instanceof GzipCodec) && 
-        !NativeCodeLoader.isNativeCodeLoaded() && 
-        !ZlibFactory.isNativeZlibLoaded(conf)) {
-      throw new IllegalArgumentException("SequenceFile doesn't work with " +
-                                         "GzipCodec without native-hadoop code!");
-    }
-
-    Writer writer = null;
-
-    if (!compress) {
-      writer = new Writer(conf, out, keyClass, valClass, metadata);
-    } else if (compress && !blockCompress) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    } else {
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    }
-    
-    return writer;
-  }
-
-  /**
-   * Construct the preferred type of 'raw' SequenceFile Writer.
-   * @param fs The configured filesystem. 
-   * @param conf The configuration.
-   * @param file The name of the file. 
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compress Compress data?
-   * @param blockCompress Compress blocks?
-   * @param codec The compression codec.
-   * @param progress
-   * @param metadata The metadata of the file.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   */
-  private static Writer
-  createWriter(FileSystem fs, Configuration conf, Path file, 
-               Class keyClass, Class valClass, 
-               boolean compress, boolean blockCompress,
-               CompressionCodec codec, Progressable progress, Metadata metadata)
-  throws IOException {
-  if (codec != null && (codec instanceof GzipCodec) && 
-      !NativeCodeLoader.isNativeCodeLoaded() && 
-      !ZlibFactory.isNativeZlibLoaded(conf)) {
-    throw new IllegalArgumentException("SequenceFile doesn't work with " +
-                                       "GzipCodec without native-hadoop code!");
-  }
-
-  Writer writer = null;
-
-  if (!compress) {
-    writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
-  } else if (compress && !blockCompress) {
-    writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, 
-                                      codec, progress, metadata);
-  } else {
-    writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, 
-                                     codec, progress, metadata);
+    return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass),
+                        Writer.compressionType(compressionType),
+                        Writer.compressionCodec(codec),
+                        Writer.progressable(progress));
   }
-  
-  return writer;
-}
 
   /**
    * Construct the preferred type of 'raw' SequenceFile Writer.
@@ -515,30 +470,20 @@ public class SequenceFile {
    * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer
     createWriter(Configuration conf, FSDataOutputStream out, 
-                 Class keyClass, Class valClass, CompressionType compressionType,
-                 CompressionCodec codec, Metadata metadata)
-    throws IOException {
-    if ((codec instanceof GzipCodec) && 
-        !NativeCodeLoader.isNativeCodeLoaded() && 
-        !ZlibFactory.isNativeZlibLoaded(conf)) {
-      throw new IllegalArgumentException("SequenceFile doesn't work with " +
-                                         "GzipCodec without native-hadoop code!");
-    }
-
-    Writer writer = null;
-
-    if (compressionType == CompressionType.NONE) {
-      writer = new Writer(conf, out, keyClass, valClass, metadata);
-    } else if (compressionType == CompressionType.RECORD) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    } else if (compressionType == CompressionType.BLOCK){
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    }
-    
-    return writer;
+                 Class keyClass, Class valClass,
+                 CompressionType compressionType,
+                 CompressionCodec codec, Metadata metadata) throws IOException {
+    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass), 
+                        Writer.compressionType(compressionType),
+                        Writer.compressionCodec(codec),
+                        Writer.metadata(metadata));
   }
   
   /**
@@ -551,15 +496,18 @@ public class SequenceFile {
    * @param codec The compression codec.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
+   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
+   *     instead.
    */
+  @Deprecated
   public static Writer
     createWriter(Configuration conf, FSDataOutputStream out, 
                  Class keyClass, Class valClass, CompressionType compressionType,
-                 CompressionCodec codec)
-    throws IOException {
-    Writer writer = createWriter(conf, out, keyClass, valClass, compressionType,
-                                 codec, new Metadata());
-    return writer;
+                 CompressionCodec codec) throws IOException {
+    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
+                        Writer.valueClass(valClass),
+                        Writer.compressionType(compressionType),
+                        Writer.compressionCodec(codec));
   }
   
 
@@ -790,7 +738,7 @@ public class SequenceFile {
   
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer implements java.io.Closeable {
-    Configuration conf;
+    private Configuration conf;
     FSDataOutputStream out;
     boolean ownOutputStream = true;
     DataOutputBuffer buffer = new DataOutputBuffer();
@@ -798,7 +746,7 @@ public class SequenceFile {
     Class keyClass;
     Class valClass;
 
-    private boolean compress;
+    private final CompressionType compress;
     CompressionCodec codec = null;
     CompressionOutputStream deflateFilter = null;
     DataOutputStream deflateOut = null;
@@ -825,73 +773,269 @@ public class SequenceFile {
       }
     }
 
-    /** Implicit constructor: needed for the period of transition!*/
-    Writer()
-    {}
+    public static interface Option {}
     
-    /** Create the named file. */
+    static class FileOption extends Options.PathOption 
+                                    implements Option {
+      FileOption(Path path) {
+        super(path);
+      }
+    }
+
+    static class StreamOption extends Options.FSDataOutputStreamOption 
+                              implements Option {
+      StreamOption(FSDataOutputStream stream) {
+        super(stream);
+      }
+    }
+
+    static class BufferSizeOption extends Options.IntegerOption
+                                  implements Option {
+      BufferSizeOption(int value) {
+        super(value);
+      }
+    }
+    
+    static class BlockSizeOption extends Options.LongOption implements Option {
+      BlockSizeOption(long value) {
+        super(value);
+      }
+    }
+
+    static class ReplicationOption extends Options.IntegerOption
+                                   implements Option {
+      ReplicationOption(int value) {
+        super(value);
+      }
+    }
+
+    static class KeyClassOption extends Options.ClassOption implements Option {
+      KeyClassOption(Class<?> value) {
+        super(value);
+      }
+    }
+
+    static class ValueClassOption extends Options.ClassOption
+                                          implements Option {
+      ValueClassOption(Class<?> value) {
+        super(value);
+      }
+    }
+
+    static class MetadataOption implements Option {
+      private final Metadata value;
+      MetadataOption(Metadata value) {
+        this.value = value;
+      }
+      Metadata getValue() {
+        return value;
+      }
+    }
+
+    static class ProgressableOption extends Options.ProgressableOption
+                                    implements Option {
+      ProgressableOption(Progressable value) {
+        super(value);
+      }
+    }
+
+    private static class CompressionTypeOption implements Option {
+      private final CompressionType value;
+      CompressionTypeOption(CompressionType value) {
+        this.value = value;
+      }
+      CompressionType getValue() {
+        return value;
+      }
+    }
+    
+    private static class CompressionCodecOption implements Option {
+      private final CompressionCodec value;
+      CompressionCodecOption(CompressionCodec value) {
+        this.value = value;
+      }
+      CompressionCodec getValue() {
+        return value;
+      }
+    }
+    
+    public static Option file(Path value) {
+      return new FileOption(value);
+    }
+    
+    public static Option bufferSize(int value) {
+      return new BufferSizeOption(value);
+    }
+    
+    public static Option stream(FSDataOutputStream value) {
+      return new StreamOption(value);
+    }
+    
+    public static Option replication(short value) {
+      return new ReplicationOption(value);
+    }
+    
+    public static Option blockSize(long value) {
+      return new BlockSizeOption(value);
+    }
+    
+    public static Option progressable(Progressable value) {
+      return new ProgressableOption(value);
+    }
+
+    public static Option keyClass(Class<?> value) {
+      return new KeyClassOption(value);
+    }
+    
+    public static Option valueClass(Class<?> value) {
+      return new ValueClassOption(value);
+    }
+    
+    public static Option metadata(Metadata value) {
+      return new MetadataOption(value);
+    }
+
+    public static Option compressionType(CompressionType value) {
+      return new CompressionTypeOption(value);
+    }
+    
+    public static Option compressionCodec(CompressionCodec value) {
+      return new CompressionCodecOption(value);
+    }
+
+    /**
+     * Construct a uncompressed writer from a set of options.
+     * @param conf the configuration to use
+     * @param compressionType the compression type being used
+     * @param options the options used when creating the writer
+     * @throws IOException if it fails
+     */
+    Writer(Configuration conf, 
+           CompressionType compressionType,
+           Option... opts) throws IOException {
+      this.compress = compressionType;
+      BlockSizeOption blockSizeOption = 
+        Options.getOption(BlockSizeOption.class, opts);
+      BufferSizeOption bufferSizeOption = 
+        Options.getOption(BufferSizeOption.class, opts);
+      ReplicationOption replicationOption = 
+        Options.getOption(ReplicationOption.class, opts);
+      ProgressableOption progressOption = 
+        Options.getOption(ProgressableOption.class, opts);
+      FileOption fileOption = Options.getOption(FileOption.class, opts);
+      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
+      KeyClassOption keyClassOption = 
+        Options.getOption(KeyClassOption.class, opts);
+      ValueClassOption valueClassOption = 
+        Options.getOption(ValueClassOption.class, opts);
+      CompressionCodecOption compressionCodecOption =
+        Options.getOption(CompressionCodecOption.class, opts);
+      MetadataOption metadataOption = 
+        Options.getOption(MetadataOption.class, opts);
+      // check consistency of options
+      if ((fileOption == null) == (streamOption == null)) {
+        throw new IllegalArgumentException("file or stream must be specified");
+      }
+      if (fileOption == null && (blockSizeOption != null ||
+                                 bufferSizeOption != null ||
+                                 replicationOption != null ||
+                                 progressOption != null)) {
+        throw new IllegalArgumentException("file modifier options not " +
+                                           "compatible with stream");
+      }
+
+      FSDataOutputStream out;
+      boolean ownStream = fileOption != null;
+      if (ownStream) {
+        Path p = fileOption.getValue();
+        FileSystem fs = p.getFileSystem(conf);
+        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
+          bufferSizeOption.getValue();
+        short replication = replicationOption == null ? 
+          fs.getDefaultReplication() :
+          (short) replicationOption.getValue();
+        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize() :
+          blockSizeOption.getValue();
+        Progressable progress = progressOption == null ? null :
+          progressOption.getValue();
+        out = fs.create(p, false, bufferSize, replication, blockSize, progress);
+      } else {
+        out = streamOption.getValue();
+      }
+      Class<?> keyClass = keyClassOption == null ?
+          Object.class : keyClassOption.getValue();
+      Class<?> valueClass = valueClassOption == null ?
+          Object.class : valueClassOption.getValue();
+      Metadata metadata = metadataOption == null ?
+          new Metadata() : metadataOption.getValue();
+      CompressionCodec codec;
+      if (compressionType == CompressionType.NONE) {
+        codec = null;
+      } else {
+        codec = compressionCodecOption == null ?
+            new DefaultCodec() : compressionCodecOption.getValue();
+      }
+      if (codec != null &&
+          (codec instanceof GzipCodec) &&
+          !NativeCodeLoader.isNativeCodeLoaded() &&
+          !ZlibFactory.isNativeZlibLoaded(conf)) {
+        throw new IllegalArgumentException("SequenceFile doesn't work with " +
+                                           "GzipCodec without native-hadoop " +
+                                           "code!");
+      }
+      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
+    }
+
+    /** Create the named file.
+     * @deprecated Use 
+     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
+     *   instead.
+     */
+    @Deprecated
     public Writer(FileSystem fs, Configuration conf, Path name, 
-                  Class keyClass, Class valClass)
-      throws IOException {
-      this(fs, conf, name, keyClass, valClass, null, new Metadata());
+                  Class keyClass, Class valClass) throws IOException {
+      this.compress = CompressionType.NONE;
+      init(conf, fs.create(name), true, keyClass, valClass, null, 
+           new Metadata());
     }
     
-    /** Create the named file with write-progress reporter. */
+    /** Create the named file with write-progress reporter.
+     * @deprecated Use 
+     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
+     *   instead.
+     */
+    @Deprecated
     public Writer(FileSystem fs, Configuration conf, Path name, 
                   Class keyClass, Class valClass,
-                  Progressable progress, Metadata metadata)
-      throws IOException {
-      this(fs, conf, name, keyClass, valClass,
-           fs.getConf().getInt("io.file.buffer.size", 4096),
-           fs.getDefaultReplication(), fs.getDefaultBlockSize(),
-           progress, metadata);
+                  Progressable progress, Metadata metadata) throws IOException {
+      this.compress = CompressionType.NONE;
+      init(conf, fs.create(name, progress), true, keyClass, valClass,
+           null, metadata);
     }
     
-    /** Create the named file with write-progress reporter. */
+    /** Create the named file with write-progress reporter. 
+     * @deprecated Use 
+     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
+     *   instead.
+     */
+    @Deprecated
     public Writer(FileSystem fs, Configuration conf, Path name,
                   Class keyClass, Class valClass,
                   int bufferSize, short replication, long blockSize,
-                  Progressable progress, Metadata metadata)
-      throws IOException {
-      init(name, conf,
+                  Progressable progress, Metadata metadata) throws IOException {
+      this.compress = CompressionType.NONE;
+      init(conf,
            fs.create(name, true, bufferSize, replication, blockSize, progress),
-              keyClass, valClass, false, null, metadata);
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-
-    /** Write to an arbitrary stream using a specified buffer size. */
-    private Writer(Configuration conf, FSDataOutputStream out, 
-                   Class keyClass, Class valClass, Metadata metadata)
-      throws IOException {
-      this.ownOutputStream = false;
-      init(null, conf, out, keyClass, valClass, false, null, metadata);
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-
-    /** Write the initial part of file header. */
-    void initializeFileHeader() 
-      throws IOException{
-      out.write(VERSION);
+           true, keyClass, valClass, null, metadata);
     }
 
-    /** Write the final part of file header. */
-    void finalizeFileHeader() 
-      throws IOException{
-      out.write(sync);                       // write the sync bytes
-      out.flush();                           // flush header
-    }
-    
-    boolean isCompressed() { return compress; }
-    boolean isBlockCompressed() { return false; }
+    boolean isCompressed() { return compress != CompressionType.NONE; }
+    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
     
     /** Write and flush the file header. */
-    void writeFileHeader() 
+    private void writeFileHeader() 
       throws IOException {
+      out.write(VERSION);
       Text.writeString(out, keyClass.getName());
       Text.writeString(out, valClass.getName());
       
@@ -902,19 +1046,21 @@ public class SequenceFile {
         Text.writeString(out, (codec.getClass()).getName());
       }
       this.metadata.write(out);
+      out.write(sync);                       // write the sync bytes
+      out.flush();                           // flush header
     }
     
     /** Initialize. */
     @SuppressWarnings("unchecked")
-    void init(Path name, Configuration conf, FSDataOutputStream out,
+    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
               Class keyClass, Class valClass,
-              boolean compress, CompressionCodec codec, Metadata metadata) 
+              CompressionCodec codec, Metadata metadata) 
       throws IOException {
       this.conf = conf;
       this.out = out;
+      this.ownOutputStream = ownStream;
       this.keyClass = keyClass;
       this.valClass = valClass;
-      this.compress = compress;
       this.codec = codec;
       this.metadata = metadata;
       SerializationFactory serializationFactory = new SerializationFactory(conf);
@@ -931,6 +1077,7 @@ public class SequenceFile {
         this.compressedValSerializer = serializationFactory.getSerializer(valClass);
         this.compressedValSerializer.open(deflateOut);
       }
+      writeFileHeader();
     }
     
     /** Returns the class of keys in this file. */
@@ -985,7 +1132,7 @@ public class SequenceFile {
     }
 
     /** Append a key/value pair. */
-    public synchronized void append(Writable key, Writable val)
+    public void append(Writable key, Writable val)
       throws IOException {
       append((Object) key, (Object) val);
     }
@@ -1010,7 +1157,7 @@ public class SequenceFile {
         throw new IOException("negative length keys not allowed: " + key);
 
       // Append the 'value'
-      if (compress) {
+      if (compress == CompressionType.RECORD) {
         deflateFilter.resetState();
         compressedValSerializer.serialize(val);
         deflateOut.flush();
@@ -1059,64 +1206,12 @@ public class SequenceFile {
   /** Write key/compressed-value pairs to a sequence-format file. */
   static class RecordCompressWriter extends Writer {
     
-    /** Create the named file. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-                                Class keyClass, Class valClass, CompressionCodec codec) 
-      throws IOException {
-      this(conf, fs.create(name), keyClass, valClass, codec, new Metadata());
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-                                Class keyClass, Class valClass, CompressionCodec codec,
-                                Progressable progress, Metadata metadata)
-      throws IOException {
-      this(fs, conf, name, keyClass, valClass,
-           fs.getConf().getInt("io.file.buffer.size", 4096),
-           fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
-           progress, metadata);
+    RecordCompressWriter(Configuration conf, 
+                         CompressionType compressionType,
+                         Option... options) throws IOException {
+      super(conf, compressionType, options);
     }
 
-    /** Create the named file with write-progress reporter. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
-                                Class keyClass, Class valClass,
-                                int bufferSize, short replication, long blockSize,
-                                CompressionCodec codec,
-                                Progressable progress, Metadata metadata)
-      throws IOException {
-      super.init(name, conf,
-                 fs.create(name, true, bufferSize, replication, blockSize, progress),
-                 keyClass, valClass, true, codec, metadata);
-
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-
-    /** Create the named file with write-progress reporter. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-                                Class keyClass, Class valClass, CompressionCodec codec,
-                                Progressable progress)
-      throws IOException {
-      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
-    }
-    
-    /** Write to an arbitrary stream using a specified buffer size. */
-    private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
-                                 Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
-      throws IOException {
-      this.ownOutputStream = false;
-      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-      
-    }
-    
-    boolean isCompressed() { return true; }
-    boolean isBlockCompressed() { return false; }
-
     /** Append a key/value pair. */
     @SuppressWarnings("unchecked")
     public synchronized void append(Object key, Object val)
@@ -1178,79 +1273,20 @@ public class SequenceFile {
     private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
     private DataOutputBuffer valBuffer = new DataOutputBuffer();
 
-    private int compressionBlockSize;
-    
-    /** Create the named file. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-                               Class keyClass, Class valClass, CompressionCodec codec) 
-      throws IOException {
-      this(fs, conf, name, keyClass, valClass,
-           fs.getConf().getInt("io.file.buffer.size", 4096),
-           fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
-           null, new Metadata());
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-                               Class keyClass, Class valClass, CompressionCodec codec,
-                               Progressable progress, Metadata metadata)
-      throws IOException {
-      this(fs, conf, name, keyClass, valClass,
-           fs.getConf().getInt("io.file.buffer.size", 4096),
-           fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
-           progress, metadata);
-    }
-
-    /** Create the named file with write-progress reporter. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
-                               Class keyClass, Class valClass,
-                               int bufferSize, short replication, long blockSize,
-                               CompressionCodec codec,
-                               Progressable progress, Metadata metadata)
-      throws IOException {
-      super.init(name, conf,
-                 fs.create(name, true, bufferSize, replication, blockSize, progress),
-                 keyClass, valClass, true, codec, metadata);
-      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
-
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-
-    /** Create the named file with write-progress reporter. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-                               Class keyClass, Class valClass, CompressionCodec codec,
-                               Progressable progress)
-      throws IOException {
-      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
-    }
+    private final int compressionBlockSize;
     
-    /** Write to an arbitrary stream using a specified buffer size. */
-    private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
-                                Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
-      throws IOException {
-      this.ownOutputStream = false;
-      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
-      init(1000000);
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-    
-    boolean isCompressed() { return true; }
-    boolean isBlockCompressed() { return true; }
-
-    /** Initialize */
-    void init(int compressionBlockSize) throws IOException {
-      this.compressionBlockSize = compressionBlockSize;
+    BlockCompressWriter(Configuration conf,
+                        CompressionType compressionType,
+                        Option... options) throws IOException {
+      super(conf, compressionType, options);
+      compressionBlockSize = 
+        conf.getInt("io.seqfile.compress.blocksize", 1000000);
       keySerializer.close();
       keySerializer.open(keyBuffer);
       uncompressedValSerializer.close();
       uncompressedValSerializer.open(valBuffer);
     }
-    
+
     /** Workhorse to check and write out compressed data/lengths */
     private synchronized 
       void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
@@ -1361,10 +1397,15 @@ public class SequenceFile {
     }
   
   } // BlockCompressionWriter
-  
+
+  /** Get the configured buffer size */
+  private static int getBufferSize(Configuration conf) {
+    return conf.getInt("io.file.buffer.size", 4096);
+  }
+
   /** Reads key/value pairs from a sequence-format file. */
   public static class Reader implements java.io.Closeable {
-    private Path file;
+    private String filename;
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
 
@@ -1421,65 +1462,187 @@ public class SequenceFile {
     private Deserializer valDeserializer;
 
     /**
+     * A tag interface for all of the Reader options
+     */
+    public static interface Option {}
+    
+    /**
+     * Create an option to specify the path name of the sequence file.
+     * @param value the path to read
+     * @return a new option
+     */
+    public static Option file(Path value) {
+      return new FileOption(value);
+    }
+    
+    /**
+     * Create an option to specify the stream with the sequence file.
+     * @param value the stream to read.
+     * @return a new option
+     */
+    public static Option stream(FSDataInputStream value) {
+      return new InputStreamOption(value);
+    }
+    
+    /**
+     * Create an option to specify the starting byte to read.
+     * @param value the number of bytes to skip over
+     * @return a new option
+     */
+    public static Option start(long value) {
+      return new StartOption(value);
+    }
+    
+    /**
+     * Create an option to specify the number of bytes to read.
+     * @param value the number of bytes to read
+     * @return a new option
+     */
+    public static Option length(long value) {
+      return new LengthOption(value);
+    }
+    
+    /**
+     * Create an option with the buffer size for reading the given pathname.
+     * @param value the number of bytes to buffer
+     * @return a new option
+     */
+    public static Option bufferSize(int value) {
+      return new BufferSizeOption(value);
+    }
+
+    private static class FileOption extends Options.PathOption 
+                                    implements Option {
+      private FileOption(Path value) {
+        super(value);
+      }
+    }
+    
+    private static class InputStreamOption
+        extends Options.FSDataInputStreamOption 
+        implements Option {
+      private InputStreamOption(FSDataInputStream value) {
+        super(value);
+      }
+    }
+
+    private static class StartOption extends Options.LongOption
+                                     implements Option {
+      private StartOption(long value) {
+        super(value);
+      }
+    }
+
+    private static class LengthOption extends Options.LongOption
+                                      implements Option {
+      private LengthOption(long value) {
+        super(value);
+      }
+    }
+
+    private static class BufferSizeOption extends Options.IntegerOption
+                                      implements Option {
+      private BufferSizeOption(int value) {
+        super(value);
+      }
+    }
+
+    // only used directly
+    private static class OnlyHeaderOption extends Options.BooleanOption 
+                                          implements Option {
+      private OnlyHeaderOption() {
+        super(true);
+      }
+    }
+
+    public Reader(Configuration conf, Option... opts) throws IOException {
+      // Look up the options, these are null if not set
+      FileOption fileOpt = Options.getOption(FileOption.class, opts);
+      InputStreamOption streamOpt = 
+        Options.getOption(InputStreamOption.class, opts);
+      StartOption startOpt = Options.getOption(StartOption.class, opts);
+      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
+      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
+      OnlyHeaderOption headerOnly = 
+        Options.getOption(OnlyHeaderOption.class, opts);
+      // check for consistency
+      if ((fileOpt == null) == (streamOpt == null)) {
+        throw new 
+          IllegalArgumentException("File or stream option must be specified");
+      }
+      if (fileOpt == null && bufOpt != null) {
+        throw new IllegalArgumentException("buffer size can only be set when" +
+                                           " a file is specified.");
+      }
+      // figure out the real values
+      Path filename = null;
+      FSDataInputStream file;
+      long len = lenOpt == null ? Long.MAX_VALUE : lenOpt.getValue();      
+      if (fileOpt != null) {
+        filename = fileOpt.getValue();
+        FileSystem fs = filename.getFileSystem(conf);
+        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
+        file = fs.open(filename, bufSize);
+        len = fs.getFileStatus(filename).getLen();
+      } else {
+        file = streamOpt.getValue();
+      }
+      long start = startOpt == null ? 0 : startOpt.getValue();
+      // really set up
+      initialize(filename, file, start, len, conf, headerOnly != null);
+    }
+
+    /**
      * Construct a reader by opening a file from the given file system.
      * @param fs The file system used to open the file.
      * @param file The file being read.
      * @param conf Configuration
      * @throws IOException
+     * @deprecated Use Reader(Configuration, Option...) instead.
      */
-    public Reader(FileSystem fs, Path file, Configuration conf)
-      throws IOException {
-      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
+    @Deprecated
+    public Reader(FileSystem fs, Path file, 
+                  Configuration conf) throws IOException {
+      initialize(file,
+                 fs.open(file, getBufferSize(conf)), 
+                 0L, fs.getFileStatus(file).getLen(), conf, false);
     }
 
     /**
      * Construct a reader by the given input stream.
      * @param in An input stream.
-     * @param buffersize The buffer size used to read the file.
+     * @param buffersize unused
      * @param start The starting position.
      * @param length The length being read.
      * @param conf Configuration
      * @throws IOException
+     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
      */
+    @Deprecated
     public Reader(FSDataInputStream in, int buffersize,
         long start, long length, Configuration conf) throws IOException {
-      this(null, null, in, buffersize, start, length, conf, false);
-    }
-
-    private Reader(FileSystem fs, Path file, int bufferSize,
-                   Configuration conf, boolean tempReader) throws IOException {
-      this(fs, file, null, bufferSize, 0, fs.getFileStatus(file).getLen(),
-          conf, tempReader);
+      initialize(null, in, start, length, conf, false);
     }
 
-    /**
-     * Private constructor.
-     * @param fs The file system used to open the file.
-     *           It is not used if the given input stream is not null.  
-     * @param file The file being read.
-     * @param in An input stream of the file.  If it is null,
-     *           the file will be opened from the given file system.
-     * @param bufferSize The buffer size used to read the file.
-     * @param start The starting position.
-     * @param length The length being read.
-     * @param conf Configuration
-     * @param tempReader Is this temporary? 
-     * @throws IOException
-     */
-    private Reader(FileSystem fs, Path file, FSDataInputStream in,
-        int bufferSize, long start, long length, Configuration conf,
-        boolean tempReader) throws IOException {
-      if (fs == null && in == null) {
-        throw new IllegalArgumentException("fs == null && in == null");
+    /** Common work of the constructors. */
+    private void initialize(Path filename, FSDataInputStream in,
+                            long start, long length, Configuration conf,
+                            boolean tempReader) throws IOException {
+      if (in == null) {
+        throw new IllegalArgumentException("in == null");
       }
-
-      this.file = file;
-      this.in = in != null? in: openFile(fs, file, bufferSize, length);
+      this.filename = filename == null ? "<unknown>" : filename.toString();
+      this.in = in;
       this.conf = conf;
       boolean succeeded = false;
       try {
         seek(start);
         this.end = this.in.getPos() + length;
+        System.out.println("Setting end to " + end);
+        // if it wrapped around, use the max
+        if (end < length) {
+          end = Long.MAX_VALUE;
+        }
         init(tempReader);
         succeeded = true;
       } finally {
@@ -1695,6 +1858,18 @@ public class SequenceFile {
     
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
+    
+    /**
+     * Get the compression type for this file.
+     * @return the compression type
+     */
+    public CompressionType getCompressionType() {
+      if (decompress) {
+        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
+      } else {
+        return CompressionType.NONE;
+      }
+    }
 
     /** Returns the metadata object of the file */
     public Metadata getMetadata() {
@@ -1984,7 +2159,8 @@ public class SequenceFile {
      * of the value may be computed by calling buffer.getLength() before and
      * after calls to this method. */
     /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
-    public synchronized int next(DataOutputBuffer buffer) throws IOException {
+    @Deprecated
+    synchronized int next(DataOutputBuffer buffer) throws IOException {
       // Unsupported for block-compressed sequence files
       if (blockCompressed) {
         throw new IOException("Unsupported call for block-compressed" +
@@ -2279,7 +2455,7 @@ public class SequenceFile {
 
     /** Returns the name of the file. */
     public String toString() {
-      return file == null? "<unknown>": file.toString();
+      return filename;
     }
 
   }
@@ -2454,8 +2630,7 @@ public class SequenceFile {
         int segments = 0;
         int currentFile = 0;
         boolean atEof = (currentFile >= inFiles.length);
-        boolean isCompressed = false;
-        boolean isBlockCompressed = false;
+        CompressionType compressionType;
         CompressionCodec codec = null;
         segmentLengths.clear();
         if (atEof) {
@@ -2464,8 +2639,7 @@ public class SequenceFile {
         
         // Initialize
         in = new Reader(fs, inFiles[currentFile], conf);
-        isCompressed = in.isCompressed();
-        isBlockCompressed = in.isBlockCompressed();
+        compressionType = in.getCompressionType();
         codec = in.getCompressionCodec();
         
         for (int i=0; i < rawValues.length; ++i) {
@@ -2526,7 +2700,7 @@ public class SequenceFile {
           if (progressable != null) {
             progressable.progress();
           }
-          flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, 
+          flush(count, bytesProcessed, compressionType, codec, 
                 segments==0 && atEof);
           segments++;
         }
@@ -2569,9 +2743,10 @@ public class SequenceFile {
         return result;
       }
 
-      private void flush(int count, int bytesProcessed, boolean isCompressed, 
-                         boolean isBlockCompressed, CompressionCodec codec, boolean done) 
-        throws IOException {
+      private void flush(int count, int bytesProcessed, 
+                         CompressionType compressionType, 
+                         CompressionCodec codec, 
+                         boolean done) throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile.suffix(".0");
           out = fs.create(outName);
@@ -2581,9 +2756,14 @@ public class SequenceFile {
         }
 
         long segmentStart = out.getPos();
-        Writer writer = createWriter(conf, out, keyClass, valClass, 
-                                     isCompressed, isBlockCompressed, codec, 
-                                     done ? metadata : new Metadata());
+        Writer writer = createWriter(conf, 
+                                     Writer.stream(out), 
+                                     Writer.keyClass(keyClass), 
+                                     Writer.valueClass(valClass), 
+                                     Writer.compressionType(compressionType), 
+                                     Writer.compressionCodec(codec), 
+                                     Writer.metadata(done ? metadata : 
+                                       new Metadata()));
         
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
@@ -2751,19 +2931,21 @@ public class SequenceFile {
      * @throws IOException
      */
     public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
-                                      Progressable prog) 
-    throws IOException {
-      FileSystem srcFileSys = inputFile.getFileSystem(conf);
-      Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true);
-      boolean compress = reader.isCompressed();
-      boolean blockCompress = reader.isBlockCompressed();
+                                      Progressable prog) throws IOException {
+      Reader reader = new Reader(conf,
+                                 Reader.file(inputFile),
+                                 new Reader.OnlyHeaderOption());
+      CompressionType compress = reader.getCompressionType();
       CompressionCodec codec = reader.getCompressionCodec();
       reader.close();
 
-      Writer writer = createWriter(outputFile.getFileSystem(conf), conf, 
-                                   outputFile, keyClass, valClass, compress, 
-                                   blockCompress, codec, prog,
-                                   new Metadata());
+      Writer writer = createWriter(conf, 
+                                   Writer.file(outputFile), 
+                                   Writer.keyClass(keyClass), 
+                                   Writer.valueClass(valClass), 
+                                   Writer.compressionType(compress), 
+                                   Writer.compressionCodec(codec), 
+                                   Writer.progressable(prog));
       return writer;
     }
 
@@ -3164,13 +3346,15 @@ public class SequenceFile {
        */
       public boolean nextRawKey() throws IOException {
         if (in == null) {
-          int bufferSize = conf.getInt("io.file.buffer.size", 4096); 
+          int bufferSize = getBufferSize(conf); 
           if (fs.getUri().getScheme().startsWith("ramfs")) {
             bufferSize = conf.getInt("io.bytes.per.checksum", 512);
           }
-          Reader reader = new Reader(fs, segmentPathName, null, 
-                                     bufferSize, segmentOffset, 
-                                     segmentLength, conf, false);
+          Reader reader = new Reader(conf,
+                                     Reader.file(segmentPathName), 
+                                     Reader.bufferSize(bufferSize),
+                                     Reader.start(segmentOffset), 
+                                     Reader.length(segmentLength));
         
           //sometimes we ignore syncs especially for temp merge files
           if (ignoreSync) reader.ignoreSync();

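For reference, a minimal sketch of the new option-based SequenceFile API as it reads after this patch; the path and the key/value types below are placeholder choices for illustration, not part of the commit:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SeqFileOptionsSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path file = new Path("/tmp/demo.seq");   // hypothetical path

        // Old style (now deprecated):
        //   new SequenceFile.Writer(fs, conf, file, Text.class, IntWritable.class)
        // New style: one varargs entry point plus named options.
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(file),
            SequenceFile.Writer.keyClass(Text.class),
            SequenceFile.Writer.valueClass(IntWritable.class));
        try {
          writer.append(new Text("answer"), new IntWritable(42));
        } finally {
          writer.close();
        }

        // Old style (now deprecated):
        //   new SequenceFile.Reader(fs, file, conf)
        // New style: file/stream, start, length and bufferSize are options.
        SequenceFile.Reader reader =
            new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
        try {
          Text key = new Text();
          IntWritable value = new IntWritable();
          while (reader.next(key, value)) {
            System.out.println(key + " -> " + value);
          }
        } finally {
          reader.close();
        }
      }
    }

Per the consistency checks in the constructors above, exactly one of file() or stream() must be supplied, and the file-only modifiers (bufferSize, blockSize, replication and progressable for the Writer; bufferSize for the Reader) are rejected when a stream is given.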
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/SetFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/SetFile.java?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/SetFile.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/SetFile.java Thu Sep 30 02:59:32 2010
@@ -57,7 +57,10 @@ public class SetFile extends MapFile {
     public Writer(Configuration conf, FileSystem fs, String dirName,
                   WritableComparator comparator,
                   SequenceFile.CompressionType compress) throws IOException {
-      super(conf, fs, dirName, comparator, NullWritable.class, compress);
+      super(conf, new Path(dirName), 
+            comparator(comparator), 
+            keyClass(NullWritable.class), 
+            compressionType(compress));
     }
 
     /** Append a key to a set.  The key must be strictly greater than the
@@ -78,7 +81,7 @@ public class SetFile extends MapFile {
     /** Construct a set reader for the named set using the named comparator.*/
     public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
       throws IOException {
-      super(fs, dirName, comparator, conf);
+      super(new Path(dirName), conf, comparator(comparator));
     }
 
     // javadoc inherited

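The SetFile change above is a delegation change only: the public constructors keep their old signatures and now route through the option-based MapFile constructors via the super() calls shown. A small sketch of unchanged caller code, with the directory name and comparator as placeholder choices:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SetFile;
    import org.apache.hadoop.io.Text;

    public class SetFileSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        String dir = "/tmp/demo.set";   // hypothetical directory

        SetFile.Writer writer = new SetFile.Writer(conf, fs, dir,
            new Text.Comparator(), SequenceFile.CompressionType.NONE);
        writer.append(new Text("a"));
        writer.append(new Text("b"));   // keys must be strictly increasing
        writer.close();

        SetFile.Reader reader =
            new SetFile.Reader(fs, dir, new Text.Comparator(), conf);
        Text key = new Text();
        while (reader.next(key)) {
          System.out.println(key);
        }
        reader.close();
      }
    }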
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java?rev=1002937&r1=1002936&r2=1002937&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java Thu Sep 30 02:59:32 2010
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;


