hbase-commits mailing list archives

From te...@apache.org
Subject svn commit: r1153634 [2/4] - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/io/hfile/ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/o...
Date Wed, 03 Aug 2011 19:59:54 GMT
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1153634&r1=1153633&r2=1153634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Wed Aug  3 19:59:48 2011
@@ -19,26 +19,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,26 +37,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
-import org.apache.hadoop.hbase.util.BloomFilter;
-import org.apache.hadoop.hbase.util.ByteBloomFilter;
-import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
 
 /**
  * File format for hbase.
@@ -135,25 +112,13 @@ import org.apache.hadoop.io.compress.Dec
 public class HFile {
   static final Log LOG = LogFactory.getLog(HFile.class);
 
-  /* These values are more or less arbitrary, and they are used as a
-   * form of check to make sure the file isn't completely corrupt.
-   */
-  final static byte [] DATABLOCKMAGIC =
-    {'D', 'A', 'T', 'A', 'B', 'L', 'K', 42 };
-  final static byte [] INDEXBLOCKMAGIC =
-    { 'I', 'D', 'X', 'B', 'L', 'K', 41, 43 };
-  final static byte [] METABLOCKMAGIC =
-    { 'M', 'E', 'T', 'A', 'B', 'L', 'K', 99 };
-  final static byte [] TRAILERBLOCKMAGIC =
-    { 'T', 'R', 'A', 'B', 'L', 'K', 34, 36 };
-
   /**
    * Maximum length of key in HFile.
    */
   public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;
 
   /**
-   * Default blocksize for hfile.
+   * Default block size for an HFile.
    */
   public final static int DEFAULT_BLOCKSIZE = 64 * 1024;
 
@@ -162,1653 +127,229 @@ public class HFile {
    */
   public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
     Compression.Algorithm.NONE;
+
+  /** Minimum supported HFile format version */
+  public static final int MIN_FORMAT_VERSION = 1;
+
+  /** Maximum supported HFile format version */
+  public static final int MAX_FORMAT_VERSION = 2;
+
   /** Default compression name: none. */
   public final static String DEFAULT_COMPRESSION =
     DEFAULT_COMPRESSION_ALGORITHM.getName();
 
+  /** Separator between HFile name and offset in block cache key */
+  static final char CACHE_KEY_SEPARATOR = '_';
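The separator implies block cache keys of the form "name_offset". A minimal
illustration, assuming the composition is a plain string concatenation (the
helper below is hypothetical, not part of this patch):

    // Hypothetical sketch: compose a cache key from the HFile name and the
    // block's byte offset, joined by CACHE_KEY_SEPARATOR ('_').
    static String blockCacheKey(String hfileName, long offset) {
      return hfileName + CACHE_KEY_SEPARATOR + offset;
    }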
+
   // For measuring latency of "typical" reads and writes
-  private static volatile long readOps;
-  private static volatile long readTime;
-  private static volatile long writeOps;
-  private static volatile long writeTime;
+  static volatile AtomicLong readOps = new AtomicLong();
+  static volatile AtomicLong readTimeNano = new AtomicLong();
+  static volatile AtomicLong writeOps = new AtomicLong();
+  static volatile AtomicLong writeTimeNano = new AtomicLong();
 
   public static final long getReadOps() {
-    long ret = readOps;
-    readOps = 0;
-    return ret;
+    return readOps.getAndSet(0);
   }
 
-  public static final long getReadTime() {
-    long ret = readTime;
-    readTime = 0;
-    return ret;
+  public static final long getReadTimeMs() {
+    return readTimeNano.getAndSet(0) / 1000000;
   }
 
   public static final long getWriteOps() {
-    long ret = writeOps;
-    writeOps = 0;
-    return ret;
+    return writeOps.getAndSet(0);
   }
 
-  public static final long getWriteTime() {
-    long ret = writeTime;
-    writeTime = 0;
-    return ret;
+  public static final long getWriteTimeMs() {
+    return writeTimeNano.getAndSet(0) / 1000000;
   }
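The four accessors above drain their counters as they read them, so a single
consumer should own the sampling. A minimal sketch of a periodic reporter
built on them (LOG stands in for any commons-logging Log; it is not supplied
by this patch):

    // Each call below resets the counter it reads, so per-interval
    // averages fall out directly.
    long reads  = HFile.getReadOps();
    long readMs = HFile.getReadTimeMs();
    if (reads > 0) {
      LOG.info("avg read latency: " + (readMs / reads) + " ms over "
          + reads + " ops");
    }
    long writes  = HFile.getWriteOps();
    long writeMs = HFile.getWriteTimeMs();
    if (writes > 0) {
      LOG.info("avg write latency: " + (writeMs / writes) + " ms over "
          + writes + " ops");
    }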
 
-  /**
-   * HFile Writer.
-   */
-  public static class Writer implements Closeable {
-    // FileSystem stream to write on.
-    private FSDataOutputStream outputStream;
-    // True if we opened the <code>outputStream</code> (and so will close it).
-    private boolean closeOutputStream;
-
-    // Name for this object used when logging or in toString.  Is either
-    // the result of a toString on stream or else toString of passed file Path.
-    protected String name;
-
-    // Total uncompressed bytes, maybe calculate a compression ratio later.
-    private long totalBytes = 0;
-
-    // Total # of key/value entries, ie: how many times add() was called.
-    private int entryCount = 0;
-
-    // Used when calculating average key and value lengths.
-    private long keylength = 0;
-    private long valuelength = 0;
-
-    // Used to ensure we write in order.
-    private final RawComparator<byte []> comparator;
-
-    // Number of uncompressed bytes per block.  Reinitialized when we start
-    // new block.
-    private int blocksize;
-
-    // Offset where the current block began.
-    private long blockBegin;
-
-    // First key in a block (Not first key in file).
-    private byte [] firstKey = null;
-
-    // Key previously appended.  Becomes the last key in the file.
-    private byte [] lastKeyBuffer = null;
-    private int lastKeyOffset = -1;
-    private int lastKeyLength = -1;
-
-    // See {@link BlockIndex}. Below four fields are used to write the block
-    // index.
-    ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
-    // Block offset in backing stream.
-    ArrayList<Long> blockOffsets = new ArrayList<Long>();
-    // Raw (decompressed) data size.
-    ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
-
-    // Meta block system.
-    private ArrayList<byte []> metaNames = new ArrayList<byte []>();
-    private ArrayList<Writable> metaData = new ArrayList<Writable>();
-
-    // Used compression.  Used even if no compression -- 'none'.
-    private final Compression.Algorithm compressAlgo;
-    private Compressor compressor;
-
-    // Special datastructure to hold fileinfo.
-    private FileInfo fileinfo = new FileInfo();
-
-    // May be null if we were passed a stream.
-    private Path path = null;
-
-    // Block cache to optionally fill on write
-    private BlockCache blockCache;
-
-    // Byte buffer output stream made per block written.
-    private ByteBufferOutputStream bbos = null;
-    private DataOutputStream bbosDos = null;
-    private int blockNumber = 0;
-
-    /**
-     * Constructor that uses all defaults for compression and block size.
-     * @param fs
-     * @param path
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Path path)
-    throws IOException {
-      this(fs, path, DEFAULT_BLOCKSIZE, (Compression.Algorithm) null, null,
-          null);
-    }
-
-    /**
-     * Constructor that takes a Path.
-     * @param fs
-     * @param path
-     * @param blocksize
-     * @param compress
-     * @param comparator
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Path path, int blocksize,
-      String compress, final KeyComparator comparator)
-    throws IOException {
-      this(fs, path, blocksize,
-        compress == null? DEFAULT_COMPRESSION_ALGORITHM:
-          Compression.getCompressionAlgorithmByName(compress),
-        comparator, null);
-    }
-
-    /**
-     * Constructor that takes a Path.
-     * @param fs
-     * @param path
-     * @param blocksize
-     * @param compress
-     * @param comparator
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Path path, int blocksize,
-      Compression.Algorithm compress,
-      final KeyComparator comparator, BlockCache blockCache)
-    throws IOException {
-      this(fs.create(path), blocksize, compress, comparator);
-      this.closeOutputStream = true;
-      this.name = path.toString();
-      this.path = path;
-      this.blockCache = blockCache;
-    }
-
-    /**
-     * Constructor that takes a stream.
-     * @param ostream Stream to use.
-     * @param blocksize
-     * @param compress
-     * @param c RawComparator to use.
-     * @throws IOException
-     */
-    public Writer(final FSDataOutputStream ostream, final int blocksize,
-      final String  compress, final KeyComparator c)
-    throws IOException {
-      this(ostream, blocksize,
-        Compression.getCompressionAlgorithmByName(compress), c);
-    }
-
-    /**
-     * Constructor that takes a stream.
-     * @param ostream Stream to use.
-     * @param blocksize
-     * @param compress
-     * @param c
-     * @throws IOException
-     */
-    public Writer(final FSDataOutputStream ostream, final int blocksize,
-      final Compression.Algorithm  compress, final KeyComparator c)
-    throws IOException {
-      this.outputStream = ostream;
-      this.closeOutputStream = false;
-      this.blocksize = blocksize;
-      this.comparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c;
-      this.name = this.outputStream.toString();
-      this.compressAlgo = compress == null?
-        DEFAULT_COMPRESSION_ALGORITHM: compress;
-    }
-
-    /*
-     * If at block boundary, opens new block.
-     * @throws IOException
-     */
-    private void checkBlockBoundary() throws IOException {
-      if (bbosDos != null && bbosDos.size() < blocksize) return;
-      finishBlock();
-      newBlock();
-    }
-
-    /*
-     * Do the cleanup if a current block.
-     * @throws IOException
-     */
-    private void finishBlock() throws IOException {
-      if (bbosDos == null) return;
-
-      // Flush Data Output Stream
-      bbosDos.flush();
-
-      // Compress Data and write to output stream
-      DataOutputStream compressStream = getCompressingStream();
-      bbos.writeTo(compressStream);
-      int size = releaseCompressingStream(compressStream);
-
-      long now = System.currentTimeMillis();
-
-      blockKeys.add(firstKey);
-      blockOffsets.add(Long.valueOf(blockBegin));
-      blockDataSizes.add(Integer.valueOf(size));
-      this.totalBytes += size;
-
-      writeTime += System.currentTimeMillis() - now;
-      writeOps++;
-
-      if (blockCache != null) {
-        byte[] bytes = bbos.toByteArray(DATABLOCKMAGIC.length, bbos.size() - DATABLOCKMAGIC.length);
-        ByteBuffer blockToCache = ByteBuffer.wrap(bytes);
-        String blockName = path.toString() + blockNumber;
-        blockCache.cacheBlock(blockName, blockToCache);
-      }
-
-      bbosDos.close();
-      bbosDos = null;
-      bbos = null;
-
-      blockNumber++;
-    }
-
-    /*
-     * Ready a new block for writing.
-     * @throws IOException
-     */
-    private void newBlock() throws IOException {
-      // This is where the next block begins.
-      blockBegin = outputStream.getPos();
-
-      firstKey = null;
-
-      // to avoid too many calls to realloc(),
-      // pre-allocates the byte stream to the block size + 25%
-      // only if blocksize is under 1Gb
-      int bbosBlocksize = Math.max(blocksize, blocksize + (blocksize / 4));
-      bbos = new ByteBufferOutputStream(bbosBlocksize);
-      bbosDos = new DataOutputStream(bbos);
-      bbosDos.write(DATABLOCKMAGIC);
-    }
-
-    /*
-     * Sets up a compressor and creates a compression stream on top of
-     * this.outputStream.  Get one per block written.
-     * @return A compressing stream; if 'none' compression, returned stream
-     * does not compress.
-     * @throws IOException
-     * @see {@link #releaseCompressingStream(DataOutputStream)}
-     */
-    private DataOutputStream getCompressingStream() throws IOException {
-      this.compressor = compressAlgo.getCompressor();
-      // Get new DOS compression stream.  In tfile, the DOS, is not closed,
-      // just finished, and that seems to be fine over there.  TODO: Check
-      // no memory retention of the DOS.  Should I disable the 'flush' on the
-      // DOS as the BCFile over in tfile does?  It wants to make it so flushes
-      // don't go through to the underlying compressed stream.  Flush on the
-      // compressed downstream should be only when done.  I was going to but
-      // looks like when we call flush in here, its legitimate flush that
-      // should go through to the compressor.
-      OutputStream os =
-        this.compressAlgo.createCompressionStream(this.outputStream,
-        this.compressor, 0);
-      return new DataOutputStream(os);
-    }
-
-    /*
-     * Let go of block compressor and compressing stream gotten in call
-     * {@link #getCompressingStream}.
-     * @param dos
-     * @return How much was written on this stream since it was taken out.
-     * @see #getCompressingStream()
-     * @throws IOException
-     */
-    private int releaseCompressingStream(final DataOutputStream dos)
-    throws IOException {
-      dos.flush();
-      this.compressAlgo.returnCompressor(this.compressor);
-      this.compressor = null;
-      return dos.size();
-    }
+  /** API required to write an {@link HFile} */
+  public interface Writer extends Closeable {
 
-    /**
-     * Add a meta block to the end of the file. Call before close().
-     * Metadata blocks are expensive.  Fill one with a bunch of serialized data
-     * rather than do a metadata block per metadata instance.  If metadata is
-     * small, consider adding to file info using
-     * {@link #appendFileInfo(byte[], byte[])}
-     * @param metaBlockName name of the block
-     * @param content will call readFields to get data later (DO NOT REUSE)
-     */
-    public void appendMetaBlock(String metaBlockName, Writable content) {
-      byte[] key = Bytes.toBytes(metaBlockName);
-      int i;
-      for (i = 0; i < metaNames.size(); ++i) {
-        // stop when the current key is greater than our own
-        byte[] cur = metaNames.get(i);
-        if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
-            key, 0, key.length) > 0) {
-          break;
-        }
-      }
-      metaNames.add(i, key);
-      metaData.add(i, content);
-    }
+    /** Add an element to the file info map. */
+    void appendFileInfo(byte[] key, byte[] value) throws IOException;
 
-    /**
-     * Add to the file info.  Added key value can be gotten out of the return
-     * from {@link Reader#loadFileInfo()}.
-     * @param k Key
-     * @param v Value
-     * @throws IOException
-     */
-    public void appendFileInfo(final byte [] k, final byte [] v)
-    throws IOException {
-      appendFileInfo(this.fileinfo, k, v, true);
-    }
+    void append(KeyValue kv) throws IOException;
 
-    static FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v,
-      final boolean checkPrefix)
-    throws IOException {
-      if (k == null || v == null) {
-        throw new NullPointerException("Neither key nor value may be null");
-      }
-      if (checkPrefix &&
-          Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) {
-        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX +
-          " are reserved");
-      }
-      fi.put(k, v);
-      return fi;
-    }
+    void append(byte[] key, byte[] value) throws IOException;
 
-    /**
-     * @return Path or null if we were passed a stream rather than a Path.
-     */
-    public Path getPath() {
-      return this.path;
-    }
+    /** @return the path to this {@link HFile} */
+    Path getPath();
 
-    @Override
-    public String toString() {
-      return "writer=" + this.name + ", compression=" +
-        this.compressAlgo.getName();
-    }
+    void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);
 
     /**
-     * Add key/value to file.
-     * Keys must be added in an order that agrees with the Comparator passed
-     * on construction.
-     * @param kv KeyValue to add.  Cannot be empty nor null.
-     * @throws IOException
+     * Adds an inline block writer such as a multi-level block index writer or
+     * a compound Bloom filter writer.
      */
-    public void append(final KeyValue kv)
-    throws IOException {
-      append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
-        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
-    }
+    void addInlineBlockWriter(InlineBlockWriter bloomWriter);
 
     /**
-     * Add key/value to file.
-     * Keys must be added in an order that agrees with the Comparator passed
-     * on construction.
-     * @param key Key to add.  Cannot be empty nor null.
-     * @param value Value to add.  Cannot be empty nor null.
-     * @throws IOException
+     * Store Bloom filter in the file. This does not deal with Bloom filter
+     * internals but is necessary, since Bloom filters are stored differently
+     * in HFile version 1 and version 2.
      */
-    public void append(final byte [] key, final byte [] value)
-    throws IOException {
-      append(key, 0, key.length, value, 0, value.length);
-    }
-
-    /**
-     * Add key/value to file.
-     * Keys must be added in an order that agrees with the Comparator passed
-     * on construction.
-     * @param key
-     * @param koffset
-     * @param klength
-     * @param value
-     * @param voffset
-     * @param vlength
-     * @throws IOException
-     */
-    private void append(final byte [] key, final int koffset, final int klength,
-        final byte [] value, final int voffset, final int vlength)
-    throws IOException {
-      boolean dupKey = checkKey(key, koffset, klength);
-      checkValue(value, voffset, vlength);
-      if (!dupKey) {
-        checkBlockBoundary();
-      }
-      // Write length of key and value and then actual key and value bytes.
-      this.bbosDos.writeInt(klength);
-      this.keylength += klength;
-      this.bbosDos.writeInt(vlength);
-      this.valuelength += vlength;
-      this.bbosDos.write(key, koffset, klength);
-      this.bbosDos.write(value, voffset, vlength);
-      // Are we the first key in this block?
-      if (this.firstKey == null) {
-        // Copy the key.
-        this.firstKey = new byte [klength];
-        System.arraycopy(key, koffset, this.firstKey, 0, klength);
-      }
-      this.lastKeyBuffer = key;
-      this.lastKeyOffset = koffset;
-      this.lastKeyLength = klength;
-      this.entryCount ++;
-    }
-
-    /*
-     * @param key Key to check.
-     * @return the flag of duplicate Key or not
-     * @throws IOException
-     */
-    private boolean checkKey(final byte [] key, final int offset, final int length)
-    throws IOException {
-      boolean dupKey = false;
-
-      if (key == null || length <= 0) {
-        throw new IOException("Key cannot be null or empty");
-      }
-      if (length > MAXIMUM_KEY_LENGTH) {
-        throw new IOException("Key length " + length + " > " +
-          MAXIMUM_KEY_LENGTH);
-      }
-      if (this.lastKeyBuffer != null) {
-        int keyComp = this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset,
-            this.lastKeyLength, key, offset, length);
-        if (keyComp > 0) {
-          throw new IOException("Added a key not lexically larger than" +
-            " previous key=" + Bytes.toStringBinary(key, offset, length) +
-            ", lastkey=" + Bytes.toStringBinary(this.lastKeyBuffer, this.lastKeyOffset,
-                this.lastKeyLength));
-        } else if (keyComp == 0) {
-          dupKey = true;
-        }
-      }
-      return dupKey;
-    }
-
-    private void checkValue(final byte [] value, final int offset,
-        final int length) throws IOException {
-      if (value == null) {
-        throw new IOException("Value cannot be null");
-      }
-    }
-
-    public long getTotalBytes() {
-      return this.totalBytes;
-    }
-
-    public void close() throws IOException {
-      if (this.outputStream == null) {
-        return;
-      }
-      // Write out the end of the data blocks, then write meta data blocks.
-      // followed by fileinfo, data block index and meta block index.
-
-      finishBlock();
-
-      FixedFileTrailer trailer = new FixedFileTrailer();
-
-      // Write out the metadata blocks if any.
-      ArrayList<Long> metaOffsets = null;
-      ArrayList<Integer> metaDataSizes = null;
-      if (metaNames.size() > 0) {
-        metaOffsets = new ArrayList<Long>(metaNames.size());
-        metaDataSizes = new ArrayList<Integer>(metaNames.size());
-        for (int i = 0 ; i < metaNames.size() ; ++ i ) {
-          // store the beginning offset
-          long curPos = outputStream.getPos();
-          metaOffsets.add(curPos);
-          // write the metadata content
-          DataOutputStream dos = getCompressingStream();
-          dos.write(METABLOCKMAGIC);
-          metaData.get(i).write(dos);
-          int size = releaseCompressingStream(dos);
-          // store the metadata size
-          metaDataSizes.add(size);
-        }
-      }
-
-      // Write fileinfo.
-      trailer.fileinfoOffset = writeFileInfo(this.outputStream);
-
-      // Write the data block index.
-      trailer.dataIndexOffset = BlockIndex.writeIndex(this.outputStream,
-        this.blockKeys, this.blockOffsets, this.blockDataSizes);
-
-      // Meta block index.
-      if (metaNames.size() > 0) {
-        trailer.metaIndexOffset = BlockIndex.writeIndex(this.outputStream,
-          this.metaNames, metaOffsets, metaDataSizes);
-      }
-
-      // Now finish off the trailer.
-      trailer.dataIndexCount = blockKeys.size();
-      trailer.metaIndexCount = metaNames.size();
-
-      trailer.totalUncompressedBytes = totalBytes;
-      trailer.entryCount = entryCount;
-
-      trailer.compressionCodec = this.compressAlgo.ordinal();
-
-      trailer.serialize(outputStream);
-
-      if (this.closeOutputStream) {
-        this.outputStream.close();
-        this.outputStream = null;
-      }
-    }
-
-    /*
-     * Add last bits of metadata to fileinfo and then write it out.
-     * Reader will be expecting to find all below.
-     * @param o Stream to write on.
-     * @return Position at which we started writing.
-     * @throws IOException
-     */
-    private long writeFileInfo(FSDataOutputStream o) throws IOException {
-      if (this.lastKeyBuffer != null) {
-        // Make a copy.  The copy is stuffed into HMapWritable.  Needs a clean
-        // byte buffer.  Won't take a tuple.
-        byte [] b = new byte[this.lastKeyLength];
-        System.arraycopy(this.lastKeyBuffer, this.lastKeyOffset, b, 0,
-          this.lastKeyLength);
-        appendFileInfo(this.fileinfo, FileInfo.LASTKEY, b, false);
-      }
-      int avgKeyLen = this.entryCount == 0? 0:
-        (int)(this.keylength/this.entryCount);
-      appendFileInfo(this.fileinfo, FileInfo.AVG_KEY_LEN,
-        Bytes.toBytes(avgKeyLen), false);
-      int avgValueLen = this.entryCount == 0? 0:
-        (int)(this.valuelength/this.entryCount);
-      appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN,
-        Bytes.toBytes(avgValueLen), false);
-      appendFileInfo(this.fileinfo, FileInfo.COMPARATOR,
-        Bytes.toBytes(this.comparator.getClass().getName()), false);
-      long pos = o.getPos();
-      this.fileinfo.write(o);
-      return pos;
-    }
+    void addBloomFilter(BloomFilterWriter bfw);
   }
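To make the contract concrete, a hedged sketch of driving the new interface
(in HBase proper this sequence lives inside StoreFile; the wrapper and the
file-info key below are illustrative only):

    // Keys must be appended in comparator order; close() flushes the last
    // block and writes the indexes and trailer.
    static void writeAll(HFile.Writer writer, List<KeyValue> sortedKvs)
        throws IOException {
      for (KeyValue kv : sortedKvs) {
        writer.append(kv);
      }
      writer.appendFileInfo(Bytes.toBytes("EXAMPLE_KEY"),   // hypothetical
          Bytes.toBytes("example-value"));
      writer.close();
    }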
 
   /**
-   * HFile Reader.
+   * This variety of ways to construct writers is used throughout the code, and
+   * we want to be able to swap writer implementations.
    */
-  public static class Reader implements Closeable {
-    // Stream to read from.
-    private FSDataInputStream istream;
-    // True if we should close istream when done.  We don't close it if we
-    // didn't open it.
-    private boolean closeIStream;
-
-    // These are read in when the file info is loaded.
-    HFile.BlockIndex blockIndex;
-    private BlockIndex metaIndex;
-    FixedFileTrailer trailer;
-    private volatile boolean fileInfoLoaded = false;
-
-    // Filled when we read in the trailer.
-    private Compression.Algorithm compressAlgo;
-
-    // Last key in the file.  Filled in when we read in the file info
-    private byte [] lastkey = null;
-    // Stats read in when we load file info.
-    private int avgKeyLen = -1;
-    private int avgValueLen = -1;
-
-    // Used to ensure we seek correctly.
-    RawComparator<byte []> comparator;
-
-    // Size of this file.
-    private final long fileSize;
-
-    // Block cache to use.
-    private final BlockCache cache;
-    public int cacheHits = 0;
-    public int blockLoads = 0;
-    public int metaLoads = 0;
-
-    // Whether file is from in-memory store
-    private boolean inMemory = false;
-
-    // Whether blocks of file should be evicted on close of file
-    private final boolean evictOnClose;
-
-    // Name for this object used when logging or in toString.  Is either
-    // the result of a toString on the stream or else is toString of passed
-    // file Path plus metadata key/value pairs.
-    protected String name;
+  public static abstract class WriterFactory {
+    protected Configuration conf;
 
-    /**
-     * Opens a HFile.  You must load the file info before you can
-     * use it by calling {@link #loadFileInfo()}.
-     *
-     * @param fs filesystem to load from
-     * @param path path within said filesystem
-     * @param cache block cache. Pass null if none.
-     * @throws IOException
-     */
-    public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory,
-        boolean evictOnClose)
-    throws IOException {
-      this(path, fs.open(path), fs.getFileStatus(path).getLen(), cache,
-          inMemory, evictOnClose);
-      this.closeIStream = true;
-      this.name = path.toString();
-    }
+    WriterFactory(Configuration conf) { this.conf = conf; }
 
-    /**
-     * Opens a HFile.  You must load the index before you can
-     * use it by calling {@link #loadFileInfo()}.
-     *
-     * @param fsdis input stream.  Caller is responsible for closing the passed
-     * stream.
-     * @param size Length of the stream.
-     * @param cache block cache. Pass null if none.
-     * @param inMemory whether blocks should be marked as in-memory in cache
-     * @param evictOnClose whether blocks in cache should be evicted on close
-     * @throws IOException
-     */
-    public Reader(Path path, final FSDataInputStream fsdis, final long size,
-        final BlockCache cache, final boolean inMemory,
-        final boolean evictOnClose) {
-      this.cache = cache;
-      this.fileSize = size;
-      this.istream = fsdis;
-      this.closeIStream = false;
-      this.name = path.toString();
-      this.inMemory = inMemory;
-      this.evictOnClose = evictOnClose;
-    }
+    public abstract Writer createWriter(FileSystem fs, Path path)
+        throws IOException;
 
-    @Override
-    public String toString() {
-      return "reader=" + this.name +
-          (!isFileInfoLoaded()? "":
-            ", compression=" + this.compressAlgo.getName() +
-            ", inMemory=" + this.inMemory +
-            ", firstKey=" + toStringFirstKey() +
-            ", lastKey=" + toStringLastKey()) +
-            ", avgKeyLen=" + this.avgKeyLen +
-            ", avgValueLen=" + this.avgValueLen +
-            ", entries=" + this.trailer.entryCount +
-            ", length=" + this.fileSize;
-    }
+    public abstract Writer createWriter(FileSystem fs, Path path,
+        int blockSize, Compression.Algorithm compress,
+        final KeyComparator comparator) throws IOException;
 
-    protected String toStringFirstKey() {
-      return KeyValue.keyToString(getFirstKey());
-    }
-
-    protected String toStringLastKey() {
-      return KeyValue.keyToString(getLastKey());
-    }
-
-    public long length() {
-      return this.fileSize;
-    }
-
-    public long getTotalUncompressedBytes() {
-      return this.trailer.totalUncompressedBytes;
-    }
-    
-    public boolean inMemory() {
-      return this.inMemory;
-    }
+    public abstract Writer createWriter(FileSystem fs, Path path,
+        int blockSize, String compress,
+        final KeyComparator comparator) throws IOException;
 
-    private byte[] readAllIndex(final FSDataInputStream in, final long indexOffset,
-        final int indexSize) throws IOException {
-      byte[] allIndex = new byte[indexSize];
-      in.seek(indexOffset);
-      IOUtils.readFully(in, allIndex, 0, allIndex.length);
-      return allIndex;
-    }
+    public abstract Writer createWriter(final FSDataOutputStream ostream,
+        final int blockSize, final String compress,
+        final KeyComparator comparator) throws IOException;
 
-    /**
-     * Read in the index and file info.
-     * @return A map of fileinfo data.
-     * See {@link Writer#appendFileInfo(byte[], byte[])}.
-     * @throws IOException
-     */
-    public Map<byte [], byte []> loadFileInfo()
-    throws IOException {
-      this.trailer = readTrailer();
-
-      // Read in the fileinfo and get what we need from it.
-      this.istream.seek(this.trailer.fileinfoOffset);
-      FileInfo fi = new FileInfo();
-      fi.readFields(this.istream);
-      this.lastkey = fi.get(FileInfo.LASTKEY);
-      this.avgKeyLen = Bytes.toInt(fi.get(FileInfo.AVG_KEY_LEN));
-      this.avgValueLen = Bytes.toInt(fi.get(FileInfo.AVG_VALUE_LEN));
-      String clazzName = Bytes.toString(fi.get(FileInfo.COMPARATOR));
-      this.comparator = getComparator(clazzName);
-
-      int allIndexSize = (int)(this.fileSize - this.trailer.dataIndexOffset - FixedFileTrailer.trailerSize());
-      byte[] dataAndMetaIndex = readAllIndex(this.istream, this.trailer.dataIndexOffset, allIndexSize);
-
-      ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
-      DataInputStream dis = new DataInputStream(bis);
-
-      // Read in the data index.
-      this.blockIndex =
-          BlockIndex.readIndex(this.comparator, dis, this.trailer.dataIndexCount);
-
-      // Read in the metadata index.
-      if (trailer.metaIndexCount > 0) {
-        this.metaIndex = BlockIndex.readIndex(Bytes.BYTES_RAWCOMPARATOR, dis,
-            this.trailer.metaIndexCount);
-      }
-      this.fileInfoLoaded = true;
-
-      if (null != dis) {
-        dis.close();
-      }
+    public abstract Writer createWriter(final FSDataOutputStream ostream,
+        final int blockSize, final Compression.Algorithm compress,
+        final KeyComparator c) throws IOException;
+  }
 
-      return fi;
-    }
+  /** The configuration key for HFile version to use for new files */
+  public static final String FORMAT_VERSION_KEY = "hfile.format.version";
 
-    boolean isFileInfoLoaded() {
-      return this.fileInfoLoaded;
-    }
+  public static int getFormatVersion(Configuration conf) {
+    int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
+    checkFormatVersion(version);
+    return version;
+  }
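For example, a test that needs the old on-disk layout could pin the version
(a sketch; by default getFormatVersion(conf) returns MAX_FORMAT_VERSION):

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(HFile.FORMAT_VERSION_KEY, 1);  // force version-1 files
    int v = HFile.getFormatVersion(conf);      // validated against [MIN, MAX]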
 
-    @SuppressWarnings("unchecked")
-    private RawComparator<byte []> getComparator(final String clazzName)
-    throws IOException {
-      if (clazzName == null || clazzName.length() == 0) {
-        return null;
-      }
-      try {
-        return (RawComparator<byte []>)Class.forName(clazzName).newInstance();
-      } catch (InstantiationException e) {
-        throw new IOException(e);
-      } catch (IllegalAccessException e) {
-        throw new IOException(e);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
+  /**
+   * Returns the factory to be used to create {@link HFile} writers. Should
+   * always be {@link HFileWriterV2#WRITER_FACTORY_V2} in production, but
+   * can also be {@link HFileWriterV1#WRITER_FACTORY_V1} in testing.
+   */
+  public static final WriterFactory getWriterFactory(Configuration conf) {
+    int version = getFormatVersion(conf);
+    LOG.debug("Using HFile format version " + version);
+    switch (version) {
+    case 1:
+      return new HFileWriterV1.WriterFactoryV1(conf);
+    case 2:
+      return new HFileWriterV2.WriterFactoryV2(conf);
+    default:
+      throw new IllegalArgumentException("Cannot create writer for HFile " +
+          "format version " + version);
     }
+  }
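Callers can thus stay version-agnostic, along these lines (a sketch; fs and
path are assumed to exist, and a null comparator presumably falls back to the
raw byte comparator, as in the old Writer):

    // Whether a V1 or V2 writer comes back is decided entirely by
    // hfile.format.version in the Configuration.
    HFile.Writer w = HFile.getWriterFactory(conf).createWriter(
        fs, path, HFile.DEFAULT_BLOCKSIZE,
        Compression.Algorithm.NONE, null);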
 
-    /* Read the trailer off the input stream.  As side effect, sets the
-     * compression algorithm.
-     * @return Populated FixedFileTrailer.
-     * @throws IOException
-     */
-    private FixedFileTrailer readTrailer() throws IOException {
-      FixedFileTrailer fft = new FixedFileTrailer();
-      long seekPoint = this.fileSize - FixedFileTrailer.trailerSize();
-      this.istream.seek(seekPoint);
-      fft.deserialize(this.istream);
-      // Set up the codec.
-      this.compressAlgo =
-        Compression.Algorithm.values()[fft.compressionCodec];
+  /**
+   * Configuration key to evict all blocks of a given file from the block cache
+   * when the file is closed.
+   */
+  public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
+      "hbase.rs.evictblocksonclose";
 
-      CompressionTest.testCompression(this.compressAlgo);
+  /**
+   * Configuration key to cache data blocks on write. There are separate
+   * switches for Bloom blocks and non-root index blocks.
+   */
+  public static final String CACHE_BLOCKS_ON_WRITE_KEY =
+      "hbase.rs.cacheblocksonwrite";
 
-      return fft;
-    }
+  /** An interface used by clients to open and iterate an {@link HFile}. */
+  public interface Reader extends Closeable {
 
     /**
-     * Create a Scanner on this file.  No seeks or reads are done on creation.
-     * Call {@link HFileScanner#seekTo(byte[])} to position and start the read.
-     * There is nothing to clean up in a Scanner. Letting go of your references
-     * to the scanner is sufficient.
-     * @param pread Use positional read rather than seek+read if true (pread is
-     * better for random reads, seek+read is better for scanning).
-     * @param cacheBlocks True if we should cache blocks read in by this scanner.
-     * @return Scanner on this file.
+     * Returns this reader's "name". Usually the last component of the path.
+     * Needs to be constant as the file is being moved to support caching on
+     * write.
      */
-    public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
-      return new Scanner(this, cacheBlocks, pread);
-    }
+     String getName();
 
-    /**
-     * @param key Key to search.
-     * @return Block number of the block containing the key or -1 if not in this
-     * file.
-     */
-    protected int blockContainingKey(final byte [] key, int offset, int length) {
-      if (blockIndex == null) {
-        throw new RuntimeException("Block index not loaded");
-      }
-      return blockIndex.blockContainingKey(key, offset, length);
-    }
-    /**
-     * @param metaBlockName
-     * @param cacheBlock Add block to cache, if found
-     * @return Block wrapped in a ByteBuffer
-     * @throws IOException
-     */
-    public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
-    throws IOException {
-      if (trailer.metaIndexCount == 0) {
-        return null; // there are no meta blocks
-      }
-      if (metaIndex == null) {
-        throw new IOException("Meta index not loaded");
-      }
+     String getColumnFamilyName();
 
-      byte [] mbname = Bytes.toBytes(metaBlockName);
-      int block = metaIndex.blockContainingKey(mbname, 0, mbname.length);
-      if (block == -1)
-        return null;
-      long blockSize;
-      if (block == metaIndex.count - 1) {
-        blockSize = trailer.fileinfoOffset - metaIndex.blockOffsets[block];
-      } else {
-        blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block];
-      }
+     RawComparator<byte []> getComparator();
 
-      long now = System.currentTimeMillis();
+     HFileScanner getScanner(boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction);
 
-      // Per meta key from any given file, synchronize reads for said block
-      synchronized (metaIndex.blockKeys[block]) {
-        metaLoads++;
-        // Check cache for block.  If found return.
-        if (cache != null) {
-          ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block,
-              cacheBlock);
-          if (cachedBuf != null) {
-            // Return a distinct 'shallow copy' of the block,
-            // so pos doesn't get messed up by the scanner
-            cacheHits++;
-            return cachedBuf.duplicate();
-          }
-          // Cache Miss, please load.
-        }
+     ByteBuffer getMetaBlock(String metaBlockName,
+        boolean cacheBlock) throws IOException;
 
-        ByteBuffer buf = decompress(metaIndex.blockOffsets[block],
-          longToInt(blockSize), metaIndex.blockDataSizes[block], true);
-        byte [] magic = new byte[METABLOCKMAGIC.length];
-        buf.get(magic, 0, magic.length);
+     HFileBlock readBlock(long offset, int onDiskBlockSize,
+         boolean cacheBlock, final boolean pread, final boolean isCompaction)
+         throws IOException;
 
-        if (! Arrays.equals(magic, METABLOCKMAGIC)) {
-          throw new IOException("Meta magic is bad in block " + block);
-        }
+     Map<byte[], byte[]> loadFileInfo() throws IOException;
 
-        // Create a new ByteBuffer 'shallow copy' to hide the magic header
-        buf = buf.slice();
+     byte[] getLastKey();
 
-        readTime += System.currentTimeMillis() - now;
-        readOps++;
+     byte[] midkey() throws IOException;
 
-        // Cache the block
-        if(cacheBlock && cache != null) {
-          cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory);
-        }
+     long length();
 
-        return buf;
-      }
-    }
+     long getEntries();
 
-    /**
-     * Read in a file block.
-     * @param block Index of block to read.
-     * @param pread Use positional read instead of seek+read (positional is
-     * better for random reads whereas seek+read is better for scanning).
-     * @return Block wrapped in a ByteBuffer.
-     * @throws IOException
-     */
-    ByteBuffer readBlock(int block, boolean cacheBlock, final boolean pread)
-    throws IOException {
-      if (blockIndex == null) {
-        throw new IOException("Block index not loaded");
-      }
-      if (block < 0 || block >= blockIndex.count) {
-        throw new IOException("Requested block is out of range: " + block +
-          ", max: " + blockIndex.count);
-      }
-      // For any given block from any given file, synchronize reads for said
-      // block.
-      // Without a cache, this synchronizing is needless overhead, but really
-      // the other choice is to duplicate work (which the cache would prevent you from doing).
-      synchronized (blockIndex.blockKeys[block]) {
-        blockLoads++;
-        // Check cache for block.  If found return.
-        if (cache != null) {
-          ByteBuffer cachedBuf = cache.getBlock(name + block, cacheBlock);
-          if (cachedBuf != null) {
-            // Return a distinct 'shallow copy' of the block,
-            // so pos doesn't get messed up by the scanner
-            cacheHits++;
-            return cachedBuf.duplicate();
-          }
-          // Carry on, please load.
-        }
+     byte[] getFirstKey();
 
-        // Load block from filesystem.
-        long now = System.currentTimeMillis();
-        long onDiskBlockSize;
-        if (block == blockIndex.count - 1) {
-          // last block!  The end of data block is first meta block if there is
-          // one or if there isn't, the fileinfo offset.
-          long offset = this.metaIndex != null?
-            this.metaIndex.blockOffsets[0]: this.trailer.fileinfoOffset;
-          onDiskBlockSize = offset - blockIndex.blockOffsets[block];
-        } else {
-          onDiskBlockSize = blockIndex.blockOffsets[block+1] -
-          blockIndex.blockOffsets[block];
-        }
-        ByteBuffer buf = decompress(blockIndex.blockOffsets[block],
-          longToInt(onDiskBlockSize), this.blockIndex.blockDataSizes[block],
-          pread);
-
-        byte [] magic = new byte[DATABLOCKMAGIC.length];
-        buf.get(magic, 0, magic.length);
-        if (!Arrays.equals(magic, DATABLOCKMAGIC)) {
-          throw new IOException("Data magic is bad in block " + block);
-        }
+     long indexSize();
 
-        // 'shallow copy' to hide the header
-        // NOTE: you WILL GET BIT if you call buf.array() but don't start
-        //       reading at buf.arrayOffset()
-        buf = buf.slice();
-
-        readTime += System.currentTimeMillis() - now;
-        readOps++;
-
-        // Cache the block
-        if(cacheBlock && cache != null) {
-          cache.cacheBlock(name + block, buf.duplicate(), inMemory);
-        }
+     byte[] getFirstRowKey();
 
-        return buf;
-      }
-    }
+     byte[] getLastRowKey();
 
-    /*
-     * Decompress <code>compressedSize</code> bytes off the backing
-     * FSDataInputStream.
-     * @param offset
-     * @param compressedSize
-     * @param decompressedSize
-     *
-     * @return
-     * @throws IOException
-     */
-    private ByteBuffer decompress(final long offset, final int compressedSize,
-      final int decompressedSize, final boolean pread)
-    throws IOException {
-      Decompressor decompressor = null;
-      ByteBuffer buf = null;
-      try {
-        decompressor = this.compressAlgo.getDecompressor();
-        // My guess is that the bounded range fis is needed to stop the
-        // decompressor reading into next block -- IIRC, it just grabs a
-        // bunch of data w/o regard to whether decompressor is coming to end of a
-        // decompression.
-
-        // We use a buffer of DEFAULT_BLOCKSIZE size.  This might be extreme.
-        // Could maybe do with less. Study and figure it: TODO
-        InputStream is = this.compressAlgo.createDecompressionStream(
-            new BufferedInputStream(
-                new BoundedRangeFileInputStream(this.istream, offset, compressedSize,
-                                                pread),
-                Math.min(DEFAULT_BLOCKSIZE, compressedSize)),
-            decompressor, 0);
-        buf = ByteBuffer.allocate(decompressedSize);
-        IOUtils.readFully(is, buf.array(), 0, buf.capacity());
-        is.close();
-      } finally {
-        if (null != decompressor) {
-          this.compressAlgo.returnDecompressor(decompressor);
-        }
-      }
-      return buf;
-    }
+     FixedFileTrailer getTrailer();
 
-    /**
-     * @return First key in the file.  May be null if file has no entries.
-     * Note that this is not the first rowkey, but rather the byte form of
-     * the first KeyValue.
-     */
-    public byte [] getFirstKey() {
-      if (blockIndex == null) {
-        throw new RuntimeException("Block index not loaded");
-      }
-      return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
-    }
+     HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();
 
-    /**
-     * @return the first row key, or null if the file is empty.
-     * TODO move this to StoreFile after Ryan's patch goes in
-     * to eliminate KeyValue here
-     */
-    public byte[] getFirstRowKey() {
-      byte[] firstKey = getFirstKey();
-      if (firstKey == null) return null;
-      return KeyValue.createKeyValueFromKey(firstKey).getRow();
-    }
+     HFileScanner getScanner(boolean cacheBlocks, boolean pread);
 
-    /**
-     * @return number of KV entries in this HFile
-     */
-    public int getEntries() {
-      if (!this.isFileInfoLoaded()) {
-        throw new RuntimeException("File info not loaded");
-      }
-      return this.trailer.entryCount;
-    }
+     Compression.Algorithm getCompressionAlgorithm();
 
     /**
-     * @return Last key in the file.  May be null if file has no entries.
-     * Note that this is not the last rowkey, but rather the byte form of
-     * the last KeyValue.
+     * Retrieves Bloom filter metadata as appropriate for each {@link HFile}
+     * version. Knows nothing about how that metadata is structured.
      */
-    public byte [] getLastKey() {
-      if (!isFileInfoLoaded()) {
-        throw new RuntimeException("Load file info first");
-      }
-      return this.blockIndex.isEmpty()? null: this.lastkey;
-    }
-
-    /**
-     * @return the last row key, or null if the file is empty.
-     * TODO move this to StoreFile after Ryan's patch goes in
-     * to eliminate KeyValue here
-     */
-    public byte[] getLastRowKey() {
-      byte[] lastKey = getLastKey();
-      if (lastKey == null) return null;
-      return KeyValue.createKeyValueFromKey(lastKey).getRow();
-    }
-
-    /**
-     * @return number of K entries in this HFile's filter.  Returns KV count if no filter.
-     */
-    public int getFilterEntries() {
-      return getEntries();
-    }
-
-    /**
-     * @return Comparator.
-     */
-    public RawComparator<byte []> getComparator() {
-      return this.comparator;
-    }
-
-    public Compression.Algorithm getCompressionAlgorithm() {
-      return this.compressAlgo;
-    }
-
-    /**
-     * @return index size
-     */
-    public long indexSize() {
-      return (this.blockIndex != null? this.blockIndex.heapSize(): 0) +
-        ((this.metaIndex != null)? this.metaIndex.heapSize(): 0);
-    }
-
-    /**
-     * @return Midkey for this file.  We work with block boundaries only so
-     * returned midkey is an approximation only.
-     * @throws IOException
-     */
-    public byte [] midkey() throws IOException {
-      if (!isFileInfoLoaded() || this.blockIndex.isEmpty()) {
-        return null;
-      }
-      return this.blockIndex.midkey();
-    }
-
-    public void close() throws IOException {
-      if (evictOnClose && this.cache != null) {
-        int numEvicted = 0;
-        for (int i=0; i<blockIndex.count; i++) {
-          if (this.cache.evictBlock(name + i)) numEvicted++;
-        }
-        LOG.debug("On close of file " + name + " evicted " + numEvicted +
-            " block(s) of " + blockIndex.count + " total blocks");
-      }
-      if (this.closeIStream && this.istream != null) {
-        this.istream.close();
-        this.istream = null;
-      }
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    /*
-     * Implementation of {@link HFileScanner} interface.
-     */
-    protected static class Scanner implements HFileScanner {
-      private final Reader reader;
-      private ByteBuffer block;
-      private int currBlock;
-
-      private final boolean cacheBlocks;
-      private final boolean pread;
-
-      private int currKeyLen = 0;
-      private int currValueLen = 0;
-
-      public int blockFetches = 0;
-
-      public Scanner(Reader r, boolean cacheBlocks, final boolean pread) {
-        this.reader = r;
-        this.cacheBlocks = cacheBlocks;
-        this.pread = pread;
-      }
-
-      public KeyValue getKeyValue() {
-        if(this.block == null) {
-          return null;
-        }
-        return new KeyValue(this.block.array(),
-            this.block.arrayOffset() + this.block.position() - 8,
-            this.currKeyLen+this.currValueLen+8);
-      }
-
-      public ByteBuffer getKey() {
-        if (this.block == null || this.currKeyLen == 0) {
-          throw new RuntimeException("you need to seekTo() before calling getKey()");
-        }
-        ByteBuffer keyBuff = this.block.slice();
-        keyBuff.limit(this.currKeyLen);
-        keyBuff.rewind();
-        // Do keyBuff.asReadOnly()?
-        return keyBuff;
-      }
-
-      public ByteBuffer getValue() {
-        if (block == null || currKeyLen == 0) {
-          throw new RuntimeException("you need to seekTo() before calling getValue()");
-        }
-        // TODO: Could this be done with one ByteBuffer rather than create two?
-        ByteBuffer valueBuff = this.block.slice();
-        valueBuff.position(this.currKeyLen);
-        valueBuff = valueBuff.slice();
-        valueBuff.limit(currValueLen);
-        valueBuff.rewind();
-        return valueBuff;
-      }
-
-      public boolean next() throws IOException {
-        // LOG.debug("rem:" + block.remaining() + " p:" + block.position() +
-        // " kl: " + currKeyLen + " kv: " + currValueLen);
-        if (block == null) {
-          throw new IOException("Next called on non-seeked scanner");
-        }
-        block.position(block.position() + currKeyLen + currValueLen);
-        if (block.remaining() <= 0) {
-          // LOG.debug("Fetch next block");
-          currBlock++;
-          if (currBlock >= reader.blockIndex.count) {
-            // damn we are at the end
-            currBlock = 0;
-            block = null;
-            return false;
-          }
-          block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread);
-          currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
-          currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
-          block.position(block.position()+8);
-          blockFetches++;
-          return true;
-        }
-        // LOG.debug("rem:" + block.remaining() + " p:" + block.position() +
-        // " kl: " + currKeyLen + " kv: " + currValueLen);
-        currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
-        currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
-        block.position(block.position()+8);
-        return true;
-      }
-
-      public int seekTo(byte [] key) throws IOException {
-        return seekTo(key, 0, key.length);
-      }
-
-      public int seekTo(byte[] key, int offset, int length) throws IOException {
-        int b = reader.blockContainingKey(key, offset, length);
-        if (b < 0) return -1; // falls before the beginning of the file! :-(
-        // Avoid re-reading the same block (that'd be dumb).
-        loadBlock(b, true);
-        return blockSeek(key, offset, length, false);
-      }
-
-      public int reseekTo(byte [] key) throws IOException {
-        return reseekTo(key, 0, key.length);
-      }
-
-      public int reseekTo(byte[] key, int offset, int length)
-        throws IOException {
-
-        if (this.block != null && this.currKeyLen != 0) {
-          ByteBuffer bb = getKey();
-          int compared = this.reader.comparator.compare(key, offset, length,
-              bb.array(), bb.arrayOffset(), bb.limit());
-          if (compared < 1) {
-            //If the required key is less than or equal to current key, then
-            //don't do anything.
-            return compared;
-          }
-        }
-
-        int b = reader.blockContainingKey(key, offset, length);
-        if (b < 0) {
-          return -1;
-        }
-        loadBlock(b, false);
-        return blockSeek(key, offset, length, false);
-      }
-
-      /**
-       * Within a loaded block, seek to the given key, or failing an exact
-       * match, to the last key that sorts before it.
-       *
-       * A note on seekBefore: if seekBefore is true AND the key equals the
-       * first key in the block, an exception will be thrown.
-       * @param key to find
-       * @param seekBefore find the key before the exact match.
-       * @return
-       */
-      private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) {
-        int klen, vlen;
-        int lastLen = 0;
-        do {
-          klen = block.getInt();
-          vlen = block.getInt();
-          int comp = this.reader.comparator.compare(key, offset, length,
-            block.array(), block.arrayOffset() + block.position(), klen);
-          if (comp == 0) {
-            if (seekBefore) {
-              block.position(block.position() - lastLen - 16);
-              currKeyLen = block.getInt();
-              currValueLen = block.getInt();
-              return 1; // non exact match.
-            }
-            currKeyLen = klen;
-            currValueLen = vlen;
-            return 0; // indicate exact match
-          }
-          if (comp < 0) {
-            // go back one key:
-            block.position(block.position() - lastLen - 16);
-            currKeyLen = block.getInt();
-            currValueLen = block.getInt();
-            return 1;
-          }
-          block.position(block.position() + klen + vlen);
-          lastLen = klen + vlen ;
-        } while(block.remaining() > 0);
-        // ok we are at the end, so go back a littleeeeee....
-        // The 8 below is intentionally different from the 16s above.
-        // Do the math and you'll figure it out.
-        block.position(block.position() - lastLen - 8);
-        currKeyLen = block.getInt();
-        currValueLen = block.getInt();
-        return 1; // didn't exactly find it.
-      }
-
-      public boolean seekBefore(byte [] key) throws IOException {
-        return seekBefore(key, 0, key.length);
-      }
-
-      public boolean seekBefore(byte[] key, int offset, int length)
-      throws IOException {
-        int b = reader.blockContainingKey(key, offset, length);
-        if (b < 0)
-          return false; // key is before the start of the file.
-
-        // Question: does this block begin with 'key'?
-        if (this.reader.comparator.compare(reader.blockIndex.blockKeys[b],
-            0, reader.blockIndex.blockKeys[b].length,
-            key, offset, length) == 0) {
-          // Ok the key we're interested in is the first of the block, so go back one.
-          if (b == 0) {
-            // we have a 'problem', the key we want is the first of the file.
-            return false;
-          }
-          b--;
-          // TODO shortcut: seek forward in this block to the last key of the block.
-        }
-        loadBlock(b, true);
-        blockSeek(key, offset, length, true);
-        return true;
-      }
-
-      public String getKeyString() {
-        return Bytes.toStringBinary(block.array(), block.arrayOffset() +
-          block.position(), currKeyLen);
-      }
-
-      public String getValueString() {
-        return Bytes.toString(block.array(), block.arrayOffset() +
-          block.position() + currKeyLen, currValueLen);
-      }
-
-      public Reader getReader() {
-        return this.reader;
-      }
-
-      public boolean isSeeked(){
-        return this.block != null;
-      }
-
-      public boolean seekTo() throws IOException {
-        if (this.reader.blockIndex.isEmpty()) {
-          return false;
-        }
-        if (block != null && currBlock == 0) {
-          block.rewind();
-          currKeyLen = block.getInt();
-          currValueLen = block.getInt();
-          return true;
-        }
-        currBlock = 0;
-        block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread);
-        currKeyLen = block.getInt();
-        currValueLen = block.getInt();
-        blockFetches++;
-        return true;
-      }
-
-      private void loadBlock(int bloc, boolean rewind) throws IOException {
-        if (block == null) {
-          block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
-          currBlock = bloc;
-          blockFetches++;
-        } else {
-          if (bloc != currBlock) {
-            block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
-            currBlock = bloc;
-            blockFetches++;
-          } else {
-            // we are already in the same block, just rewind to seek again.
-            if (rewind) {
-              block.rewind();
-            }
-            else {
-              //Go back by (size of rowlength + size of valuelength) = 8 bytes
-              block.position(block.position()-8);
-            }
-          }
-        }
-      }
-
-      @Override
-      public String toString() {
-        return "HFileScanner for reader " + String.valueOf(reader);
-      }
-    }
-
-    public String getTrailerInfo() {
-      return trailer.toString();
-    }
+     DataInput getBloomFilterMetadata() throws IOException;
   }
 
-  /*
-   * The HFile has a fixed trailer that holds offsets to the other variable
-   * parts of the file, plus basic metadata about the file.
-   */
-  private static class FixedFileTrailer {
-    // Offset to the fileinfo data, a small block of vitals.
-    long fileinfoOffset;
-    // Offset to the data block index.
-    long dataIndexOffset;
-    // How many index entries there are (i.e. the block count)
-    int dataIndexCount;
-    // Offset to the meta block index.
-    long metaIndexOffset;
-    // How many meta block index entries (aka: meta block count)
-    int metaIndexCount;
-    long totalUncompressedBytes;
-    int entryCount;
-    int compressionCodec;
-    int version = 1;
-
-    FixedFileTrailer() {
-      super();
-    }
-
-    static int trailerSize() {
-      // Keep this up to date...
-      return
-      ( Bytes.SIZEOF_INT * 5 ) +
-      ( Bytes.SIZEOF_LONG * 4 ) +
-      TRAILERBLOCKMAGIC.length;
-    }
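
Checking the arithmetic: serialize() below writes 4 longs (fileinfoOffset,
dataIndexOffset, metaIndexOffset, totalUncompressedBytes) and 5 ints
(dataIndexCount, metaIndexCount, entryCount, compressionCodec, version),
i.e. 4 * 8 + 5 * 4 = 52 bytes plus the magic, which is exactly what
trailerSize() accounts for.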
-
-    void serialize(DataOutputStream outputStream) throws IOException {
-      outputStream.write(TRAILERBLOCKMAGIC);
-      outputStream.writeLong(fileinfoOffset);
-      outputStream.writeLong(dataIndexOffset);
-      outputStream.writeInt(dataIndexCount);
-      outputStream.writeLong(metaIndexOffset);
-      outputStream.writeInt(metaIndexCount);
-      outputStream.writeLong(totalUncompressedBytes);
-      outputStream.writeInt(entryCount);
-      outputStream.writeInt(compressionCodec);
-      outputStream.writeInt(version);
-    }
-
-    void deserialize(DataInputStream inputStream) throws IOException {
-      byte [] header = new byte[TRAILERBLOCKMAGIC.length];
-      inputStream.readFully(header);
-      if ( !Arrays.equals(header, TRAILERBLOCKMAGIC)) {
-        throw new IOException("Trailer 'header' is wrong; does the trailer " +
-          "size match content?");
-      }
-      fileinfoOffset         = inputStream.readLong();
-      dataIndexOffset        = inputStream.readLong();
-      dataIndexCount         = inputStream.readInt();
-
-      metaIndexOffset        = inputStream.readLong();
-      metaIndexCount         = inputStream.readInt();
-
-      totalUncompressedBytes = inputStream.readLong();
-      entryCount             = inputStream.readInt();
-      compressionCodec       = inputStream.readInt();
-      version                = inputStream.readInt();
-
-      if (version != 1) {
-        throw new IOException("Wrong version: " + version);
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "fileinfoOffset=" + fileinfoOffset +
-      ", dataIndexOffset=" + dataIndexOffset +
-      ", dataIndexCount=" + dataIndexCount +
-      ", metaIndexOffset=" + metaIndexOffset +
-      ", metaIndexCount=" + metaIndexCount +
-      ", totalBytes=" + totalUncompressedBytes +
-      ", entryCount=" + entryCount +
-      ", version=" + version;
+  private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
+      long size, boolean closeIStream, BlockCache blockCache,
+      boolean inMemory, boolean evictOnClose) throws IOException {
+    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
+    switch (trailer.getVersion()) {
+    case 1:
+      return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
+          blockCache, inMemory, evictOnClose);
+    case 2:
+      return new HFileReaderV2(path, trailer, fsdis, size, closeIStream,
+          blockCache, inMemory, evictOnClose);
+    default:
+      throw new IOException("Cannot instantiate reader for HFile version " +
+          trailer.getVersion());
     }
   }
 
-  /*
-   * The block index for an HFile. Used when reading.
-   */
-   */
-  static class BlockIndex implements HeapSize {
-    // Number of entries in the index; also the next insert position.
-    int count = 0;
-    byte [][] blockKeys;
-    long [] blockOffsets;
-    int [] blockDataSizes;
-    int size = 0;
-
-    /* Needed for doing lookups on blocks.
-     */
-    final RawComparator<byte []> comparator;
-
-    /*
-     * Disallow the default constructor.
-     */
-    @SuppressWarnings("unused")
-    private BlockIndex() {
-      this(null);
-    }
-
-
-    /**
-     * @param c comparator used to compare keys.
-     */
-    BlockIndex(final RawComparator<byte []>c) {
-      this.comparator = c;
-      // Rough guess: three array references plus this object cost 4 * 8 bytes.
-      this.size += (4 * 8);
-    }
-
-    /**
-     * @return True if block index is empty.
-     */
-    boolean isEmpty() {
-      return this.blockKeys.length <= 0;
-    }
-
-    /**
-     * Adds a new entry in the block index.
-     *
-     * @param key Last key in the block
-     * @param offset file offset where the block is stored
-     * @param dataSize the uncompressed data size
-     */
-    void add(final byte[] key, final long offset, final int dataSize) {
-      blockOffsets[count] = offset;
-      blockKeys[count] = key;
-      blockDataSizes[count] = dataSize;
-      count++;
-      this.size += (Bytes.SIZEOF_INT * 2 + key.length);
-    }
-
-    /**
-     * @param key Key to find
-     * @param offset offset into <code>key</code>
-     * @param length length of the key
-     * @return Index of the block containing <code>key</code>, or -1 if this
-     * file does not contain the requested key.
-     */
-    int blockContainingKey(final byte[] key, int offset, int length) {
-      int pos = Bytes.binarySearch(blockKeys, key, offset, length, this.comparator);
-      if (pos < 0) {
-        pos ++;
-        pos *= -1;
-        if (pos == 0) {
-          // falls before the beginning of the file.
-          return -1;
-        }
-        // When switched to "first key in block" index, binarySearch now returns
-        // the block with a firstKey < key.  This means the value we want is potentially
-        // in the next block.
-        pos --; // in previous block.
-
-        return pos;
-      }
-      // wow, a perfect hit, how unlikely?
-      return pos;
-    }
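
A worked example of the negative-result branch, assuming Bytes.binarySearch
follows the usual Arrays.binarySearch convention of returning
-(insertionPoint) - 1: with blockKeys {"d", "m"} and search key "f" the
search returns -2; pos++ gives -1, pos *= -1 gives 1, and pos-- gives 0,
i.e. the block whose first key "d" precedes "f".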
-
-    /*
-     * @return File midkey.  Inexact.  Operates on block boundaries.  Does
-     * not go into blocks.
-     */
-    byte [] midkey() throws IOException {
-      int pos = ((this.count - 1)/2);              // middle of the index
-      if (pos < 0) {
-        throw new IOException("HFile empty");
-      }
-      return this.blockKeys[pos];
-    }
-
-    /*
-     * Write out the index. Whatever we write here must match what
-     * BlockIndex#readIndex expects; keep the two ends of the index
-     * serialization in sync.
-     * @param o
-     * @param keys
-     * @param offsets
-     * @param sizes
-     * @return Position at which we entered the index.
-     * @throws IOException
-     */
-    static long writeIndex(final FSDataOutputStream o,
-      final List<byte []> keys, final List<Long> offsets,
-      final List<Integer> sizes)
-    throws IOException {
-      long pos = o.getPos();
-      // Don't write an index if nothing in the index.
-      if (keys.size() > 0) {
-        o.write(INDEXBLOCKMAGIC);
-        // Write the index.
-        for (int i = 0; i < keys.size(); ++i) {
-          o.writeLong(offsets.get(i).longValue());
-          o.writeInt(sizes.get(i).intValue());
-          byte [] key = keys.get(i);
-          Bytes.writeByteArray(o, key);
-        }
-      }
-      return pos;
-    }
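
The on-disk layout this produces, which readIndex below must mirror
(Bytes.writeByteArray writes a vint length followed by the raw bytes):

    //   INDEXBLOCKMAGIC
    //   then, per block: long blockOffset | int blockDataSize
    //                    | vint keyLen | key bytes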
-
-    /*
-     * Read in the index from the given stream. Must match what was written
-     * by writeIndex in Writer#close.
-     * @param c Comparator to use.
-     * @param in
-     * @param indexSize
-     * @throws IOException
-     */
-    static BlockIndex readIndex(final RawComparator<byte []> c,
-        DataInputStream in, final int indexSize)
-    throws IOException {
-      BlockIndex bi = new BlockIndex(c);
-      bi.blockOffsets = new long[indexSize];
-      bi.blockKeys = new byte[indexSize][];
-      bi.blockDataSizes = new int[indexSize];
-      // If index size is zero, no index was written.
-      if (indexSize > 0) {
-        byte [] magic = new byte[INDEXBLOCKMAGIC.length];
-        in.readFully(magic);
-        if (!Arrays.equals(magic, INDEXBLOCKMAGIC)) {
-          throw new IOException("Index block magic is wrong: " +
-            Arrays.toString(magic));
-        }
-        for (int i = 0; i < indexSize; ++i ) {
-          long offset   = in.readLong();
-          int dataSize  = in.readInt();
-          byte [] key = Bytes.readByteArray(in);
-          bi.add(key, offset, dataSize);
-        }
-      }
-      return bi;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("size=" + count).append("\n");
-      for (int i = 0; i < count ; i++) {
-        sb.append("key=").append(KeyValue.keyToString(blockKeys[i])).
-          append("\n  offset=").append(blockOffsets[i]).
-          append(", dataSize=" + blockDataSizes[i]).
-          append("\n");
-      }
-      return sb.toString();
-    }
-
-    public long heapSize() {
-      long heapsize = ClassSize.align(ClassSize.OBJECT +
-          2 * Bytes.SIZEOF_INT + (3 + 1) * ClassSize.REFERENCE);
-      //Calculating the size of blockKeys
-      if(blockKeys != null) {
-        //Adding array + references overhead
-        heapsize += ClassSize.align(ClassSize.ARRAY +
-            blockKeys.length * ClassSize.REFERENCE);
-        //Adding bytes
-        for(byte [] bs : blockKeys) {
-          heapsize += ClassSize.align(ClassSize.ARRAY + bs.length);
-        }
-      }
-      if(blockOffsets != null) {
-        heapsize += ClassSize.align(ClassSize.ARRAY +
-            blockOffsets.length * Bytes.SIZEOF_LONG);
-      }
-      if(blockDataSizes != null) {
-        heapsize += ClassSize.align(ClassSize.ARRAY +
-            blockDataSizes.length * Bytes.SIZEOF_INT);
-      }
-
-      return ClassSize.align(heapsize);
-    }
+  public static Reader createReader(
+      FileSystem fs, Path path, BlockCache blockCache, boolean inMemory,
+      boolean evictOnClose) throws IOException {
+    return pickReaderVersion(path, fs.open(path),
+        fs.getFileStatus(path).getLen(), true, blockCache, inMemory,
+        evictOnClose);
+  }
 
+  public static Reader createReader(Path path, FSDataInputStream fsdis,
+      long size, BlockCache blockCache, boolean inMemory, boolean evictOnClose)
+      throws IOException {
+    return pickReaderVersion(path, fsdis, size, false, blockCache, inMemory,
+        evictOnClose);
   }
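
A minimal caller-side sketch of the new factory methods; conf and the path
are assumptions, and passing null for the block cache simply disables
caching:

    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/hbase/table/region/family/storefile"); // hypothetical
    HFile.Reader reader = HFile.createReader(fs, path, null, false, false);
    try {
      reader.loadFileInfo();          // read the file info block
      HFileScanner scanner = reader.getScanner(false, false);
      // seekTo()/next() over the file as usual...
    } finally {
      reader.close();
    }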
 
   /*
@@ -1825,23 +366,37 @@ public class HFile {
     static final byte [] COMPARATOR =
       Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
 
-    /*
-     * Constructor.
+    /**
+     * Append the given key/value pair to the file info, optionally checking the
+     * key prefix.
+     *
+     * @param k key to add
+     * @param v value to add
+     * @param checkPrefix whether to check that the provided key does not start
+     *          with the reserved prefix
+     * @return this file info object
+     * @throws IOException if the key or value is invalid
      */
-    FileInfo() {
-      super();
+    public FileInfo append(final byte[] k, final byte[] v,
+        final boolean checkPrefix) throws IOException {
+      if (k == null || v == null) {
+        throw new NullPointerException("Neither key nor value may be null");
+      }
+      if (checkPrefix && isReservedFileInfoKey(k)) {
+        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
+            + " are reserved");
+      }
+      put(k, v);
+      return this;
     }
+
   }
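
A sketch of writer-side use of the new append method; fileInfo and the key
name are hypothetical:

    // Custom metadata, with the reserved-prefix check enabled:
    fileInfo.append(Bytes.toBytes("MY_APP_KEY"), Bytes.toBytes("v1"), true);
    // Throws IOException if the key starts with the reserved prefix.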
 
-  /**
-   * Return true if the given file info key is reserved for internal
-   * use by HFile.
-   */
+  /** Return true if the given file info key is reserved for internal use. */
   public static boolean isReservedFileInfoKey(byte[] key) {
     return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
   }
 
-
   /**
    * Get names of supported compression algorithms. The names are acceptable by
    * HFile.Writer.
@@ -1879,7 +434,7 @@ public class HFile {
    * @throws IOException When scanning the files fails.
    */
   static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
-  throws IOException {
+      throws IOException {
     List<Path> res = new ArrayList<Path>();
     PathFilter dirFilter = new FSUtils.DirFilter(fs);
     FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
@@ -1894,171 +449,31 @@ public class HFile {
     return res;
   }
 
-  public static void main(String []args) throws IOException {
-    try {
-      // create options
-      Options options = new Options();
-      options.addOption("v", "verbose", false, "Verbose output; emits file and meta data delimiters");
-      options.addOption("p", "printkv", false, "Print key/value pairs");
-      options.addOption("e", "printkey", false, "Print keys");
-      options.addOption("m", "printmeta", false, "Print meta data of file");
-      options.addOption("b", "printblocks", false, "Print block index meta data");
-      options.addOption("k", "checkrow", false,
-        "Enable row order check; looks for out-of-order keys");
-      options.addOption("a", "checkfamily", false, "Enable family check");
-      options.addOption("f", "file", true,
-        "File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
-      options.addOption("r", "region", true,
-        "Region to scan. Pass region name; e.g. '.META.,,1'");
-      if (args.length == 0) {
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp("HFile ", options, true);
-        System.exit(-1);
-      }
-      CommandLineParser parser = new PosixParser();
-      CommandLine cmd = parser.parse(options, args);
-      boolean verbose = cmd.hasOption("v");
-      boolean printValue = cmd.hasOption("p");
-      boolean printKey = cmd.hasOption("e") || printValue;
-      boolean printMeta = cmd.hasOption("m");
-      boolean printBlocks = cmd.hasOption("b");
-      boolean checkRow = cmd.hasOption("k");
-      boolean checkFamily = cmd.hasOption("a");
-      // get configuration, file system and get list of files
-      Configuration conf = HBaseConfiguration.create();
-      conf.set("fs.defaultFS",
-        conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
-      conf.set("fs.default.name",
-        conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
-      ArrayList<Path> files = new ArrayList<Path>();
-      if (cmd.hasOption("f")) {
-        files.add(new Path(cmd.getOptionValue("f")));
-      }
-      if (cmd.hasOption("r")) {
-        String regionName = cmd.getOptionValue("r");
-        byte[] rn = Bytes.toBytes(regionName);
-        byte[][] hri = HRegionInfo.parseRegionName(rn);
-        Path rootDir = FSUtils.getRootDir(conf);
-        Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
-        String enc = HRegionInfo.encodeRegionName(rn);
-        Path regionDir = new Path(tableDir, enc);
-        if (verbose) System.out.println("region dir -> " + regionDir);
-        List<Path> regionFiles =
-          getStoreFiles(FileSystem.get(conf), regionDir);
-        if (verbose) System.out.println("Number of region files found -> " +
-          regionFiles.size());
-        if (verbose) {
-          int i = 1;
-          for (Path p : regionFiles) {
-            if (verbose) System.out.println("Found file[" + i++ + "] -> " + p);
-          }
-        }
-        files.addAll(regionFiles);
-      }
-      // iterate over all files found
-      for (Path file : files) {
-        if (verbose) System.out.println("Scanning -> " + file);
-        FileSystem fs = file.getFileSystem(conf);
-        if (!fs.exists(file)) {
-          System.err.println("ERROR, file doesnt exist: " + file);
-          continue;
-        }
-        // create reader and load file info
-        HFile.Reader reader = new HFile.Reader(fs, file, null, false, false);
-        Map<byte[],byte[]> fileInfo = reader.loadFileInfo();
-        int count = 0;
-        if (verbose || printKey || checkRow || checkFamily) {
-          // scan over file and read key/value's and check if requested
-          HFileScanner scanner = reader.getScanner(false, false);
-          scanner.seekTo();
-          KeyValue pkv = null;
-          do {
-            KeyValue kv = scanner.getKeyValue();
-            // dump key value
-            if (printKey) {
-              System.out.print("K: " + kv);
-              if (printValue) {
-                System.out.print(" V: " + Bytes.toStringBinary(kv.getValue()));
-              }
-              System.out.println();
-            }
-            // check if rows are in order
-            if (checkRow && pkv != null) {
-              if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
-                System.err.println("WARNING, previous row is greater then" +
-                    " current row\n\tfilename -> " + file +
-                    "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
-                    "\n\tcurrent  -> " + Bytes.toStringBinary(kv.getKey()));
-              }
-            }
-            // check if families are consistent
-            if (checkFamily) {
-              String fam = Bytes.toString(kv.getFamily());
-              if (!file.toString().contains(fam)) {
-                System.err.println("WARNING, filename does not match kv family," +
-                    "\n\tfilename -> " + file +
-                    "\n\tkeyvalue -> " + Bytes.toStringBinary(kv.getKey()));
-              }
-              if (pkv != null && !Bytes.equals(pkv.getFamily(), kv.getFamily())) {
-                System.err.println("WARNING, previous kv has different family" +
-                    " compared to current key\n\tfilename -> " + file +
-                    "\n\tprevious -> " +  Bytes.toStringBinary(pkv.getKey()) +
-                    "\n\tcurrent  -> " + Bytes.toStringBinary(kv.getKey()));
-              }
-            }
-            pkv = kv;
-            count++;
-          } while (scanner.next());
-        }
-        if (verbose || printKey) {
-          System.out.println("Scanned kv count -> " + count);
-        }
-        // print meta data
-        if (printMeta) {
-          System.out.println("Block index size as per heapsize: " + reader.indexSize());
-          System.out.println(reader.toString());
-          System.out.println(reader.getTrailerInfo());
-          System.out.println("Fileinfo:");
-          for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
-            System.out.print(Bytes.toString(e.getKey()) + " = " );
-            if (Bytes.equals(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY"))) {
-              long seqid = Bytes.toLong(e.getValue());
-              System.out.println(seqid);
-            } else if (Bytes.equals(e.getKey(),
-                Bytes.toBytes("TIMERANGE"))) {
-              TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
-              Writables.copyWritable(e.getValue(), timeRangeTracker);
-              System.out.println(timeRangeTracker.getMinimumTimestamp() +
-                  "...." + timeRangeTracker.getMaximumTimestamp());
-            } else if (Bytes.equals(e.getKey(), FileInfo.AVG_KEY_LEN) ||
-                Bytes.equals(e.getKey(), FileInfo.AVG_VALUE_LEN)) {
-              System.out.println(Bytes.toInt(e.getValue()));
-            } else {
-              System.out.println(Bytes.toStringBinary(e.getValue()));
-            }
-          }
-
-          //Printing bloom information
-          ByteBuffer b = reader.getMetaBlock("BLOOM_FILTER_META", false);
-          if (b != null) {
-            BloomFilter bloomFilter = new ByteBloomFilter(b);
-            System.out.println("BloomSize: " + bloomFilter.getByteSize());
-            System.out.println("No of Keys in bloom: " +
-                bloomFilter.getKeyCount());
-            System.out.println("Max Keys for bloom: " +
-                bloomFilter.getMaxKeys());
-          } else {
-            System.out.println("Could not get bloom data from meta block");
-          }
-        }
-        if (printBlocks) {
-          System.out.println("Block Index:");
-          System.out.println(reader.blockIndex);
-        }
-        reader.close();
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
+  public static void main(String[] args) throws IOException {
+    HFilePrettyPrinter prettyPrinter = new HFilePrettyPrinter();
+    System.exit(prettyPrinter.run(args));
+  }
+
+  public static String getBlockCacheKey(String hfileName, long offset) {
+    return hfileName + CACHE_KEY_SEPARATOR + offset;
+  }
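
For illustration, assuming CACHE_KEY_SEPARATOR is the underscore described
in the LruBlockCache javadoc further down:

    String cacheKey = HFile.getBlockCacheKey("b2f1a3c4", 65536);
    // -> "b2f1a3c4_65536", the "<storeFileUUID>_<blockOffset>" form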
+
+  /**
+   * Checks the given {@link HFile} format version, and throws an exception if
+   * invalid. Note that if the version number comes from an input file and has
+   * not been verified, the caller should wrap the resulting exception in an
+   * {@link IOException} to signal corrupted input rather than a software
+   * error.
+   *
+   * @param version an HFile version
+   * @throws IllegalArgumentException if the version is invalid
+   */
+  public static void checkFormatVersion(int version)
+      throws IllegalArgumentException {
+    if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
+      throw new IllegalArgumentException("Invalid HFile version: " + version
+          + " (expected to be between " + MIN_FORMAT_VERSION + " and "
+          + MAX_FORMAT_VERSION + ")");
     }
   }
+
 }
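
A sketch of the wrapping the checkFormatVersion javadoc asks for when the
version was read from a file; versionFromTrailer is assumed:

    try {
      HFile.checkFormatVersion(versionFromTrailer);
    } catch (IllegalArgumentException e) {
      // Report corrupted input rather than a programming error.
      throw new IOException("Corrupt HFile: " + e.getMessage(), e);
    }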

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1153634&r1=1153633&r2=1153634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Wed Aug  3 19:59:48 2011
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.PriorityQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -251,7 +250,7 @@ public class LruBlockCache implements Bl
    * @param buf block buffer
    * @param inMemory if block is in-memory
    */
-  public void cacheBlock(String blockName, ByteBuffer buf, boolean inMemory) {
+  public void cacheBlock(String blockName, HeapSize buf, boolean inMemory) {
     CachedBlock cb = map.get(blockName);
     if(cb != null) {
       throw new RuntimeException("Cached an already cached block");
@@ -275,7 +274,7 @@ public class LruBlockCache implements Bl
    * @param blockName block name
    * @param buf block buffer
    */
-  public void cacheBlock(String blockName, ByteBuffer buf) {
+  public void cacheBlock(String blockName, HeapSize buf) {
     cacheBlock(blockName, buf, false);
   }
 
@@ -284,7 +283,7 @@ public class LruBlockCache implements Bl
    * @param blockName block name
    * @return buffer of specified block name, or null if not in cache
    */
-  public ByteBuffer getBlock(String blockName, boolean caching) {
+  public HeapSize getBlock(String blockName, boolean caching) {
     CachedBlock cb = map.get(blockName);
     if(cb == null) {
       stats.miss(caching);
@@ -304,6 +303,31 @@ public class LruBlockCache implements Bl
     return true;
   }
 
+  /**
+   * Evicts all blocks whose name starts with the given prefix. This is an
+   * expensive operation implemented as a linear-time search through all blocks
+   * in the cache. Ideally this should be a search in a log-access-time map.
+   *
+   * <p>
+   * This is used for evict-on-close to remove all blocks of a specific HFile.
+   * The prefix would be the HFile/StoreFile name (a UUID) followed by an
+   * underscore, because HFile v2 block names in cache are of the form
+   * "&lt;storeFileUUID&gt;_&lt;blockOffset&gt;".
+   *
+   * @return the number of blocks evicted
+   */
+  @Override
+  public int evictBlocksByPrefix(String prefix) {
+    int numEvicted = 0;
+    for (String key : map.keySet()) {
+      if (key.startsWith(prefix)) {
+        if (evictBlock(key))
+          ++numEvicted;
+      }
+    }
+    return numEvicted;
+  }
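
A sketch of the evict-on-close call this enables; cache, storeFileName and
a LOG are assumed:

    int evicted = cache.evictBlocksByPrefix(storeFileName + "_");
    LOG.debug("Evicted " + evicted + " blocks for " + storeFileName);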
+
   protected long evictBlock(CachedBlock block) {
     map.remove(block.getName());
     size.addAndGet(-1 * block.heapSize());

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java?rev=1153634&r1=1153633&r2=1153634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java Wed Aug  3 19:59:48 2011
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
 
 
@@ -32,10 +33,10 @@ import org.apache.hadoop.hbase.io.hfile.
  * Simple one RFile soft reference cache.
  */
 public class SimpleBlockCache implements BlockCache {
-  private static class Ref extends SoftReference<ByteBuffer> {
+  private static class Ref extends SoftReference<HeapSize> {
     public String blockId;
-    public Ref(String blockId, ByteBuffer buf, ReferenceQueue q) {
-      super(buf, q);
+    public Ref(String blockId, HeapSize block, ReferenceQueue q) {
+      super(block, q);
       this.blockId = blockId;
     }
   }
@@ -68,7 +69,7 @@ public class SimpleBlockCache implements
     return cache.size();
   }
 
-  public synchronized ByteBuffer getBlock(String blockName, boolean caching) {
+  public synchronized HeapSize getBlock(String blockName, boolean caching) {
     processQueue(); // clear out some crap.
     Ref ref = cache.get(blockName);
     if (ref == null)
@@ -76,13 +77,13 @@ public class SimpleBlockCache implements
     return ref.get();
   }
 
-  public synchronized void cacheBlock(String blockName, ByteBuffer buf) {
-    cache.put(blockName, new Ref(blockName, buf, q));
+  public synchronized void cacheBlock(String blockName, HeapSize block) {
+    cache.put(blockName, new Ref(blockName, block, q));
   }
 
-  public synchronized void cacheBlock(String blockName, ByteBuffer buf,
+  public synchronized void cacheBlock(String blockName, HeapSize block,
       boolean inMemory) {
-    cache.put(blockName, new Ref(blockName, buf, q));
+    cache.put(blockName, new Ref(blockName, block, q));
   }
 
   @Override
@@ -117,4 +118,10 @@ public class SimpleBlockCache implements
     // TODO: implement this if we ever actually use this block cache
     return 0;
   }
+
+  @Override
+  public int evictBlocksByPrefix(String prefix) {
+    throw new UnsupportedOperationException();
+  }
 }
+

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1153634&r1=1153633&r2=1153634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Wed Aug  3 19:59:48 2011
@@ -79,7 +79,7 @@ public class HFileOutputFormat extends F
     // Get the path of the temporary output file
     final Path outputPath = FileOutputFormat.getOutputPath(context);
     final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
-    Configuration conf = context.getConfiguration();
+    final Configuration conf = context.getConfiguration();
     final FileSystem fs = outputdir.getFileSystem(conf);
     // These configs. are from hbase-*.xml
     final long maxsize = conf.getLong("hbase.hregion.max.filesize",
@@ -132,7 +132,7 @@ public class HFileOutputFormat extends F
 
         // create a new HLog writer, if necessary
         if (wl == null || wl.writer == null) {
-          wl = getNewWriter(family);
+          wl = getNewWriter(family, conf);
         }
 
         // we now have the proper HLog writer. full steam ahead
@@ -163,12 +163,13 @@ public class HFileOutputFormat extends F
        * @return A WriterLength, containing a new HFile.Writer.
        * @throws IOException
        */
-      private WriterLength getNewWriter(byte[] family) throws IOException {
+      private WriterLength getNewWriter(byte[] family, Configuration conf)
+          throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = new HFile.Writer(fs,
+        wl.writer = HFile.getWriterFactory(conf).createWriter(fs,
           StoreFile.getUniqueFile(fs, familydir), blocksize,
           compression, KeyValue.KEY_COMPARATOR);
         this.writers.put(family, wl);
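
The same factory idiom for callers outside this class, as a sketch (names
assumed):

    HFile.Writer w = HFile.getWriterFactory(conf).createWriter(fs,
        new Path(familydir, "hypothetical"), blocksize, compression,
        KeyValue.KEY_COMPARATOR);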

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1153634&r1=1153633&r2=1153634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Aug  3 19:59:48 2011
@@ -288,7 +288,7 @@ public class LoadIncrementalHFiles exten
   throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
-    HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false, false);
+    HFile.Reader hfr = HFile.createReader(fs, hfilePath, null, false, false);
     final byte[] first, last;
     try {
       hfr.loadFileInfo();
@@ -390,7 +390,7 @@ public class LoadIncrementalHFiles exten
 
       halfWriter = new StoreFile.Writer(
           fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
-          bloomFilterType, 0, false);
+          bloomFilterType, 0);
       HFileScanner scanner = halfReader.getScanner(false, false);
       scanner.seekTo();
       do {
@@ -490,7 +490,7 @@ public class LoadIncrementalHFiles exten
       for (Path hfile : hfiles) {
         if (hfile.getName().startsWith("_")) continue;
         
-        HFile.Reader reader = new HFile.Reader(fs, hfile, null, false, false);
+        HFile.Reader reader = HFile.createReader(fs, hfile, null, false, false);
         final byte[] first, last;
         try {
           reader.loadFileInfo();


