hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1042107 [3/6] - in /hadoop/common/branches/HADOOP-6685: ./ ivy/ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/file/tfile/ src/java/org/apache/hadoop/io/serial/ src/java/org/apache/had...
Date Sat, 04 Dec 2010 07:13:12 GMT
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java Sat Dec  4 07:13:10 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.*;
 public class SetFile extends MapFile {
 
   protected SetFile() {}                            // no public ctor
+  private static final NullWritable NULL_WRITABLE = NullWritable.get();
 
   /** 
    * Write a new set file.
@@ -41,8 +42,10 @@ public class SetFile extends MapFile {
      *  @deprecated pass a Configuration too
      */
     public Writer(FileSystem fs, String dirName,
-	Class<? extends WritableComparable> keyClass) throws IOException {
-      super(new Configuration(), fs, dirName, keyClass, NullWritable.class);
+                	Class<? extends WritableComparable> keyClass
+                	) throws IOException {
+      super(new Configuration(), new Path(dirName),
+            keyClass(keyClass), valueClass(NullWritable.class));
     }
 
     /** Create a set naming the element class and compression type. */
@@ -59,6 +62,7 @@ public class SetFile extends MapFile {
                   SequenceFile.CompressionType compress) throws IOException {
       super(conf, new Path(dirName), 
             comparator(comparator), 
+            keyClass(comparator.getKeyClass()),
             valueClass(NullWritable.class), 
             compression(compress));
     }
@@ -66,7 +70,7 @@ public class SetFile extends MapFile {
     /** Append a key to a set.  The key must be strictly greater than the
      * previous key added to the set. */
     public void append(WritableComparable key) throws IOException{
-      append(key, NullWritable.get());
+      append(key, NULL_WRITABLE);
     }
   }
 
@@ -94,7 +98,7 @@ public class SetFile extends MapFile {
      * true if such a key exists and false when at the end of the set. */
     public boolean next(WritableComparable key)
       throws IOException {
-      return next(key, NullWritable.get());
+      return next(key, NULL_WRITABLE);
     }
 
     /** Read the matching key from a set into <code>key</code>.

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java Sat Dec  4 07:13:10 2010
@@ -198,7 +198,6 @@ final class BCFile {
     public class BlockAppender extends DataOutputStream {
       private final BlockRegister blockRegister;
       private final WBlockState wBlkState;
-      @SuppressWarnings("hiding")
       private boolean closed = false;
 
       /**
@@ -282,15 +281,32 @@ final class BCFile {
      * @throws IOException
      * @see Compression#getSupportedAlgorithms
      */
+    @Deprecated
     public Writer(FSDataOutputStream fout, String compressionName,
         Configuration conf) throws IOException {
+      this(fout, Compression.getCompressionAlgorithmByName(compressionName),
+           conf);
+    }
+    
+    /**
+     * Constructor
+     * 
+     * @param fout
+     *          FS output stream.
+     * @param compression
+     *          The compression algorithm, which will be used for all
+     *          data blocks.
+     * @throws IOException
+     */
+    public Writer(FSDataOutputStream fout, Algorithm compression,
+        Configuration conf) throws IOException {
       if (fout.getPos() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
 
       this.out = fout;
       this.conf = conf;
-      dataIndex = new DataIndex(compressionName);
+      dataIndex = new DataIndex(compression);
       metaIndex = new MetaIndex();
       fsOutputBuffer = new BytesWritable();
       Magic.write(fout);
@@ -651,6 +667,14 @@ final class BCFile {
     }
 
     /**
+     * Get the default compression algorithm.
+     * @return the default compression algorithm
+     */
+    public Algorithm getDefaultCompression() {
+      return dataIndex.getDefaultCompressionAlgorithm();
+    }
+
+    /**
      * Get version of BCFile file being read.
      * 
      * @return version of BCFile file being read.
@@ -870,12 +894,16 @@ final class BCFile {
       }
     }
 
+    public DataIndex(Algorithm defaultCompression) {
+      this.defaultCompressionAlgorithm = defaultCompression;
+      listRegions = new ArrayList<BlockRegion>();
+    }
+
     // for write
+    @Deprecated
     public DataIndex(String defaultCompressionAlgorithmName) {
-      this.defaultCompressionAlgorithm =
-          Compression
-              .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
-      listRegions = new ArrayList<BlockRegion>();
+      this(Compression
+              .getCompressionAlgorithmByName(defaultCompressionAlgorithmName));
     }
 
     public Algorithm getDefaultCompressionAlgorithm() {

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java Sat Dec  4 07:13:10 2010
@@ -19,8 +19,7 @@ package org.apache.hadoop.io.file.tfile;
 import java.io.Serializable;
 import java.util.Comparator;
 
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.serial.RawComparator;
 
 class CompareUtils {
   /**
@@ -36,9 +35,9 @@ class CompareUtils {
    */
   public static final class BytesComparator implements
       Comparator<RawComparable> {
-    private RawComparator<Object> cmp;
+    private RawComparator cmp;
 
-    public BytesComparator(RawComparator<Object> cmp) {
+    public BytesComparator(RawComparator cmp) {
       this.cmp = cmp;
     }
 
@@ -73,7 +72,9 @@ class CompareUtils {
     }
   }
 
-  public static final class ScalarComparator implements Comparator<Scalar>, Serializable {
+  @SuppressWarnings("serial")
+  public static final class ScalarComparator 
+                      implements Comparator<Scalar>, Serializable {
     @Override
     public int compare(Scalar o1, Scalar o2) {
       long diff = o1.magnitude() - o2.magnitude();
@@ -83,16 +84,4 @@ class CompareUtils {
     }
   }
 
-  public static final class MemcmpRawComparator implements
-      RawComparator<Object>, Serializable {
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
-    }
-
-    @Override
-    public int compare(Object o1, Object o2) {
-      throw new RuntimeException("Object comparison not supported");
-    }
-  }
 }

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java Sat Dec  4 07:13:10 2010
@@ -39,7 +39,7 @@ import org.apache.hadoop.util.Reflection
 /**
  * Compression related stuff.
  */
-final class Compression {
+final public class Compression {
   static final Log LOG = LogFactory.getLog(Compression.class);
 
   /**
@@ -71,7 +71,7 @@ final class Compression {
   /**
    * Compression algorithms.
    */
-  static enum Algorithm {
+  public static enum Algorithm {
     LZO(TFile.COMPRESSION_LZO) {
       private transient boolean checked = false;
       private static final String defaultClazz =
@@ -99,7 +99,7 @@ final class Compression {
       }
 
       @Override
-      CompressionCodec getCodec() throws IOException {
+      synchronized CompressionCodec getCodec() throws IOException {
         if (!isSupported()) {
           throw new IOException(
               "LZO codec class not specified. Did you forget to set property "
@@ -160,7 +160,7 @@ final class Compression {
       private transient DefaultCodec codec;
 
       @Override
-      CompressionCodec getCodec() {
+      synchronized CompressionCodec getCodec() {
         if (codec == null) {
           codec = new DefaultCodec();
           codec.setConf(conf);

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java Sat Dec  4 07:13:10 2010
@@ -22,7 +22,7 @@ import java.util.Comparator;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serial.RawComparator;
 
 /**
  * Interface for objects that can be compared through {@link RawComparator}.

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java Sat Dec  4 07:13:10 2010
@@ -41,16 +41,18 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
 import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
 import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
 import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
-import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
-import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+import org.apache.hadoop.io.serial.lib.MemcmpRawComparator;
 import org.apache.hadoop.io.file.tfile.Utils.Version;
-import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.lib.DeserializationRawComparator;
+import org.apache.hadoop.util.Options;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * A TFile is a container of key-value pairs. Both keys and values are type-less
@@ -165,16 +167,56 @@ public class TFile {
   public static final String COMPARATOR_MEMCMP = "memcmp";
   /** comparator prefix: java class */
   public static final String COMPARATOR_JCLASS = "jclass:";
+  /** user-managed comparator */
+  public static final String COMPARATOR_USER_MANAGED = "user";
 
   /**
-   * Make a raw comparator from a string name.
-   * 
-   * @param name
-   *          Comparator name
-   * @return A RawComparable comparator.
+   * A constant that is used to represent memcmp sort order in the tfile.
    */
-  static public Comparator<RawComparable> makeComparator(String name) {
-    return TFileMeta.makeComparator(name);
+  public static final RawComparator MEMCMP = new MemcmpRawComparator();
+
+  /**
+   * The kinds of comparators that tfile supports.
+   */
+  public static enum ComparatorKind {
+    NONE(""), MEMCMP(COMPARATOR_MEMCMP), USER_MANAGED(COMPARATOR_USER_MANAGED);
+    
+    private String name;
+
+    ComparatorKind(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+
+    public static ComparatorKind fromString(String val) {
+      if (val == null || val.length() == 0) {
+        return NONE;
+      }
+      for (ComparatorKind kind: values()) {
+        if (kind.name.equals(val)) {
+          return kind;
+        }
+      }
+      if (val.startsWith(COMPARATOR_JCLASS)) {
+        return USER_MANAGED;
+      }
+      throw new IllegalArgumentException("Comparator kind " + val + 
+                                         " unknown.");
+    }
+
+    static ComparatorKind fromComparator(RawComparator comparator) {
+      if (comparator == null) {
+        return NONE;
+      } else if (comparator.getClass() == MemcmpRawComparator.class){
+        return MEMCMP;
+      } else {
+        return USER_MANAGED;
+      }
+    }
   }
 
   // Prevent the instantiation of TFiles
@@ -242,9 +284,10 @@ public class TFile {
     State state = State.READY;
     Configuration conf;
     long errorCount = 0;
+    private final RawComparator comparator;
 
     /**
-     * Constructor
+     * Constructor for a TFile Writer.
      * 
      * @param fsdos
      *          output stream for writing. Must be at position 0.
@@ -255,7 +298,7 @@ public class TFile {
      * @param compressName
      *          Name of the compression algorithm. Must be one of the strings
      *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
-     * @param comparator
+     * @param comparatorName
      *          Leave comparator as null or empty string if TFile is not sorted.
      *          Otherwise, provide the string name for the comparison algorithm
      *          for keys. Two kinds of comparators are supported.
@@ -269,7 +312,7 @@ public class TFile {
      *          constructed through the default constructor (with no
      *          parameters). Parameterized RawComparators such as
      *          {@link WritableComparator} or
-     *          {@link JavaSerializationComparator} may not be directly used.
+     *          {@link DeserializationRawComparator} may not be directly used.
      *          One should write a wrapper class that inherits from such classes
      *          and use its default constructor to perform proper
      *          initialization.
@@ -277,15 +320,156 @@ public class TFile {
      * @param conf
      *          The configuration object.
      * @throws IOException
+     * @deprecated Use Writer(Configuration,Option...) instead.
      */
     public Writer(FSDataOutputStream fsdos, int minBlockSize,
-        String compressName, String comparator, Configuration conf)
-        throws IOException {
-      sizeMinBlock = minBlockSize;
-      tfileMeta = new TFileMeta(comparator);
-      tfileIndex = new TFileIndex(tfileMeta.getComparator());
+                  String compressName, String comparatorName, 
+                  Configuration conf) throws IOException {
+      this(conf, stream(fsdos), blockSize(minBlockSize),
+           compress(Compression.getCompressionAlgorithmByName(compressName)),
+           comparatorName(comparatorName));
+    }
+
+    /**
+     * Marker class for all of the Writer options.
+     */
+    public static interface Option {}
+
+    /**
+     * Create an option with an output stream.
+     * @param value output stream for writing. Must be at position 0.
+     * @return the new option
+     */
+    public static Option stream(FSDataOutputStream value) {
+      return new StreamOption(value);
+    }
+
+    /**
+     * Create an option for the compression algorithm.
+     * @param value the compression algorithm to use.
+     * @return the new option
+     */
+    public static Option compress(Algorithm value) {
+      return new CompressOption(value);
+    }
+    
+    /**
+     * Create an option for the minimum block size.
+     * @param value the minimum number of bytes that a compression block will
+     *        contain.
+     * @return the new option
+     */
+    public static Option blockSize(int value) {
+      return new BlockSizeOption(value);
+    }
+    
+    /**
+     * Create an option for specifying the comparator.
+     * @param value the comparator for indexing and searching the file
+     * @return the new option
+     */
+    public static Option comparator(RawComparator value) {
+      return new ComparatorOption(value);
+    }
+
+    /**
+     * Create an option for the comparator from a string. This is intended to
+     * support old clients that specified the comparator name and expected 
+     * the reader to be able to read it.
+     * @param value the name of the comparator
+     * @return the new option
+     */
+    public static Option comparatorName(String value) {
+      return new ComparatorNameOption(value);
+    }
+
+    private static class StreamOption extends Options.FSDataOutputStreamOption
+                                      implements Option {
+      StreamOption(FSDataOutputStream value) {
+        super(value);
+      }
+    }
+ 
+    private static class CompressOption implements Option {
+      private Algorithm value;
+      CompressOption(Algorithm value) {
+        this.value = value;
+      }
+      Algorithm getValue() {
+        return value;
+      }
+    }
+
+    private static class BlockSizeOption extends Options.IntegerOption 
+                                         implements Option {
+      BlockSizeOption(int value) {
+        super(value);
+      }
+    }
+    
+    private static class ComparatorOption implements Option {
+      private RawComparator value;
+      ComparatorOption(RawComparator value) {
+        this.value = value;
+      }
+      RawComparator getValue() {
+        return value;
+      }
+    }
+
+    private static class ComparatorNameOption extends Options.StringOption 
+                                              implements Option {
+      ComparatorNameOption(String value) {
+        super(value);
+      }
+    }
 
-      writerBCF = new BCFile.Writer(fsdos, compressName, conf);
+    /**
+     * Constructor
+     * 
+     * @param conf
+     *          The configuration object.
+     * @param options
+     *          the options for controlling the file.
+     * @throws IOException
+     */
+    public Writer(Configuration conf, Option... options) throws IOException {
+      BlockSizeOption blockSize = Options.getOption(BlockSizeOption.class, 
+                                                    options);
+      ComparatorOption comparatorOpt = Options.getOption(ComparatorOption.class, 
+                                                         options);
+      ComparatorNameOption comparatorNameOpt = 
+        Options.getOption(ComparatorNameOption.class, options);
+      CompressOption compressOpt = Options.getOption(CompressOption.class,
+                                                     options);
+      StreamOption stream = Options.getOption(StreamOption.class, options);
+      
+      if (stream == null) {
+        throw new IllegalArgumentException("Must provide a stream");
+      }
+      if (comparatorOpt != null && comparatorNameOpt != null) {
+        throw new IllegalArgumentException("Can only provide one comparator" +
+                                           " option");
+      }
+
+      sizeMinBlock = blockSize == null ? 1048576 : blockSize.getValue();
+      String comparatorName;
+      if (comparatorOpt != null) {
+        comparator = comparatorOpt.getValue();
+        comparatorName = ComparatorKind.fromComparator(comparator).toString();
+      } else if (comparatorNameOpt != null) {
+        comparatorName = comparatorNameOpt.getValue();
+        comparator = makeComparator(comparatorName);
+      } else {
+        comparator = null;
+        comparatorName = null;
+      }
+      tfileMeta = new TFileMeta(comparatorName);
+      tfileIndex = new TFileIndex(comparator);
+      Algorithm compress = 
+        compressOpt == null ? Algorithm.NONE : compressOpt.getValue();
+
+      writerBCF = new BCFile.Writer(stream.getValue(), compress, conf);
       currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
       lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
       this.conf = conf;
@@ -455,8 +639,8 @@ public class TFile {
           if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
             byte[] lastKey = lastKeyBufferOS.getBuffer();
             int lastLen = lastKeyBufferOS.size();
-            if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
-                lastLen) < 0) {
+            // check sort order unless this is the first key
+            if (comparator.compare(key, 0, len, lastKey, 0, lastLen) < 0) {
               throw new IOException("Keys are not added in sorted order");
             }
           }
@@ -687,7 +871,7 @@ public class TFile {
     // TFile index, it is loaded lazily.
     TFileIndex tfileIndex = null;
     final TFileMeta tfileMeta;
-    final BytesComparator comparator;
+    private RawComparator comparator = null;
 
     // global begin and end locations.
     private final Location begin;
@@ -784,6 +968,17 @@ public class TFile {
         if (recordIndex != other.recordIndex) return false;
         return true;
       }
+      
+      @Override
+      public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("Location(");
+        builder.append(blockIndex);
+        builder.append(", ");
+        builder.append(recordIndex);
+        builder.append(")");
+        return builder.toString();
+      }
     }
 
     /**
@@ -798,8 +993,8 @@ public class TFile {
      * @param conf
      * @throws IOException
      */
-    public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
-        throws IOException {
+    public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf
+                  ) throws IOException {
       readerBCF = new BCFile.Reader(fsdis, fileLength, conf);
 
       // first, read TFile meta
@@ -809,14 +1004,30 @@ public class TFile {
       } finally {
         brMeta.close();
       }
+      comparator = makeComparator(tfileMeta.getComparatorName());
 
-      comparator = tfileMeta.getComparator();
       // Set begin and end locations.
       begin = new Location(0, 0);
       end = new Location(readerBCF.getBlockCount(), 0);
     }
 
     /**
+     * Set the comparator for reading this file. May only be called once for
+     * each Reader.
+     * @param comparator a comparator for this file.
+     */
+    public void setComparator(RawComparator comparator) {
+      ComparatorKind kind = ComparatorKind.fromComparator(comparator);
+      if (kind != tfileMeta.getComparatorKind()) {
+        throw new IllegalArgumentException("Illegal comparator for this tfile: "
+                                           + kind +
+                                           " instead of " + 
+                                           tfileMeta.getComparatorKind());        
+      }
+      this.comparator = comparator;
+    }
+
+    /**
      * Close the reader. The state of the Reader object is undefined after
      * close. Calling close() for multiple times has no effect.
      */
@@ -844,6 +1055,14 @@ public class TFile {
     }
 
     /**
+     * Get the version of the tfile format.
+     * @return the version of the file
+     */
+    public Version getFileVersion() {
+      return tfileMeta.getVersion();
+    }
+
+    /**
      * Get the string representation of the comparator.
      * 
      * @return If the TFile is not sorted by keys, an empty string will be
@@ -851,7 +1070,15 @@ public class TFile {
      *         provided during the TFile creation time will be returned.
      */
     public String getComparatorName() {
-      return tfileMeta.getComparatorString();
+      return tfileMeta.getComparatorKind().toString();
+    }
+
+    /**
+     * Get the compression algorithm.
+     * @return the compression algorithm used
+     */
+    public Algorithm getCompression() {
+      return readerBCF.getDefaultCompression();
     }
 
     /**
@@ -882,8 +1109,7 @@ public class TFile {
         BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
         try {
           tfileIndex =
-              new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
-                  .getComparator());
+              new TFileIndex(readerBCF.getBlockCount(), brIndex, comparator);
         } finally {
           brIndex.close();
         }
@@ -947,7 +1173,7 @@ public class TFile {
      * 
      * @return a Comparator that can compare RawComparable's.
      */
-    public Comparator<RawComparable> getComparator() {
+    public RawComparator getComparator() {
       return comparator;
     }
 
@@ -1006,6 +1232,10 @@ public class TFile {
       if (!isSorted()) {
         throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
       }
+      if (comparator == null) {
+        throw new 
+           RuntimeException("Cannot compare keys until comparator is set");
+      }
       return comparator.compare(a, o1, l1, b, o2, l2);
     }
 
@@ -1013,7 +1243,12 @@ public class TFile {
       if (!isSorted()) {
         throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
       }
-      return comparator.compare(a, b);
+      if (comparator == null) {
+        throw new 
+           RuntimeException("Cannot compare keys until comparator is set");
+      }
+      return comparator.compare(a.buffer(), a.offset(), a.size(), 
+                                b.buffer(), b.offset(), b.size());
     }
 
     /**
@@ -1028,7 +1263,9 @@ public class TFile {
      */
     Location getLocationNear(long offset) {
       int blockIndex = readerBCF.getBlockIndexNear(offset);
-      if (blockIndex == -1) return end;
+      if (blockIndex == -1) {
+        return end;
+      }
       return new Location(blockIndex, 0);
     }
 
@@ -1089,7 +1326,8 @@ public class TFile {
      *         contains zero key-value pairs even if length is positive.
      * @throws IOException
      */
-    public Scanner createScannerByByteRange(long offset, long length) throws IOException {
+    public Scanner createScannerByByteRange(long offset,
+                                            long length) throws IOException {
       return new Scanner(this, offset, offset + length);
     }
 
@@ -2032,20 +2270,20 @@ public class TFile {
   /**
    * Data structure representing "TFile.meta" meta block.
    */
-  static final class TFileMeta {
+  private static final class TFileMeta {
     final static String BLOCK_NAME = "TFile.meta";
-    final Version version;
+    private final Version version;
     private long recordCount;
-    private final String strComparator;
-    private final BytesComparator comparator;
+    private final ComparatorKind comparatorKind;
+    private final String comparatorName;
 
     // ctor for writes
-    public TFileMeta(String comparator) {
+    public TFileMeta(String comparatorName) {
       // set fileVersion to API version when we create it.
       version = TFile.API_VERSION;
       recordCount = 0;
-      strComparator = (comparator == null) ? "" : comparator;
-      this.comparator = makeComparator(strComparator);
+      this.comparatorKind = ComparatorKind.fromString(comparatorName);
+      this.comparatorName = comparatorName;
     }
 
     // ctor for reads
@@ -2055,42 +2293,14 @@ public class TFile {
         throw new RuntimeException("Incompatible TFile fileVersion.");
       }
       recordCount = Utils.readVLong(in);
-      strComparator = Utils.readString(in);
-      comparator = makeComparator(strComparator);
-    }
-
-    @SuppressWarnings("unchecked")
-    static BytesComparator makeComparator(String comparator) {
-      if (comparator.length() == 0) {
-        // unsorted keys
-        return null;
-      }
-      if (comparator.equals(COMPARATOR_MEMCMP)) {
-        // default comparator
-        return new BytesComparator(new MemcmpRawComparator());
-      } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
-        String compClassName =
-            comparator.substring(COMPARATOR_JCLASS.length()).trim();
-        try {
-          Class compClass = Class.forName(compClassName);
-          // use its default ctor to create an instance
-          return new BytesComparator((RawComparator<Object>) compClass
-              .newInstance());
-        } catch (Exception e) {
-          throw new IllegalArgumentException(
-              "Failed to instantiate comparator: " + comparator + "("
-                  + e.toString() + ")");
-        }
-      } else {
-        throw new IllegalArgumentException("Unsupported comparator: "
-            + comparator);
-      }
+      comparatorName = Utils.readString(in);
+      comparatorKind = ComparatorKind.fromString(comparatorName);
     }
 
     public void write(DataOutput out) throws IOException {
       TFile.API_VERSION.write(out);
       Utils.writeVLong(out, recordCount);
-      Utils.writeString(out, strComparator);
+      Utils.writeString(out, comparatorName);
     }
 
     public long getRecordCount() {
@@ -2102,20 +2312,20 @@ public class TFile {
     }
 
     public boolean isSorted() {
-      return !strComparator.equals("");
-    }
-
-    public String getComparatorString() {
-      return strComparator;
+      return comparatorKind != ComparatorKind.NONE;
     }
 
-    public BytesComparator getComparator() {
-      return comparator;
+    public ComparatorKind getComparatorKind() {
+      return comparatorKind;
     }
 
     public Version getVersion() {
       return version;
     }
+    
+    public String getComparatorName() {
+      return comparatorName;
+    }
   } // END: class MetaTFileMeta
 
   /**
@@ -2126,7 +2336,7 @@ public class TFile {
     private ByteArray firstKey;
     private final ArrayList<TFileIndexEntry> index;
     private final ArrayList<Long> recordNumIndex;
-    private final BytesComparator comparator;
+    private final RawComparator comparator;
     private long sum = 0;
     
     /**
@@ -2134,7 +2344,7 @@ public class TFile {
      * 
      * @throws IOException
      */
-    public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
+    public TFileIndex(int entryCount, DataInput in, RawComparator comparator)
         throws IOException {
       index = new ArrayList<TFileIndexEntry>(entryCount);
       recordNumIndex = new ArrayList<Long>(entryCount);
@@ -2217,7 +2427,7 @@ public class TFile {
     /**
      * For writing to file.
      */
-    public TFileIndex(BytesComparator comparator) {
+    public TFileIndex(RawComparator comparator) {
       index = new ArrayList<TFileIndexEntry>();
       recordNumIndex = new ArrayList<Long>();
       this.comparator = comparator;
@@ -2332,6 +2542,58 @@ public class TFile {
   }
 
   /**
+   * Make a raw comparator from a string name.
+   * 
+   * @param comparator
+   *          Comparator name
+   * @return A RawComparator, or null for unsorted or user-managed comparators.
+   */
+  static RawComparator makeComparator(String comparator) {
+    if (comparator == null || comparator.length() == 0) {
+      // unsorted keys
+      return null;
+    }
+    if (comparator.equals(COMPARATOR_MEMCMP)) {
+      // default comparator
+      return MEMCMP;
+    } else if (comparator.equals(COMPARATOR_USER_MANAGED)) {
+      // the user needs to set it explicitly
+      return null;
+    } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
+      // if it is a jclass string, we try to create it for them
+      // this only happens in old tfiles
+      String compClassName =
+        comparator.substring(COMPARATOR_JCLASS.length()).trim();
+      try {
+        Class<?> compClass = Class.forName(compClassName);
+        // use its default ctor to create an instance
+        return (RawComparator) ReflectionUtils.newInstance(compClass, null);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IllegalArgumentException("Comparator class " + compClassName 
+                                           + " not found.");
+      }
+    } else {
+      throw new IllegalArgumentException("Unsupported comparator: "
+                                         + comparator);
+    }
+  }
+
+  /**
+   * Create a stringification of a given comparator
+   * @param comparator the comparator to stringify, may be null
+   * @return the string identifying this comparator
+   */
+  static String stringifyComparator(RawComparator comparator) {
+    if (comparator == null) {
+      return "";
+    } else if (comparator.getClass() == MemcmpRawComparator.class){
+      return COMPARATOR_MEMCMP;
+    } else {
+      return COMPARATOR_USER_MANAGED;
+    }
+  }
+
+  /**
    * Dumping the TFile information.
    * 
    * @param args

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java Sat Dec  4 07:13:10 2010
@@ -106,7 +106,7 @@ class TFileDumper {
       int blockCnt = reader.readerBCF.getBlockCount();
       int metaBlkCnt = reader.readerBCF.metaIndex.index.size();
       properties.put("BCFile Version", reader.readerBCF.version.toString());
-      properties.put("TFile Version", reader.tfileMeta.version.toString());
+      properties.put("TFile Version", reader.getFileVersion().toString());
       properties.put("File Length", Long.toString(length));
       properties.put("Data Compression", reader.readerBCF
           .getDefaultCompressionName());

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java Sat Dec  4 07:13:10 2010
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serial.RawComparator;
 
 /**
  * Supporting Utility classes used by TFile, and shared by users of TFile.
@@ -414,15 +415,16 @@ public final class Utils {
    * @return The index to the desired element if it exists; or list.size()
    *         otherwise.
    */
-  public static <T> int lowerBound(List<? extends T> list, T key,
-      Comparator<? super T> cmp) {
+  public static <T extends RawComparable> 
+  int lowerBound(List<? extends T> list, T key, RawComparator cmp) {
     int low = 0;
     int high = list.size();
 
     while (low < high) {
       int mid = (low + high) >>> 1;
       T midVal = list.get(mid);
-      int ret = cmp.compare(midVal, key);
+      int ret = cmp.compare(midVal.buffer(), midVal.offset(), midVal.size(), 
+                            key.buffer(), key.offset(), key.size());
       if (ret < 0)
         low = mid + 1;
       else high = mid;
@@ -445,15 +447,16 @@ public final class Utils {
    * @return The index to the desired element if it exists; or list.size()
    *         otherwise.
    */
-  public static <T> int upperBound(List<? extends T> list, T key,
-      Comparator<? super T> cmp) {
+  public static <T extends RawComparable> 
+  int upperBound(List<? extends T> list, T key, RawComparator cmp) {
     int low = 0;
     int high = list.size();
 
     while (low < high) {
       int mid = (low + high) >>> 1;
       T midVal = list.get(mid);
-      int ret = cmp.compare(midVal, key);
+      int ret = cmp.compare(midVal.buffer(), midVal.offset(), midVal.size(),
+                            key.buffer(), key.offset(), key.size());
       if (ret <= 0)
         low = mid + 1;
       else high = mid;
@@ -491,6 +494,35 @@ public final class Utils {
   }
 
   /**
+   * Lower bound binary search. Find the index to the first element in the list
+   * that compares greater than or equal to key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int lowerBound(List<? extends T> list,
+                                   T key, Comparator<? super T> cmp) {
+    int low = 0;
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1;
+      T midVal = list.get(mid);
+      int ret = cmp.compare(midVal, key);
+      if (ret < 0)
+        low = mid + 1;
+      else high = mid;
+    }
+    return low;
+  }
+
+  /**
    * Upper bound binary search. Find the index to the first element in the list
    * that compares greater than the input key.
    * 

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A compare function that compares two sets of bytes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface RawComparator {
+
+  /**
+   * Compare the two serialized keys. This must be stable, so:
+   * compare(b1,s1,l1,b2,s2,l2) = -compare(b2,s2,l2,b1,s1,l1) for all buffers.
+   * @param b1 the left data buffer to compare
+   * @param s1 the first index in b1 to compare
+   * @param l1 the number of bytes in b1 to compare
+   * @param b2 the right data buffer to compare
+   * @param s2 the first index in b2 to compare
+   * @param l2 the number of bytes in b2 to compare
+   * @return negative if b1 is less than b2, 0 if they are equal, positive if
+   *         b1 is greater than b2.
+   */
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
+
+}

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The primary interface to provide serialization.
+ * @param <T> the parent type that it will serialize
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class Serialization<T> implements Cloneable {
+  
+  /**
+   * Serialize the given object to the OutputStream.
+   * @param stream the stream to serialize to
+   * @param object the object to serialize
+   * @throws IOException if the serialization fails
+   */
+  public abstract void serialize(OutputStream stream, 
+                                 T object) throws IOException;
+
+  /**
+   * Deserialize the given object from the InputStream.
+   * @param stream the stream to deserialize from
+   * @param reusableObject an object (or null) that may be reused by the 
+   *        serializer
+   * @param conf the user's configuration
+   * @return the object that was created or reused with the data in it
+   * @throws IOException if the deserialization fails
+   */
+  public abstract T deserialize(InputStream stream,
+                                T reusableObject,
+                                Configuration conf) throws IOException;
+  
+  /**
+   * Get the default raw comparator for the given serializer
+   * @return a comparator that will compare bytes
+   */
+  public abstract RawComparator getRawComparator();
+  
+  /**
+   * Serialize the serializer's configuration to the output stream.
+   * @param out the stream to serialize to
+   * @throws IOException if the serialization fails
+   */
+  public abstract void serializeSelf(OutputStream out) throws IOException;
+  
+  /**
+   * Modify the serialization's configuration to reflect the contents of the
+   * input stream.
+   * @param in the stream to read from
+   * @param conf the configuration
+   * @throws IOException if the deserialization fails
+   */
+  public abstract void deserializeSelf(InputStream in,
+                                       Configuration conf) throws IOException;
+  
+  /**
+   * Generate the state of the serialization in a human-friendly string.
+   * @return the textual representation of the serialization state
+   */
+  @Override
+  public abstract String toString();
+
+  /**
+   * Restore the state of the serialization from a human-friendly string.
+   * @param metadata the string that was generated by toString
+   * @throws IOException
+   */
+  public abstract void fromString(String metadata) throws IOException;
+  
+  /**
+   * Get the name for this kind of serialization, which must be unique. This 
+   * name is used to identify the serialization that was used to write a
+   * particular file.
+   * @return the unique name
+   */
+  public String getName() {
+    return getClass().getName();
+  }
+  
+  /**
+   * Ensure the InputStream is a DataInput, wrapping it if necessary
+   * @param in the input stream to wrap
+   * @return the wrapped stream
+   */
+  protected DataInput ensureDataInput(InputStream in) {
+    if (in instanceof DataInput) {
+      return (DataInput) in;
+    } else {
+      return new DataInputStream(in);
+    }
+  }
+
+  /**
+   * Ensure the OutputStream is a DataOutput, wrapping it if necessary.
+   * @param out the output stream to wrap
+   * @return the wrapped stream
+   */
+  protected DataOutput ensureDataOutput(OutputStream out) {
+    if (out instanceof DataOutput) {
+      return (DataOutput) out;
+    } else {
+      return new DataOutputStream(out);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  public Serialization<T> clone() {
+    try {
+      return (Serialization<T>) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new IllegalArgumentException("Can't clone object " + this, e);
+    }
+  }
+}

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SERIALIZATIONS_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SERIALIZATIONS_KEY;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serial.lib.CompatibilitySerialization;
+import org.apache.hadoop.io.serial.lib.WritableSerialization;
+import org.apache.hadoop.io.serial.lib.avro.AvroSerialization;
+import org.apache.hadoop.io.serial.lib.protobuf.ProtoBufSerialization;
+import org.apache.hadoop.io.serial.lib.thrift.ThriftSerialization;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory that finds and creates Serializations.
+ * 
+ * There are two methods. The first finds a Serialization by its name (e.g.
+ * avro, writable, thrift, etc.). The second finds a TypedSerialization based
+ * on the type that needs to be serialized.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class SerializationFactory {
+  private static final Log LOG = LogFactory.getLog(SerializationFactory.class);
+
+  private final List<TypedSerialization<?>> typedSerializations =
+    new ArrayList<TypedSerialization<?>>();
+  private final Map<String, Serialization<?>> serializations = 
+    new HashMap<String, Serialization<?>>();
+
+  public SerializationFactory(Configuration conf) {
+    Class<?>[] classes = 
+      conf.getClasses(HADOOP_SERIALIZATIONS_KEY, 
+                      new Class<?>[]{WritableSerialization.class,
+                                     ProtoBufSerialization.class,
+                                     ThriftSerialization.class,
+                                     AvroSerialization.class,
+                                     CompatibilitySerialization.class});
+    for(Class<?> cls: classes) {
+      if (Serialization.class.isAssignableFrom(cls)) {
+        Serialization<?> serial = 
+          (Serialization<?>) ReflectionUtils.newInstance(cls, conf);
+        if (serial instanceof TypedSerialization<?>) {
+          typedSerializations.add((TypedSerialization<?>) serial);
+        }
+        String name = serial.getName();
+        if (serializations.containsKey(name)) {
+          throw new IllegalArgumentException("Two serializations have the" + 
+                                             " same name: " + name);
+        }
+        serializations.put(serial.getName(), serial);
+        LOG.debug("Adding serialization " + serial.getName());
+      } else {
+        throw new IllegalArgumentException("Unknown serialization class " +
+                                           cls.getName());
+      }
+    }
+  }
+
+  private static final Map<String, SerializationFactory> FACTORY_CACHE =
+    new HashMap<String, SerializationFactory>();
+  
+  /**
+   * Get the cached factory for the given configuration. Two configurations
+   * that have the same serializations settings will be considered identical
+   * because we can't keep a reference to the Configuration without locking it
+   * in memory.
+   * @param conf the configuration
+   * @return the factory for a given configuration
+   */
+  public static synchronized 
+  SerializationFactory getInstance(Configuration conf) {
+    String serializerNames = conf.get(HADOOP_SERIALIZATIONS_KEY, "*default*");
+    String obsoleteSerializerNames = conf.get(IO_SERIALIZATIONS_KEY, "*default*");
+    String key = serializerNames + " " + obsoleteSerializerNames;
+    SerializationFactory result = FACTORY_CACHE.get(key);
+    if (result == null) {
+      result = new SerializationFactory(conf);
+      FACTORY_CACHE.put(key, result);
+    }
+    return result;
+  }
+
+  /**
+   * Look up a serialization by name and return a clone of it.
+   * @param name the unique name of the serialization to look up
+   * @return a newly cloned serialization of the right name
+   */
+  public Serialization<?> getSerialization(String name) {
+    return serializations.get(name).clone();
+  }
+  
+  /**
+   * Find the first acceptable serialization for a given type.
+   * @param cls the class that should be serialized
+   * @return a serialization that should be used to serialize the class
+   */
+  @SuppressWarnings("unchecked")
+  public <T> TypedSerialization<? super T> getSerializationByType(Class<T> cls){
+    for (TypedSerialization<?> serial: typedSerializations) {
+      if (serial.accept(cls)) {
+        TypedSerialization<? super T> result = 
+          (TypedSerialization<? super T>) serial.clone();
+        result.setSpecificType(cls);
+        return result;
+      }
+    }
+    throw new IllegalArgumentException("Could not find a serialization to"+
+                                       " accept " + cls.getName());
+  }
+
+}

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,140 @@
+package org.apache.hadoop.io.serial;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serial.lib.SerializationMetadata.TypedSerializationMetadata;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * An abstract base class for serializers that handle types under a given
+ * parent type. Generally, their metadata consists of the class name of the
+ * specific type that is being serialized.
+ * <p>
+ * Typically, TypedSerializations have two types. The first is the base type,
+ * which is the static parent type that it can serialize. The other is the
+ * specific type that this instance is currently serializing.
+ * @param <T> the base type that a given class of Serializers will serialize.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class TypedSerialization<T> extends Serialization<T> {
+  protected Class<? extends T> specificType;
+  
+  protected TypedSerialization() {
+  }
+  
+  protected TypedSerialization(Class<? extends T> specificType) {
+    this.specificType = specificType;
+  }
+
+  /**
+   * Get the base class that this method of serialization can handle.
+   * @return the base class
+   */
+  public abstract Class<T> getBaseType();
+  
+  public void setSpecificType(Class<? extends T> cls) {
+    specificType = cls;
+  }
+
+  public Class<? extends T> getSpecificType() {
+    return specificType;
+  }
+  
+  /**
+   * Can this serialization serialize/deserialize a given class
+   * @param candidateClass the class in question
+   * @return true if the class can be serialized
+   */
+  public boolean accept(Class<?> candidateClass) {
+    return getBaseType().isAssignableFrom(candidateClass);
+  }
+
+  /**
+   * Read the specific class as the metadata.
+   * @throws IOException when class not found or the deserialization fails
+   */
+  @Override
+  public void deserializeSelf(InputStream in, 
+                              Configuration conf) throws IOException {
+    TypedSerializationMetadata data = TypedSerializationMetadata.parseFrom(in);
+    if (data.hasTypename()) {
+      setSpecificTypeByName(data.getTypename());
+    }
+  }
+
+  /**
+   * Write the specific class name as the metadata.
+   */
+  @Override
+  public void serializeSelf(OutputStream out) throws IOException {
+    TypedSerializationMetadata.newBuilder().
+      setTypename(specificType == null ? "" : specificType.getName()).
+      build().writeTo(out);
+  }
+
+  private static final String CLASS_ATTRIBUTE = "class";
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void fromString(String meta) throws IOException {
+    Yaml yaml = new Yaml();
+    Map<String, String> map = (Map<String,String>) yaml.load(meta);
+    String cls = map.get(CLASS_ATTRIBUTE);
+    setSpecificTypeByName(cls);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void setSpecificTypeByName(String name) throws IOException {
+    if (name == null || name.length() == 0) {
+      specificType = null;
+    } else {
+      try {
+        setSpecificType((Class<? extends T>) Class.forName(name));
+      } catch (ClassNotFoundException e) {
+        throw new IOException("serializer class not found " + name, e);
+      }
+    }
+  }
+
+  public String toString() {
+    Yaml yaml = new Yaml();
+    Map<String,String> map = new HashMap<String,String>();
+    if (specificType != null) {
+      map.put(CLASS_ATTRIBUTE, specificType.getName());
+    }
+    return yaml.dump(map);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean equals(Object right) {
+    if (this == right) {
+      return true;
+    } else if (right == null || right.getClass() != getClass()) {
+      return false;
+    } else {
+      TypedSerialization<T> rightTyped = (TypedSerialization<T>) right;
+      return specificType == rightTyped.specificType;
+    }
+  }
+  
+  @Override
+  public int hashCode() {
+    return specificType == null ? 42 : specificType.hashCode();
+  }
+  
+  @Override
+  public TypedSerialization<T> clone() {
+    TypedSerialization<T> result = (TypedSerialization<T>) super.clone();
+    result.specificType = specificType;
+    return result;
+  }
+}

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial.lib;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.TypedSerialization;
+import org.apache.hadoop.io.serial.lib.avro.AvroComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class allows user-defined old style serializers to run inside the new
+ * framework. This will only be used for user serializations that haven't been
+ * ported yet.
+ */
+@SuppressWarnings("deprecation")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CompatibilitySerialization extends TypedSerialization<Object> 
+                                        implements Configurable {
+  private org.apache.hadoop.io.serializer.SerializationFactory factory;
+  
+  @SuppressWarnings("unchecked")
+  private org.apache.hadoop.io.serializer.Serialization 
+    serialization = null;
+
+  public CompatibilitySerialization() {
+    // NOTHING
+  }
+
+  @Override
+  public CompatibilitySerialization clone() {
+    CompatibilitySerialization result = 
+      (CompatibilitySerialization) super.clone();
+    result.factory = factory;
+    result.serialization = serialization;
+    return result;
+  }
+
+  @Override
+  public Class<Object> getBaseType() {
+    return Object.class;
+  }
+
+  @Override
+  public boolean accept(Class<? extends Object> candidateClass) {
+    return factory.getSerialization(candidateClass) != null;
+  }
+
+  @Override
+  public void setSpecificType(Class<?> cls) {
+    super.setSpecificType(cls);
+    serialization = factory.getSerialization(cls);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object deserialize(InputStream stream, Object reusableObject,
+                            Configuration conf) throws IOException {
+    org.apache.hadoop.io.serializer.Deserializer deserializer =
+      serialization.getDeserializer(specificType);
+    deserializer.open(stream);
+    Object result = deserializer.deserialize(reusableObject);
+    // if the object is new, configure it
+    if (result != reusableObject) {
+      ReflectionUtils.setConf(result, conf);
+    }
+    return result;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public RawComparator getRawComparator() {
+    if (specificType == null) {
+      throw new 
+        IllegalArgumentException("Must have specific type for comparision");
+    } else if (serialization instanceof 
+                 org.apache.hadoop.io.serializer.WritableSerialization) {
+      return WritableComparator.get((Class) specificType);
+    } else if (serialization instanceof
+                 org.apache.hadoop.io.serializer.avro.AvroReflectSerialization){
+      Schema schema = ReflectData.get().getSchema(specificType);
+      return new AvroComparator(schema);
+    } else if (serialization instanceof
+                org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization){
+      Schema schema = SpecificData.get().getSchema(specificType);
+      return new AvroComparator(schema);
+    } else if (Comparable.class.isAssignableFrom(specificType)) {
+      // if the type is comparable, we can deserialize
+      return new DeserializationRawComparator(this, null);
+    } else {
+      return new MemcmpRawComparator();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void serialize(OutputStream stream, Object object) throws IOException {
+    org.apache.hadoop.io.serializer.Serializer serializer = 
+      serialization.getSerializer(specificType);
+    serializer.open(stream);
+    serializer.serialize(object);
+  }
+
+  @Override
+  public String getName() {
+    return "compatibility";
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    factory = new org.apache.hadoop.io.serializer.SerializationFactory(conf);
+  }
+}

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.Serialization;
+
+/**
+ * <p>
+ * A {@link RawComparator} that uses a {@link Serialization}
+ * object to deserialize objects that are then compared via
+ * their {@link Comparable} interfaces.
+ * </p>
+ * <p>
+ * Each comparator instance keeps its own per-thread scratch objects (an
+ * input buffer and the two most recently deserialized values). The values
+ * are handed back to the serialization as reusable objects on the next
+ * comparison, so they must never be shared between comparators that may be
+ * bound to different serializations or types.
+ * </p>
+ * @param <T> the type of the deserialized objects being compared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class DeserializationRawComparator<T extends Comparable<T>> 
+  implements RawComparator {
+  private final Serialization<T> serialization;
+  private final Configuration conf;
+
+  /** Scratch state reused across comparisons made by a single thread. */
+  private static final class ReusableObjects<T extends Comparable<T>> {
+    DataInputBuffer buf = new DataInputBuffer();
+    T left = null;
+    T right = null;
+  }
+
+  /**
+   * Per-instance, per-thread scratch objects. This is deliberately not
+   * static: a holder shared by all comparators on a thread would let an
+   * object deserialized by one serialization be passed as the reusable
+   * object to a different serialization.
+   */
+  private final ThreadLocal<ReusableObjects<T>> reuseFactory =
+    new ThreadLocal<ReusableObjects<T>>() {
+    @Override
+    protected ReusableObjects<T> initialValue() {
+      return new ReusableObjects<T>();
+    }
+  };
+
+  /**
+   * Create a comparator that deserializes with the given serialization.
+   * @param serialization the serialization used to read both operands
+   * @param conf the configuration passed to each deserialize call; it is
+   *             stored and forwarded as is, and may be null
+   */
+  public DeserializationRawComparator(Serialization<T> serialization,
+                                     Configuration conf) {
+    this.serialization = serialization;
+    this.conf = conf;
+  }
+
+  /**
+   * Deserialize both byte ranges and compare the resulting objects with
+   * {@link Comparable#compareTo(Object)}.
+   * @param b1 the buffer holding the left value
+   * @param s1 the offset of the left value in b1
+   * @param l1 the length of the left value
+   * @param b2 the buffer holding the right value
+   * @param s2 the offset of the right value in b2
+   * @param l2 the length of the right value
+   * @return the result of left.compareTo(right)
+   * @throws RuntimeException wrapping any IOException raised while
+   *         deserializing
+   */
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    ReusableObjects<T> reuse = reuseFactory.get();
+    try {
+      // the previous results are offered back to the serialization for reuse
+      reuse.buf.reset(b1, s1, l1);
+      reuse.left = serialization.deserialize(reuse.buf, reuse.left, conf);
+      reuse.buf.reset(b2, s2, l2);
+      reuse.right = serialization.deserialize(reuse.buf, reuse.right, conf);
+      return reuse.left.compareTo(reuse.right);
+    } catch (IOException e) {
+      throw new RuntimeException("Error in deserialization",e);
+    }
+  }
+
+}

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial.lib;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.TypedSerialization;
+
+/**
+ * A serialization binding for Java serialization. It has the advantage of
+ * handling all serializable Java types, but is not space or time efficient. In
+ * particular, the type information is repeated in each record.
+ * It is not enabled by default.
+ * <p>
+ * Objects are written and read without the usual object stream header.
+ * NOTE(review): Java native deserialization is unsafe on untrusted input;
+ * confirm that only trusted streams are read through this class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class JavaSerialization extends TypedSerialization<Serializable> {
+  
+  /**
+   * Read one object from the stream using Java serialization.
+   * @param stream the stream to read from; it is not closed by this method
+   * @param reusableObject ignored, a new object is always returned
+   * @param conf not used by this implementation
+   * @return the object that was read
+   * @throws IOException if reading fails or the class of the serialized
+   *         object cannot be found (the ClassNotFoundException is the cause)
+   */
+  @Override
+  public Serializable deserialize(InputStream stream,
+                                  Serializable reusableObject,
+                                  Configuration conf) throws IOException {
+    ObjectInputStream ois = new ObjectInputStream(stream) {
+      @Override protected void readStreamHeader() {
+        // no header
+      }
+    };
+    try {
+      // ignore passed-in object
+      return (Serializable) ois.readObject();
+    } catch (ClassNotFoundException e) {
+      // keep the original exception as the cause so the stack trace survives
+      throw new IOException(e.toString(), e);
+    }
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void deserializeSelf(InputStream in, Configuration conf) {
+    // nothing
+  }
+
+  /**
+   * Create a comparator that deserializes both values and compares the
+   * resulting objects.
+   * NOTE(review): the comparator relies on the objects being Comparable,
+   * which Serializable alone does not guarantee -- confirm with callers.
+   * @return a new {@link DeserializationRawComparator} bound to this
+   *         serialization with a null configuration
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public RawComparator getRawComparator() {
+    return new DeserializationRawComparator(this, null);
+  }
+
+  /**
+   * Write one object to the stream using Java serialization.
+   * @param stream the stream to write to; it is flushed but not closed
+   * @param object the object to write
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void serialize(OutputStream stream, Serializable object
+                        ) throws IOException {
+    ObjectOutputStream oos = new ObjectOutputStream(stream) {
+      @Override protected void writeStreamHeader() {
+        // no header
+      }
+    };
+    oos.reset(); // clear (class) back-references
+    oos.writeObject(object);
+    oos.flush();
+  }
+
+  /**
+   * Writes nothing.
+   */
+  @Override
+  public void serializeSelf(OutputStream out) throws IOException {
+    // nothing
+  }
+
+  /**
+   * @return {@code Serializable.class}, the base type handled by this
+   *         serialization
+   */
+  @Override
+  public Class<Serializable> getBaseType() {
+    return Serializable.class;
+  }
+  
+  /**
+   * @return the constant name {@code "java"}
+   */
+  @Override
+  public String getName() {
+    return "java";
+  }
+  
+  /**
+   * Does nothing; the metadata string is ignored.
+   * @param metadata ignored
+   */
+  @Override
+  public void fromString(String metadata) {
+    // NOTHING
+  }
+  
+  /**
+   * @return the constant description {@code "<Java Serialization>"}
+   */
+  @Override
+  public String toString() {
+    return "<Java Serialization>";
+  }
+}

Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java Sat Dec  4 07:13:10 2010
@@ -0,0 +1,19 @@
+package org.apache.hadoop.io.serial.lib;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * A {@link RawComparator} that orders values by their serialized bytes,
+ * comparing the byte strings in lexicographical order.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public final class MemcmpRawComparator implements RawComparator {
+
+  /**
+   * Compare two byte ranges by delegating to
+   * {@link WritableComparator#compareBytes}.
+   * @param b1 the buffer holding the left value
+   * @param s1 the offset of the left value in b1
+   * @param l1 the length of the left value
+   * @param b2 the buffer holding the right value
+   * @param s2 the offset of the right value in b2
+   * @param l2 the length of the right value
+   * @return the value returned by the byte comparison
+   */
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    final int order = WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+    return order;
+  }
+
+}



Mime
View raw message