hbase-commits mailing list archives

From: mbau...@apache.org
Subject: svn commit: r1245291 [4/7] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/h...
Date: Fri, 17 Feb 2012 01:56:35 GMT
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Fri Feb 17 01:56:33 2012
@@ -30,6 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -51,7 +53,7 @@ public class HFileReaderV2 extends Abstr
 
   private boolean includesMemstoreTS = false;
 
-  private boolean hasMemstoreTS() {
+  private boolean shouldIncludeMemstoreTS() {
     return includesMemstoreTS;
   }
 
@@ -80,17 +82,20 @@ public class HFileReaderV2 extends Abstr
    * @param size Length of the stream.
    * @param closeIStream Whether to close the stream.
    * @param cacheConf Cache configuration.
-   * @throws IOException
+   * @param preferredEncodingInCache the encoding to use in cache in case we
+   *          have a choice. If the file is already encoded on disk, we will
+   *          still use its on-disk encoding in cache.
    */
   public HFileReaderV2(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long size,
-      final boolean closeIStream, final CacheConfig cacheConf)
-  throws IOException {
+      final boolean closeIStream, final CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache)
+      throws IOException {
     super(path, trailer, fsdis, size, closeIStream, cacheConf);
-
     trailer.expectVersion(2);
-    fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo,
-        fileSize);
+    HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
+        compressAlgo, fileSize);
+    this.fsBlockReader = fsBlockReaderV2; // upcast
 
     // Comparator class name is stored in the trailer in version 2.
     comparator = trailer.createComparator();
@@ -101,7 +106,7 @@ public class HFileReaderV2 extends Abstr
 
     // Parse load-on-open data.
 
-    HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
+    HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
         trailer.getLoadOnOpenDataOffset(),
         fileSize - trailer.getTrailerSize());
 
@@ -122,9 +127,17 @@ public class HFileReaderV2 extends Abstr
     lastKey = fileInfo.get(FileInfo.LASTKEY);
     avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
     avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
-    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
-    includesMemstoreTS = (keyValueFormatVersion != null &&
-       Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE_TS);
+    byte [] keyValueFormatVersion =
+        fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+    includesMemstoreTS = keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) ==
+            HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE_TS;
+    fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
+
+    // Read data block encoding algorithm name from file info.
+    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo,
+        preferredEncodingInCache);
+    fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);
 
     // Store all other load-on-open blocks for further consumption.
     HFileBlock b;
@@ -145,8 +158,15 @@ public class HFileReaderV2 extends Abstr
    * @param isCompaction is scanner being used for a compaction?
    * @return Scanner on this file.
    */
+  @Override
   public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
       final boolean isCompaction) {
+    // check if we want to use data block encoding in memory
+    if (dataBlockEncoder.useEncodedScanner(isCompaction)) {
+      return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
+          includesMemstoreTS);
+    }
+
     return new ScannerV2(this, cacheBlocks, pread, isCompaction);
   }
 
@@ -171,7 +191,7 @@ public class HFileReaderV2 extends Abstr
     if (block == -1)
       return null;
     long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
-    long now = System.currentTimeMillis();
+    long startTimeNs = System.nanoTime();
 
     // Per meta key from any given file, synchronize reads for said block. This
     // is OK to do for meta blocks because the meta block index is always
@@ -179,7 +199,8 @@ public class HFileReaderV2 extends Abstr
     synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
       // Check cache for block. If found return.
       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
-      BlockCacheKey cacheKey = HFile.getBlockCacheKey(name, metaBlockOffset);
+      BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
+          DataBlockEncoding.NONE, BlockType.META);
 
       cacheBlock &= cacheConf.shouldCacheDataOnRead();
       if (cacheConf.isBlockCacheEnabled()) {
@@ -198,9 +219,9 @@ public class HFileReaderV2 extends Abstr
           blockSize, -1, true);
       passSchemaMetricsTo(metaBlock);
 
-      long delta = System.currentTimeMillis() - now;
-      HFile.readTime += delta;
-      HFile.readOps++;
+      long delta = System.nanoTime() - startTimeNs;
+      HFile.preadTimeNano.addAndGet(delta);
+      HFile.preadOps.incrementAndGet();
       getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta);
 
       // Cache the block
@@ -215,17 +236,22 @@ public class HFileReaderV2 extends Abstr
 
   /**
    * Read in a file block.
-   *
    * @param dataBlockOffset offset to read.
-   * @param onDiskSize size of the block
-   * @param pread Use positional read instead of seek+read (positional is better
-   *          doing random reads whereas seek+read is better scanning).
+   * @param onDiskBlockSize size of the block
+   * @param cacheBlock
+   * @param pread Use positional read instead of seek+read (positional is
+   *          better doing random reads whereas seek+read is better scanning).
    * @param isCompaction is this block being read as part of a compaction
+   * @param expectedBlockType the block type we are expecting to read with this
+   *          read operation, or null to read whatever block type is available
+   *          and avoid checking (that might reduce caching efficiency of
+   *          encoded data blocks)
    * @return Block wrapped in a ByteBuffer.
    * @throws IOException
    */
   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
-      boolean cacheBlock, boolean pread, final boolean isCompaction)
+      final boolean cacheBlock, boolean pread, final boolean isCompaction,
+      BlockType expectedBlockType)
       throws IOException {
     if (dataBlockIndexReader == null) {
       throw new IOException("Block index not loaded");
@@ -242,32 +268,59 @@ public class HFileReaderV2 extends Abstr
     // the other choice is to duplicate work (which the cache would prevent you
     // from doing).
 
-    BlockCacheKey cacheKey = HFile.getBlockCacheKey(name, dataBlockOffset);
+    BlockCacheKey cacheKey =
+        new BlockCacheKey(name, dataBlockOffset,
+            dataBlockEncoder.getEffectiveEncodingInCache(isCompaction),
+            expectedBlockType);
     IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
     try {
       // Check cache for block. If found return.
       if (cacheConf.isBlockCacheEnabled()) {
-        HFileBlock cachedBlock =
-          (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock);
+        HFileBlock cachedBlock = (HFileBlock)
+            cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock);
         if (cachedBlock != null) {
           BlockCategory blockCategory =
               cachedBlock.getBlockType().getCategory();
           getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);
+
+          if (cachedBlock.getBlockType() == BlockType.DATA) {
+            HFile.dataBlockReadCnt.incrementAndGet();
+          }
+
+          validateBlockType(cachedBlock, expectedBlockType);
+
+          // Validate encoding type for encoded blocks. We include encoding
+          // type in the cache key, and we expect it to match on a cache hit.
+          if (cachedBlock.getBlockType() == BlockType.ENCODED_DATA &&
+              cachedBlock.getDataBlockEncoding() !=
+              dataBlockEncoder.getEncodingInCache()) {
+            throw new IOException("Cached block under key " + cacheKey + " " +
+                "has wrong encoding: " + cachedBlock.getDataBlockEncoding() +
+                " (expected: " + dataBlockEncoder.getEncodingInCache() + ")");
+          }
           return cachedBlock;
         }
         // Carry on, please load.
       }
 
       // Load block from filesystem.
-      long now = System.currentTimeMillis();
+      long startTimeNs = System.nanoTime();
       HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
           onDiskBlockSize, -1, pread);
+      hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock,
+          isCompaction);
+      validateBlockType(hfileBlock, expectedBlockType);
       passSchemaMetricsTo(hfileBlock);
       BlockCategory blockCategory = hfileBlock.getBlockType().getCategory();
 
-      long delta = System.currentTimeMillis() - now;
-      HFile.readTime += delta;
-      HFile.readOps++;
+      long delta = System.nanoTime() - startTimeNs;
+      if (pread) {
+        HFile.preadTimeNano.addAndGet(delta);
+        HFile.preadOps.incrementAndGet();
+      } else {
+        HFile.readTimeNano.addAndGet(delta);
+        HFile.readOps.incrementAndGet();
+      }
       getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
 
       // Cache the block if necessary
@@ -284,6 +337,33 @@ public class HFileReaderV2 extends Abstr
   }
 
   /**
+   * Compares the actual type of a block retrieved from cache or disk with its
+   * expected type and throws an exception in case of a mismatch. Expected
+   * block type of {@link BlockType#DATA} is considered to match the actual
+   * block type {@link BlockType#ENCODED_DATA} as well.
+   * @param block a block retrieved from cache or disk
+   * @param expectedBlockType the expected block type, or null to skip the
+   *          check
+   */
+  private void validateBlockType(HFileBlock block,
+      BlockType expectedBlockType) throws IOException {
+    if (expectedBlockType == null) {
+      return;
+    }
+    BlockType actualBlockType = block.getBlockType();
+    if (actualBlockType == BlockType.ENCODED_DATA &&
+        expectedBlockType == BlockType.DATA) {
+      // We consider DATA to match ENCODED_DATA for the purpose of this
+      // verification.
+      return;
+    }
+    if (actualBlockType != expectedBlockType) {
+      throw new IOException("Expected block type " + expectedBlockType + ", " +
+          "but got " + actualBlockType + ": " + block);
+    }
+  }
+
+  /**
    * @return Last key in the file. May be null if file has no entries. Note that
    *         this is not the last row key, but rather the byte form of the last
    *         KeyValue.
@@ -323,32 +403,164 @@ public class HFileReaderV2 extends Abstr
     }
   }
 
+  protected abstract static class AbstractScannerV2
+      extends AbstractHFileReader.Scanner {
+    protected HFileBlock block;
+
+    public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      super(r, cacheBlocks, pread, isCompaction);
+    }
+
+    /**
+     * An internal API function. Seek to the given key, optionally rewinding to
+     * the first key of the block before doing the seek.
+     *
+     * @param key key byte array
+     * @param offset key offset in the key byte array
+     * @param length key length
+     * @param rewind whether to rewind to the first key of the block before
+     *        doing the seek. If this is false, we are assuming we never go
+     *        back, otherwise the result is undefined.
+     * @return -1 if the key is earlier than the first key of the file,
+     *         0 if we are at the given key, and 1 if we are past the given key
+     * @throws IOException
+     */
+    protected int seekTo(byte[] key, int offset, int length, boolean rewind)
+        throws IOException {
+      HFileBlockIndex.BlockIndexReader indexReader =
+          reader.getDataBlockIndexReader();
+      HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
+          block, cacheBlocks, pread, isCompaction);
+      if (seekToBlock == null) {
+        // This happens if the key e.g. falls before the beginning of the file.
+        return -1;
+      }
+      return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
+          false);
+    }
+
+    protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
+
+    protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock,
+        boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
+        throws IOException;
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      // Always rewind to the first key of the block, because the given key
+      // might be before or after the current key.
+      return seekTo(key, offset, length, true);
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length) throws IOException {
+      if (isSeeked()) {
+        ByteBuffer bb = getKey();
+        int compared = reader.getComparator().compare(key, offset,
+            length, bb.array(), bb.arrayOffset(), bb.limit());
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        }
+      }
+
+      // Don't rewind on a reseek operation, because reseek implies that we are
+      // always going forward in the file.
+      return seekTo(key, offset, length, false);
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length)
+        throws IOException {
+      HFileBlock seekToBlock =
+          reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
+              block, cacheBlocks, pread, isCompaction);
+      if (seekToBlock == null) {
+        return false;
+      }
+      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+      if (reader.getComparator().compare(firstKey.array(),
+          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
+      {
+        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+        // The key we are interested in
+        if (previousBlockOffset == -1) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+
+        // It is important that we compute and pass onDiskSize to the block
+        // reader so that it does not have to read the header separately to
+        // figure out the size.
+        seekToBlock = reader.readBlock(previousBlockOffset,
+            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
+            pread, isCompaction, BlockType.DATA);
+
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+      return true;
+    }
+
+
+    /**
+     * Scans blocks in the "scanned" section of the {@link HFile} until the next
+     * data block is found.
+     *
+     * @return the next block, or null if there are no more data blocks
+     * @throws IOException
+     */
+    protected HFileBlock readNextDataBlock() throws IOException {
+      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+      if (block == null)
+        return null;
+
+      HFileBlock curBlock = block;
+
+      do {
+        if (curBlock.getOffset() >= lastDataBlockOffset)
+          return null;
+
+        if (curBlock.getOffset() < 0) {
+          throw new IOException("Invalid block file offset: " + block);
+        }
+
+        // We are reading the next block without block type validation, because
+        // it might turn out to be a non-data block.
+        curBlock = reader.readBlock(curBlock.getOffset()
+            + curBlock.getOnDiskSizeWithHeader(),
+            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+            isCompaction, null);
+      } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
+          curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));
+
+      return curBlock;
+    }
+  }
+
   /**
    * Implementation of {@link HFileScanner} interface.
    */
-  protected static class ScannerV2 extends AbstractHFileReader.Scanner {
-    private HFileBlock block;
+  protected static class ScannerV2 extends AbstractScannerV2 {
     private HFileReaderV2 reader;
 
     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(cacheBlocks, pread, isCompaction);
+      super(r, cacheBlocks, pread, isCompaction);
       this.reader = r;
     }
 
     @Override
-    public HFileReaderV2 getReader() {
-      return reader;
-    }
-
-    @Override
     public KeyValue getKeyValue() {
       if (!isSeeked())
         return null;
 
-      KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position());
-      if (this.reader.hasMemstoreTS()) {
+      KeyValue ret = new KeyValue(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position());
+      if (this.reader.shouldIncludeMemstoreTS()) {
         ret.setMemstoreTS(currMemstoreTS);
       }
       return ret;
@@ -430,36 +642,6 @@ public class HFileReaderV2 extends Abstr
     }
 
     /**
-     * Scans blocks in the "scanned" section of the {@link HFile} until the next
-     * data block is found.
-     *
-     * @return the next block, or null if there are no more data blocks
-     * @throws IOException
-     */
-    private HFileBlock readNextDataBlock() throws IOException {
-      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
-      if (block == null)
-        return null;
-
-      HFileBlock curBlock = block;
-
-      do {
-        if (curBlock.getOffset() >= lastDataBlockOffset)
-          return null;
-
-        if (curBlock.getOffset() < 0) {
-          throw new IOException("Invalid block file offset: " + block);
-        }
-        curBlock = reader.readBlock(curBlock.getOffset()
-            + curBlock.getOnDiskSizeWithHeader(),
-            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
-            isCompaction);
-      } while (!curBlock.getBlockType().equals(BlockType.DATA));
-
-      return curBlock;
-    }
-
-    /**
      * Positions this scanner at the start of the file.
      *
      * @return false if empty file; i.e. a call to next would return false and
@@ -486,7 +668,7 @@ public class HFileReaderV2 extends Abstr
       }
 
       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
-          isCompaction);
+          isCompaction, BlockType.DATA);
       if (block.getOffset() < 0) {
         throw new IOException("Invalid block offset: " + block.getOffset());
       }
@@ -495,70 +677,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public int seekTo(byte[] key) throws IOException {
-      return seekTo(key, 0, key.length);
-    }
-
-    /**
-     * An internal API function. Seek to the given key, optionally rewinding to
-     * the first key of the block before doing the seek.
-     *
-     * @param key key byte array
-     * @param offset key offset in the key byte array
-     * @param length key length
-     * @param rewind whether to rewind to the first key of the block before
-     *        doing the seek. If this is false, we are assuming we never go
-     *        back, otherwise the result is undefined.
-     * @return -1 if the key is earlier than the first key of the file,
-     *         0 if we are at the given key, and 1 if we are past the given key
-     * @throws IOException
-     */
-    private int seekTo(byte[] key, int offset, int length, boolean rewind)
-        throws IOException {
-      HFileBlockIndex.BlockIndexReader indexReader =
-          reader.getDataBlockIndexReader();
-      HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
-          block, cacheBlocks, pread, isCompaction);
-
-      if (seekToBlock == null) {
-        // This happens if the key e.g. falls before the beginning of the file.
-        return -1;
-      }
-      return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
-          false);
-    }
-
-    @Override
-    public int seekTo(byte[] key, int offset, int length) throws IOException {
-      // Always rewind to the first key of the block, because the given key
-      // might be before or after the current key.
-      return seekTo(key, offset, length, true);
-    }
-
-    @Override
-    public int reseekTo(byte[] key) throws IOException {
-      return reseekTo(key, 0, key.length);
-    }
-
-    @Override
-    public int reseekTo(byte[] key, int offset, int length) throws IOException {
-      if (isSeeked()) {
-        ByteBuffer bb = getKey();
-        int compared = reader.getComparator().compare(key, offset,
-            length, bb.array(), bb.arrayOffset(), bb.limit());
-        if (compared < 1) {
-          // If the required key is less than or equal to current key, then
-          // don't do anything.
-          return compared;
-        }
-      }
-
-      // Don't rewind on a reseek operation, because reseek implies that we are
-      // always going forward in the file.
-      return seekTo(key, offset, length, false);
-    }
-
-    private int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
         byte[] key, int offset, int length, boolean seekBefore)
         throws IOException {
       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
@@ -577,6 +696,16 @@ public class HFileReaderV2 extends Abstr
      */
     private void updateCurrBlock(HFileBlock newBlock) {
       block = newBlock;
+
+      // sanity check
+      if (block.getBlockType() != BlockType.DATA) {
+        throw new IllegalStateException("ScannerV2 works only on data " +
+            "blocks, got " + block.getBlockType() + "; " +
+            "fileName=" + reader.name + ", " +
+            "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
+            "isCompaction=" + isCompaction);
+      }
+
       blockBuffer = block.getBufferWithoutHeader();
       readKeyValueLen();
       blockFetches++;
@@ -587,14 +716,16 @@ public class HFileReaderV2 extends Abstr
       currKeyLen = blockBuffer.getInt();
       currValueLen = blockBuffer.getInt();
       blockBuffer.reset();
-      if (this.reader.hasMemstoreTS()) {
+      if (this.reader.shouldIncludeMemstoreTS()) {
         try {
-          int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
-                                  + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
-          currMemstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+          int memstoreTSOffset = blockBuffer.arrayOffset()
+              + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
+              + currValueLen;
+          currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
+              memstoreTSOffset);
           currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
         } catch (Exception e) {
-          throw new RuntimeException("Error reading memstoreTS. " + e);
+          throw new RuntimeException("Error reading memstore timestamp", e);
         }
       }
 
@@ -609,7 +740,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     /**
-     * Within a loaded block, seek looking for the first key that is smaller
+     * Within a loaded block, seek looking for the last key that is smaller
      * than (or equal to?) the key we are interested in.
      *
      * A note on the seekBefore: if you have seekBefore = true, AND the first
@@ -632,14 +763,15 @@ public class HFileReaderV2 extends Abstr
         klen = blockBuffer.getInt();
         vlen = blockBuffer.getInt();
         blockBuffer.reset();
-        if (this.reader.hasMemstoreTS()) {
+        if (this.reader.shouldIncludeMemstoreTS()) {
           try {
-            int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
-                                  + KEY_VALUE_LEN_SIZE + klen + vlen;
-            memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+            int memstoreTSOffset = blockBuffer.arrayOffset()
+                + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
+            memstoreTS = Bytes.readVLong(blockBuffer.array(),
+                memstoreTSOffset);
             memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
           } catch (Exception e) {
-            throw new RuntimeException("Error reading memstoreTS. " + e);
+            throw new RuntimeException("Error reading memstore timestamp", e);
           }
       }
 
@@ -663,7 +795,7 @@ public class HFileReaderV2 extends Abstr
           }
           currKeyLen = klen;
           currValueLen = vlen;
-          if (this.reader.hasMemstoreTS()) {
+          if (this.reader.shouldIncludeMemstoreTS()) {
             currMemstoreTS = memstoreTS;
             currMemstoreTSLen = memstoreTSLen;
           }
@@ -691,11 +823,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key) throws IOException {
-      return seekBefore(key, 0, key.length);
-    }
-
-    private ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
       // It is safe to manipulate this buffer because we own the buffer object.
       buffer.rewind();
@@ -708,53 +836,174 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key, int offset, int length)
-        throws IOException {
-      HFileBlock seekToBlock =
-          reader.getDataBlockIndexReader().seekToDataBlock(key, offset,
-              length, block, cacheBlocks, pread, isCompaction);
-      if (seekToBlock == null) {
+    public String getKeyString() {
+      return Bytes.toStringBinary(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE, currKeyLen);
+    }
+
+    @Override
+    public String getValueString() {
+      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+          currValueLen);
+    }
+  }
+
+  /**
+   * ScannerV2 that operates on encoded data blocks.
+   */
+  protected static class EncodedScannerV2 extends AbstractScannerV2 {
+    private DataBlockEncoder.EncodedSeeker seeker = null;
+    private DataBlockEncoder dataBlockEncoder = null;
+    private final boolean includesMemstoreTS;
+
+    public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
+        boolean pread, boolean isCompaction, boolean includesMemstoreTS) {
+      super(reader, cacheBlocks, pread, isCompaction);
+      this.includesMemstoreTS = includesMemstoreTS;
+    }
+
+    private void setDataBlockEncoder(DataBlockEncoder dataBlockEncoder) {
+      this.dataBlockEncoder = dataBlockEncoder;
+      seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
+          includesMemstoreTS);
+    }
+
+    /**
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the first key/value pair.
+     *
+     * @param newBlock the block to make current
+     */
+    private void updateCurrentBlock(HFileBlock newBlock) {
+      block = newBlock;
+
+      // sanity checks
+      if (block.getBlockType() != BlockType.ENCODED_DATA) {
+        throw new IllegalStateException(
+            "EncodedScannerV2 works only on encoded data blocks");
+      }
+
+      short dataBlockEncoderId = block.getDataBlockEncodingId();
+      if (dataBlockEncoder == null ||
+          !DataBlockEncoding.isCorrectEncoder(dataBlockEncoder,
+              dataBlockEncoderId)) {
+        DataBlockEncoder encoder =
+            DataBlockEncoding.getDataBlockEncoderById(dataBlockEncoderId);
+        setDataBlockEncoder(encoder);
+      }
+
+      seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
+      blockFetches++;
+    }
+
+    private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
+      ByteBuffer origBlock = newBlock.getBufferReadOnly();
+      ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
+          origBlock.arrayOffset() + HFileBlock.HEADER_SIZE +
+          DataBlockEncoding.ID_SIZE,
+          origBlock.limit() - HFileBlock.HEADER_SIZE -
+          DataBlockEncoding.ID_SIZE).slice();
+      return encodedBlock;
+    }
+
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader == null) {
         return false;
       }
-      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
-      if (reader.getComparator().compare(firstKey.array(),
-          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
-      {
-        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
-        // The key we are interested in
-        if (previousBlockOffset == -1) {
-          // we have a 'problem', the key we want is the first of the file.
-          return false;
-        }
 
-        // It is important that we compute and pass onDiskSize to the block
-        // reader so that it does not have to read the header separately to
-        // figure out the size.
-        seekToBlock = reader.readBlock(previousBlockOffset,
-            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
-            pread, isCompaction);
+      if (reader.getTrailer().getEntryCount() == 0) {
+        // No data blocks.
+        return false;
+      }
 
-        // TODO shortcut: seek forward in this block to the last key of the
-        // block.
+      long firstDataBlockOffset =
+          reader.getTrailer().getFirstDataBlockOffset();
+      if (block != null && block.getOffset() == firstDataBlockOffset) {
+        seeker.rewind();
+        return true;
       }
-      loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+
+      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+          isCompaction, BlockType.DATA);
+      if (block.getOffset() < 0) {
+        throw new IOException("Invalid block offset: " + block.getOffset());
+      }
+      updateCurrentBlock(block);
       return true;
     }
 
     @Override
+    public boolean next() throws IOException {
+      boolean isValid = seeker.next();
+      if (!isValid) {
+        block = readNextDataBlock();
+        isValid = block != null;
+        if (isValid) {
+          updateCurrentBlock(block);
+        }
+      }
+      return isValid;
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      assertValidSeek();
+      return seeker.getKeyDeepCopy();
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      assertValidSeek();
+      return seeker.getValueShallowCopy();
+    }
+
+    @Override
+    public KeyValue getKeyValue() {
+      if (block == null) {
+        return null;
+      }
+      return seeker.getKeyValue();
+    }
+
+    @Override
     public String getKeyString() {
-      return Bytes.toStringBinary(blockBuffer.array(),
-          blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE, currKeyLen);
+      ByteBuffer keyBuffer = getKey();
+      return Bytes.toStringBinary(keyBuffer.array(),
+          keyBuffer.arrayOffset(), keyBuffer.limit());
     }
 
     @Override
     public String getValueString() {
-      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
-          currValueLen);
+      ByteBuffer valueBuffer = getValue();
+      return Bytes.toStringBinary(valueBuffer.array(),
+          valueBuffer.arrayOffset(), valueBuffer.limit());
     }
 
+    private void assertValidSeek() {
+      if (block == null) {
+        throw new NotSeekedException();
+      }
+    }
+
+    @Override
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+      return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
+    }
+
+    @Override
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+        byte[] key, int offset, int length, boolean seekBefore)
+        throws IOException  {
+      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+        updateCurrentBlock(seekToBlock);
+      } else if (rewind) {
+        seeker.rewind();
+      }
+      return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
+    }
   }
 
   /**

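A caller-side sketch of the reworked readBlock() above, assuming only the signatures visible in this hunk (the reader, offset and onDiskSize names are illustrative placeholders, not part of the commit): the new trailing expectedBlockType argument makes readBlock() fail fast when a non-data block turns up at the given offset, while passing null skips the check, as readNextDataBlock() does when the type of the next block is not yet known.

    // Illustrative only; "reader", "offset" and "onDiskSize" are placeholders.
    HFileBlock block = reader.readBlock(offset, onDiskSize,
        true,             // cacheBlock: allow caching of the block we read
        false,            // pread: false = seek+read, better for sequential scans
        false,            // isCompaction: normal read path
        BlockType.DATA);  // expected type; pass null to accept any block type
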
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Fri Feb 17 01:56:33 2012
@@ -36,15 +36,18 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.Compressor;
 
 /**
- * Writes version 1 HFiles. Mainly used for testing backwards-compatibilty.
+ * Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
  */
 public class HFileWriterV1 extends AbstractHFileWriter {
 
@@ -91,20 +94,23 @@ public class HFileWriterV1 extends Abstr
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
-        int bytesPerChecksum, Compression.Algorithm compressAlgo,
+        int bytesPerChecksum, Algorithm compressAlgo,
         final KeyComparator comparator)
         throws IOException {
       return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
-          bytesPerChecksum, compressAlgo, comparator);
+          bytesPerChecksum, compressAlgo, NoOpDataBlockEncoder.INSTANCE,
+          comparator);
     }
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
         int bytesPerChecksum, Compression.Algorithm compressAlgo,
+        HFileDataBlockEncoder dataBlockEncoder,
         final KeyComparator comparator, InetSocketAddress[] favoredNodes)
         throws IOException {
       return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
-          bytesPerChecksum, compressAlgo, comparator, favoredNodes);
+          bytesPerChecksum, compressAlgo, dataBlockEncoder,
+          comparator, favoredNodes);
     }
 
     @Override
@@ -128,7 +134,7 @@ public class HFileWriterV1 extends Abstr
         final int blockSize, final Compression.Algorithm compress,
         final KeyComparator c) throws IOException {
       return new HFileWriterV1(conf, cacheConf, ostream, blockSize, compress,
-          c);
+          NoOpDataBlockEncoder.INSTANCE, c);
     }
   }
 
@@ -138,7 +144,7 @@ public class HFileWriterV1 extends Abstr
       throws IOException {
     this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
         HFile.DEFAULT_BYTES_PER_CHECKSUM, HFile.DEFAULT_COMPRESSION_ALGORITHM,
-        null);
+        NoOpDataBlockEncoder.INSTANCE, null);
   }
 
   /**
@@ -149,26 +155,31 @@ public class HFileWriterV1 extends Abstr
       Path path, int blockSize, int bytesPerChecksum, String compressAlgoName,
       final KeyComparator comparator) throws IOException {
     this(conf, cacheConf, fs, path, blockSize, bytesPerChecksum,
-        compressionByName(compressAlgoName), comparator);
+        compressionByName(compressAlgoName), NoOpDataBlockEncoder.INSTANCE,
+        comparator);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
   public HFileWriterV1(Configuration conf, CacheConfig cacheConf, FileSystem fs,
       Path path, int blockSize, int bytesPerChecksum,
-      Compression.Algorithm compress, final KeyComparator comparator)
+      Compression.Algorithm compress,
+      HFileDataBlockEncoder blockEncoder,
+      final KeyComparator comparator)
   throws IOException {
     super(conf, cacheConf, createOutputStream(conf, fs, path, bytesPerChecksum),
-        path, blockSize, compress, comparator);
+        path, blockSize, compress, blockEncoder, comparator);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
   public HFileWriterV1(Configuration conf,
       CacheConfig cacheConf, FileSystem fs, Path path,
       int blockSize, int bytesPerChecksum, Compression.Algorithm compress,
+      HFileDataBlockEncoder dataBlockEncoder,
       final KeyComparator comparator, InetSocketAddress[] favoredNodes)
       throws IOException {
     super(conf, cacheConf, createOutputStream(conf, fs, path,
-        bytesPerChecksum, favoredNodes), path, blockSize, compress, comparator);
+        bytesPerChecksum, favoredNodes), path, blockSize, compress,
+        dataBlockEncoder, comparator);
   }
 
   /** Constructor that takes a stream. */
@@ -178,15 +189,17 @@ public class HFileWriterV1 extends Abstr
       throws IOException {
     this(conf, cacheConf, outputStream, blockSize,
         Compression.getCompressionAlgorithmByName(compressAlgoName),
-        comparator);
+        NoOpDataBlockEncoder.INSTANCE, comparator);
   }
 
   /** Constructor that takes a stream. */
   public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
       final FSDataOutputStream outputStream, final int blockSize,
-      final Compression.Algorithm compress, final KeyComparator comparator)
+      final Compression.Algorithm compress,
+      HFileDataBlockEncoder blockEncoder, final KeyComparator comparator)
       throws IOException {
-    super(conf, cacheConf, outputStream, null, blockSize, compress, comparator);
+    super(conf, cacheConf, outputStream, null, blockSize, compress,
+        blockEncoder, comparator);
   }
 
   /**
@@ -209,7 +222,7 @@ public class HFileWriterV1 extends Abstr
   private void finishBlock() throws IOException {
     if (this.out == null)
       return;
-    long now = System.currentTimeMillis();
+    long startTimeNs = System.nanoTime();
 
     int size = releaseCompressingStream(this.out);
     this.out = null;
@@ -218,18 +231,22 @@ public class HFileWriterV1 extends Abstr
     blockDataSizes.add(Integer.valueOf(size));
     this.totalUncompressedBytes += size;
 
-    HFile.writeTime += System.currentTimeMillis() - now;
-    HFile.writeOps++;
+    HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
+    HFile.writeOps.incrementAndGet();
 
     if (cacheConf.shouldCacheDataOnWrite()) {
       baosDos.flush();
+      // we do not do data block encoding on disk for HFile v1
       byte[] bytes = baos.toByteArray();
-      HFileBlock cBlock = new HFileBlock(BlockType.DATA,
+      HFileBlock block = new HFileBlock(BlockType.DATA,
           (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
-          ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin);
-      passSchemaMetricsTo(cBlock);
+          ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
+          blockBegin, MemStore.NO_PERSISTENT_TS);
+      block = blockEncoder.diskToCacheFormat(block, false);
+      passSchemaMetricsTo(block);
       cacheConf.getBlockCache().cacheBlock(
-          HFile.getBlockCacheKey(name, blockBegin),cBlock);
+          new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE,
+              block.getBlockType()), block);
       baosDos.close();
     }
     blockNumber++;

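Both writers and the reader above also switch their timing metrics from plain long fields fed by System.currentTimeMillis() to AtomicLong counters fed by System.nanoTime() deltas. A self-contained sketch of that pattern (class and field names here are illustrative, not the actual HFile fields):

    import java.util.concurrent.atomic.AtomicLong;

    public class NanoTimingSketch {
      // Thread-safe counters, analogous in spirit to HFile.writeTimeNano and
      // HFile.writeOps in this commit.
      static final AtomicLong writeTimeNano = new AtomicLong();
      static final AtomicLong writeOps = new AtomicLong();

      static void recordWrite(long startTimeNs) {
        writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
        writeOps.incrementAndGet();
      }

      public static void main(String[] args) throws Exception {
        long start = System.nanoTime(); // monotonic, unlike currentTimeMillis()
        Thread.sleep(5);                // stand-in for flushing a block
        recordWrite(start);
        System.out.println(writeTimeNano.get() + " ns over " + writeOps.get() + " ops");
      }
    }
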
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Fri Feb 17 01:56:33 2012
@@ -48,12 +48,12 @@ import org.apache.hadoop.io.WritableUtil
 public class HFileWriterV2 extends AbstractHFileWriter {
   static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
 
-  /** Max memstore (rwcc) timestamp in FileInfo */
-  public static final byte[] MAX_MEMSTORE_TS_KEY =
+  /** Max memstore (mvcc) timestamp in FileInfo */
+  public static final byte [] MAX_MEMSTORE_TS_KEY =
       Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
 
   /** KeyValue version in FileInfo */
-  public static final byte[] KEY_VALUE_VERSION =
+  public static final byte [] KEY_VALUE_VERSION =
       Bytes.toBytes("KEY_VALUE_VERSION");
 
   /** Version for KeyValue which includes memstore timestamp */
@@ -97,18 +97,21 @@ public class HFileWriterV2 extends Abstr
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
         int bytesPerChecksum, Compression.Algorithm compress,
-        final KeyComparator comparator) throws IOException {
+        final KeyComparator comparator)
+        throws IOException {
       return new HFileWriterV2(conf, cacheConf, fs, path, blockSize,
-          bytesPerChecksum, compress, comparator);
+          bytesPerChecksum, compress, NoOpDataBlockEncoder.INSTANCE,
+          comparator);
     }
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
         int bytesPerChecksum, Compression.Algorithm compress,
+        HFileDataBlockEncoder blockEncoder,
         final KeyComparator comparator, InetSocketAddress[] favoredNodes)
         throws IOException {
       return new HFileWriterV2(conf, cacheConf, fs, path, blockSize,
-          bytesPerChecksum, compress, comparator, favoredNodes);
+          bytesPerChecksum, compress, blockEncoder, comparator, favoredNodes);
     }
 
     @Override
@@ -116,7 +119,8 @@ public class HFileWriterV2 extends Abstr
         int bytesPerChecksum, String compress, final KeyComparator comparator)
         throws IOException {
       return new HFileWriterV2(conf, cacheConf, fs, path, blockSize,
-          bytesPerChecksum, compress, comparator);
+          bytesPerChecksum, compress, NoOpDataBlockEncoder.INSTANCE,
+          comparator);
     }
 
     @Override
@@ -142,7 +146,7 @@ public class HFileWriterV2 extends Abstr
       throws IOException {
     this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
         HFile.DEFAULT_BYTES_PER_CHECKSUM, HFile.DEFAULT_COMPRESSION_ALGORITHM,
-        null);
+        NoOpDataBlockEncoder.INSTANCE, null);
   }
 
   /**
@@ -151,28 +155,35 @@ public class HFileWriterV2 extends Abstr
    */
   public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs,
       Path path, int blockSize, int bytesPerChecksum, String compressAlgoName,
+      HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator) throws IOException {
     this(conf, cacheConf, fs, path, blockSize, bytesPerChecksum,
-        compressionByName(compressAlgoName), comparator);
+        compressionByName(compressAlgoName), blockEncoder,
+        comparator);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
   public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs,
       Path path, int blockSize, int bytesPerChecksum,
-      Compression.Algorithm compressAlgo, final KeyComparator comparator)
+      Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
+      final KeyComparator comparator)
   throws IOException {
     super(conf, cacheConf, createOutputStream(conf, fs, path,
-        bytesPerChecksum), path, blockSize, compressAlgo, comparator);
+        bytesPerChecksum), path, blockSize, compressAlgo,
+        blockEncoder,
+        comparator);
     finishInit(conf);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
   public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
       FileSystem fs, Path path, int blockSize, int bytesPerChecksum,
-      Compression.Algorithm compressAlgo, final KeyComparator comparator,
+      Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
+      final KeyComparator comparator,
       InetSocketAddress[] favoredNodes) throws IOException {
     super(conf, cacheConf, createOutputStream(conf, fs, path, bytesPerChecksum,
-        favoredNodes), path, blockSize, compressAlgo, comparator);
+        favoredNodes), path, blockSize, compressAlgo,
+        blockEncoder, comparator);
     finishInit(conf);
   }
 
@@ -191,7 +202,8 @@ public class HFileWriterV2 extends Abstr
       final FSDataOutputStream outputStream, final int blockSize,
       final Compression.Algorithm compress, final KeyComparator comparator)
       throws IOException {
-    super(conf, cacheConf, outputStream, null, blockSize, compress, comparator);
+    super(conf, cacheConf, outputStream, null, blockSize, compress, null,
+        comparator);
     finishInit(conf);
   }
 
@@ -201,7 +213,8 @@ public class HFileWriterV2 extends Abstr
       throw new IllegalStateException("finishInit called twice");
 
     // HFile filesystem-level (non-caching) block writer
-    fsBlockWriter = new HFileBlock.Writer(compressAlgo);
+    fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
+        includeMemstoreTS);
 
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
@@ -237,11 +250,12 @@ public class HFileWriterV2 extends Abstr
     if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
       return;
 
-    long now = System.currentTimeMillis();
+    long startTimeNs = System.nanoTime();
 
     // Update the first data block offset for scanning.
-    if (firstDataBlockOffset == -1)
+    if (firstDataBlockOffset == -1) {
       firstDataBlockOffset = outputStream.getPos();
+    }
 
     // Update the last data block offset
     lastDataBlockOffset = outputStream.getPos();
@@ -253,8 +267,8 @@ public class HFileWriterV2 extends Abstr
         onDiskSize);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
 
-    HFile.writeTime += System.currentTimeMillis() - now;
-    HFile.writeOps++;
+    HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
+    HFile.writeOps.incrementAndGet();
 
     if (cacheConf.shouldCacheDataOnWrite()) {
       doCacheOnWrite(lastDataBlockOffset);
@@ -268,7 +282,7 @@ public class HFileWriterV2 extends Abstr
         long offset = outputStream.getPos();
         boolean cacheThisBlock = ibw.cacheOnWrite();
         ibw.writeInlineBlock(fsBlockWriter.startWriting(
-            ibw.getInlineBlockType(), cacheThisBlock));
+            ibw.getInlineBlockType()));
         fsBlockWriter.writeHeaderAndData(outputStream);
         ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
             fsBlockWriter.getUncompressedSizeWithoutHeader());
@@ -286,11 +300,15 @@ public class HFileWriterV2 extends Abstr
    *          the cache key.
    */
   private void doCacheOnWrite(long offset) {
-    // Cache this block on write.
-    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
+    // We don't cache-on-write data blocks on compaction, so assume this is not
+    // a compaction.
+    final boolean isCompaction = false;
+    HFileBlock cacheFormatBlock = blockEncoder.diskToCacheFormat(
+        fsBlockWriter.getBlockForCaching(), isCompaction);
     passSchemaMetricsTo(cacheFormatBlock);
     cacheConf.getBlockCache().cacheBlock(
-        HFile.getBlockCacheKey(name, offset), cacheFormatBlock);
+        new BlockCacheKey(name, offset, blockEncoder.getEncodingInCache(),
+            cacheFormatBlock.getBlockType()), cacheFormatBlock);
   }
 
   /**
@@ -300,8 +318,7 @@ public class HFileWriterV2 extends Abstr
    */
   private void newBlock() throws IOException {
     // This is where the next block begins.
-    fsBlockWriter.startWriting(BlockType.DATA,
-        cacheConf.shouldCacheDataOnWrite());
+    fsBlockWriter.startWriting(BlockType.DATA);
     firstKeyInBlock = null;
   }
 
@@ -433,8 +450,7 @@ public class HFileWriterV2 extends Abstr
         // store the beginning offset
         long offset = outputStream.getPos();
         // write the metadata content
-        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META,
-            cacheConf.shouldCacheDataOnWrite());
+        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
         metaData.get(i).write(dos);
 
         fsBlockWriter.writeHeaderAndData(outputStream);
@@ -459,17 +475,17 @@ public class HFileWriterV2 extends Abstr
 
     // Meta block index.
     metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
-        BlockType.ROOT_INDEX, false), "meta");
+        BlockType.ROOT_INDEX), "meta");
     fsBlockWriter.writeHeaderAndData(outputStream);
 
     if (this.includeMemstoreTS) {
       appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
-      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE_TS));
+      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(
+          KEY_VALUE_VER_WITH_MEMSTORE_TS));
     }
 
     // File info
-    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
-        false));
+    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
     fsBlockWriter.writeHeaderAndData(outputStream);
 
     // Load-on-open data supplied by higher levels, e.g. Bloom filters.

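A hypothetical construction of the v2 writer with the new encoder parameter, using only the constructor and constants that appear in this hunk (conf, cacheConf, fs and path are placeholders from the calling context): NoOpDataBlockEncoder.INSTANCE keeps blocks unencoded, whereas an HFileDataBlockEncoderImpl would enable on-disk and/or in-cache encoding.

    // Placeholders: conf, cacheConf, fs and path come from the calling context.
    HFile.Writer writer = new HFileWriterV2(conf, cacheConf, fs, path,
        HFile.DEFAULT_BLOCKSIZE, HFile.DEFAULT_BYTES_PER_CHECKSUM,
        HFile.DEFAULT_COMPRESSION_ALGORITHM,
        NoOpDataBlockEncoder.INSTANCE,  // no data block encoding
        null);                          // default KeyComparator
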
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Fri Feb 17 01:56:33 2012
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -899,4 +900,16 @@ public class LruBlockCache implements Bl
     }
     return counts;
   }
+
+  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
+    Map<DataBlockEncoding, Integer> counts =
+        new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
+    for (BlockCacheKey cacheKey : map.keySet()) {
+      DataBlockEncoding encoding = cacheKey.getDataBlockEncoding();
+      Integer count = counts.get(encoding);
+      counts.put(encoding, (count == null ? 0 : count) + 1);
+    }
+    return counts;
+  }
+
 }

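The new getEncodingCountsForTest() helper lets tests check which encodings the cached block keys carry. An illustrative test-side use, where cache and expectedBlocks are hypothetical test-local names:

    // Hypothetical assertion; "cache" and "expectedBlocks" are test-local names.
    Map<DataBlockEncoding, Integer> counts = cache.getEncodingCountsForTest();
    Integer none = counts.get(DataBlockEncoding.NONE);
    assertEquals("all cached blocks should be unencoded",
        expectedBlocks, none == null ? 0 : none.intValue());
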
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1245291&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Fri Feb 17 01:56:33 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Does not perform any kind of encoding/decoding.
+ */
+public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
+
+  public static final NoOpDataBlockEncoder INSTANCE =
+      new NoOpDataBlockEncoder();
+
+  /** Cannot be instantiated. Use {@link #INSTANCE} instead. */
+  private NoOpDataBlockEncoder() {
+  }
+
+  @Override
+  public HFileBlock diskToCacheFormat(HFileBlock block, boolean isCompaction) {
+    if (block.getBlockType() == BlockType.ENCODED_DATA) {
+      throw new IllegalStateException("Unexpected encoded block");
+    }
+    return block;
+  }
+
+  @Override
+  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
+      ByteBuffer in, boolean includesMemstoreTS) {
+    return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+  }
+
+  @Override
+  public boolean useEncodedScanner(boolean isCompaction) {
+    return false;
+  }
+
+  @Override
+  public void saveMetadata(StoreFile.Writer storeFileWriter) {
+  }
+
+  @Override
+  public DataBlockEncoding getEncodingOnDisk() {
+    return DataBlockEncoding.NONE;
+  }
+
+  @Override
+  public DataBlockEncoding getEncodingInCache() {
+    return DataBlockEncoding.NONE;
+  }
+
+  @Override
+  public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
+    return DataBlockEncoding.NONE;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}

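NoOpDataBlockEncoder is the null-object counterpart of HFileDataBlockEncoderImpl: writers that want no encoding get the singleton, while encoding-aware callers build an encoder from the column family settings, as LoadIncrementalHFiles does in the next hunk. A short sketch of the two options (familyDescriptor stands for the column-family descriptor variable used below):

    // Option 1: no encoding at all (the singleton added in this file).
    HFileDataBlockEncoder plain = NoOpDataBlockEncoder.INSTANCE;

    // Option 2: encoder derived from column family settings, mirroring the
    // LoadIncrementalHFiles change below; familyDescriptor is a placeholder.
    HFileDataBlockEncoder fromFamily = new HFileDataBlockEncoderImpl(
        familyDescriptor.getDataBlockEncodingOnDisk(),
        familyDescriptor.getDataBlockEncoding());
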
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri Feb 17 01:56:33 2012
@@ -43,10 +43,13 @@ import org.apache.hadoop.hbase.client.Se
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -269,9 +272,12 @@ public class LoadIncrementalHFiles exten
     CacheConfig cacheConf = new CacheConfig(conf);
     HalfStoreFileReader halfReader = null;
     StoreFile.Writer halfWriter = null;
+    HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(
+        familyDescriptor.getDataBlockEncodingOnDisk(),
+        familyDescriptor.getDataBlockEncoding());
     try {
       halfReader = new HalfStoreFileReader(fs, inFile, cacheConf,
-          reference);
+          reference, DataBlockEncoding.NONE);
       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
 
       int blocksize = familyDescriptor.getBlocksize();
@@ -280,7 +286,8 @@ public class LoadIncrementalHFiles exten
 
       float err = familyDescriptor.getBloomFilterErrorRate();
       halfWriter = new StoreFile.Writer(
-          fs, outFile, blocksize, compression, conf, cacheConf,
+          fs, outFile, blocksize, compression, dataBlockEncoder,
+          conf, cacheConf,
           KeyValue.COMPARATOR, bloomFilterType, err, 0, null);
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
@@ -304,7 +311,6 @@ public class LoadIncrementalHFiles exten
     return !HFile.isReservedFileInfoKey(key);
   }
 
-
   @Override
   public int run(String[] args) throws Exception {
     if (args.length != 2) {
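
In the bulk-load path above, the half reference file is opened with DataBlockEncoding.NONE as the in-cache encoding and rewritten through a StoreFile.Writer configured with the target family's encoder. The descriptor-to-encoder step in isolation might look like the sketch below; it assumes the family descriptor is an HColumnDescriptor exposing the two accessors used in the hunk.

  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

  public class FamilyEncoder {
    /** Build the writer-side encoder from the column family configuration. */
    static HFileDataBlockEncoder fromFamily(HColumnDescriptor family) {
      return new HFileDataBlockEncoderImpl(
          family.getDataBlockEncodingOnDisk(),  // encoding applied to blocks on disk
          family.getDataBlockEncoding());       // encoding kept in the block cache
    }
  }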

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Fri Feb 17 01:56:33 2012
@@ -19,20 +19,19 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Compact region on request and then run split if appropriate
  */

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Feb 17 01:56:33 2012
@@ -466,8 +466,8 @@ public class HRegion implements HeapSize
 
     long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize();
     if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
-      flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
-                      HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
     }
     this.memstoreFlushSize = flushSize;
     this.blockingMemStoreSize = this.memstoreFlushSize *
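
The HRegion change simply swaps the string literal for the HConstants key when the table does not override the memstore flush size. A self-contained illustration of that fallback, using only the constants and accessors referenced in the hunk:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.HTableDescriptor;

  public class FlushSizeLookup {
    /** Resolve the effective memstore flush size for a table. */
    static long effectiveFlushSize(HTableDescriptor desc, Configuration conf) {
      long flushSize = desc.getMemStoreFlushSize();
      if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
        // No per-table override: fall back to the cluster-wide setting.
        flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
            HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
      }
      return flushSize;
    }
  }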

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Feb 17 01:56:33 2012
@@ -677,6 +677,9 @@ public class MemStore implements HeapSiz
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
       (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
 
+  /** Used for readability when we don't store memstore timestamp in HFile */
+  public static final boolean NO_PERSISTENT_TS = false;
+
   /*
    * Calculate how the MemStore size has changed.  Includes overhead of the
    * backing Map.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Feb 17 01:56:33 2012
@@ -55,13 +55,17 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -144,6 +148,8 @@ public class Store extends SchemaConfigu
   private final boolean blockcache;
   private final Compression.Algorithm compression;
 
+  private HFileDataBlockEncoder dataBlockEncoder;
+
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
@@ -177,6 +183,11 @@ public class Store extends SchemaConfigu
     this.blockcache = family.isBlockCacheEnabled();
     this.blocksize = family.getBlocksize();
     this.compression = family.getCompression();
+
+    this.dataBlockEncoder =
+        new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
+            family.getDataBlockEncoding());
+
     this.comparator = info.getComparator();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
@@ -279,6 +290,21 @@ public class Store extends SchemaConfigu
   }
 
   /**
+   * @return the data block encoder
+   */
+  public HFileDataBlockEncoder getDataBlockEncoder() {
+    return dataBlockEncoder;
+  }
+
+  /**
+   * Should be used only in tests.
+   * @param blockEncoder the block delta encoder to use
+   */
+  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
+    this.dataBlockEncoder = blockEncoder;
+  }
+
+  /**
    * Creates an unsorted list of StoreFile loaded in parallel
    * from the given directory.
    * @throws IOException
@@ -315,7 +341,8 @@ public class Store extends SchemaConfigu
       completionService.submit(new Callable<StoreFile>() {
         public StoreFile call() throws IOException {
           StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
-              family.getBloomFilterType());
+              family.getBloomFilterType(), dataBlockEncoder);
+          passSchemaMetricsTo(storeFile);
           storeFile.createReader();
           return storeFile;
         }
@@ -431,7 +458,9 @@ public class Store extends SchemaConfigu
     StoreFile.rename(fs, srcPath, dstPath);
 
     StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
-        this.family.getBloomFilterType());
+        this.family.getBloomFilterType(), this.dataBlockEncoder);
+    passSchemaMetricsTo(sf);
+
     sf.createReader();
 
     LOG.info("Moved hfile " + srcPath + " into store directory " +
@@ -561,7 +590,6 @@ public class Store extends SchemaConfigu
       TimeRangeTracker snapshotTimeRangeTracker,
       MonitoredTask status) throws IOException {
     StoreFile.Writer writer;
-    String fileName;
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = region.getSmallestReadPoint();
     long flushed = 0;
@@ -578,6 +606,7 @@ public class Store extends SchemaConfigu
         MemStore.getSnapshotScanners(snapshot, this.comparator),
         this.region.getSmallestReadPoint(), true);
 
+    String fileName;
     try {
       // TODO:  We can fail in the below block before we complete adding this
       // flush to list of store files.  Add cleanup of anything put on filesystem
@@ -633,7 +662,9 @@ public class Store extends SchemaConfigu
     fs.rename(writer.getPath(), dstPath);
 
     StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
-        this.family.getBloomFilterType());
+        this.family.getBloomFilterType(), this.dataBlockEncoder);
+    passSchemaMetricsTo(sf);
+
     StoreFile.Reader r = sf.createReader();
     this.storeSize += r.length();
     // This increments the metrics associated with total flushed bytes for this
@@ -679,8 +710,9 @@ public class Store extends SchemaConfigu
       writerCacheConf = cacheConf;
     }
     StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
-        blocksize, compression, comparator, conf, writerCacheConf,
-        family.getBloomFilterType(), maxKeyCount);
+        blocksize, compression, dataBlockEncoder, comparator, conf,
+        writerCacheConf, family.getBloomFilterType(),
+        BloomFilterFactory.getErrorRate(conf), maxKeyCount, null);
     // The store file writer's path does not include the CF name, so we need
     // to configure the HFile writer directly.
     SchemaConfigured sc = (SchemaConfigured) w.writer;
@@ -1283,11 +1315,13 @@ public class Store extends SchemaConfigu
         long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
             ? r.getFilterEntries() : r.getEntries();
         maxKeyCount += keyCount;
-        LOG.info("Compacting: " + file +
-          "; keyCount = " + keyCount +
-          "; Bloom Type = " + r.getBloomFilterType().toString() +
-          "; Size = " + StringUtils.humanReadableInt(r.length()) +
-          "; HFile v" + r.getHFileVersion());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Compacting " + file +
+            ", keycount=" + keyCount +
+            ", bloomtype=" + r.getBloomFilterType().toString() +
+            ", size=" + StringUtils.humanReadableInt(r.length()) +
+            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
+        }
       }
     }
     LOG.info("Estimated total keyCount for output of compaction = " + maxKeyCount);
@@ -1372,8 +1406,10 @@ public class Store extends SchemaConfigu
       throws IOException {
     StoreFile storeFile = null;
     try {
-      storeFile = new StoreFile(this.fs, path, this.conf, this.cacheConf,
-          this.family.getBloomFilterType());
+      storeFile = new StoreFile(this.fs, path, this.conf,
+          this.cacheConf, this.family.getBloomFilterType(),
+          NoOpDataBlockEncoder.INSTANCE);
+      passSchemaMetricsTo(storeFile);
       storeFile.createReader();
     } catch (IOException e) {
       LOG.error("Failed to open store file : " + path
@@ -1413,18 +1449,19 @@ public class Store extends SchemaConfigu
     StoreFile result = null;
     if (compactedFile != null) {
       Path origPath = compactedFile.getPath();
-      Path dstPath = new Path(homedir, origPath.getName());
+      Path destPath = new Path(homedir, origPath.getName());
 
       validateStoreFile(origPath);
 
       // Move file into the right spot
-      LOG.info("Renaming compacted file at " + origPath + " to " + dstPath);
-      if (!fs.rename(origPath, dstPath)) {
+      LOG.info("Renaming compacted file at " + origPath + " to " + destPath);
+      if (!fs.rename(origPath, destPath)) {
         LOG.error("Failed move of compacted file " + origPath + " to " +
-            dstPath);
+            destPath);
       }
-      result = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
-          this.family.getBloomFilterType());
+      result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
+          this.family.getBloomFilterType(), this.dataBlockEncoder);
+      passSchemaMetricsTo(result);
       result.createReader();
     }
     try {
@@ -1515,7 +1552,7 @@ public class Store extends SchemaConfigu
 
   /**
    * Find the key that matches <i>row</i> exactly, or the one that immediately
-   * preceeds it. WARNING: Only use this method on a table where writes occur
+   * precedes it. WARNING: Only use this method on a table where writes occur
    * with strictly increasing timestamps. This method assumes this pattern of
    * writes in order to make it reasonably performant.  Also our search is
    * dependent on the axiom that deletes are for cells that are in the container
@@ -1960,9 +1997,10 @@ public class Store extends SchemaConfigu
     return this.cacheConf;
   }
 
-  public static final long FIXED_OVERHEAD = ClassSize.align(
-      new SchemaConfigured().heapSize() + (15 * ClassSize.REFERENCE) +
-      (7 * Bytes.SIZEOF_LONG) + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
+  public static final long FIXED_OVERHEAD =
+      ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+          + (16 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+          + (6 * Bytes.SIZEOF_INT) + 2 * Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
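
The Store hunks above wire the family-configured encoder into the store files the store opens, flushes, and compacts, call passSchemaMetricsTo() on each new StoreFile, and add a package-private setter for tests. A hedged sketch of how a test might use that hook; the helper class is hypothetical, must sit in the regionserver package to reach the package-private method, and the encodings passed in are whatever the test wants to exercise.

  package org.apache.hadoop.hbase.regionserver;

  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

  /** Test-only sketch: swap a Store's encoder through the package-private hook. */
  class StoreEncodingTestHelper {
    static void forceEncoding(Store store, DataBlockEncoding onDisk,
        DataBlockEncoding inCache) {
      HFileDataBlockEncoder encoder =
          new HFileDataBlockEncoderImpl(onDisk, inCache);
      // Used by store files opened or written after this call.
      store.setDataBlockEncoderInTest(encoder);
    }
  }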

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Feb 17 01:56:33 2012
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.KeyValue.
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
@@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -80,7 +84,7 @@ import com.google.common.collect.Orderin
  * The reason for this weird pattern where you use a different instance for the
  * writer and a reader is that we write once but read a lot more.
  */
-public class StoreFile {
+public class StoreFile extends SchemaConfigured {
   static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
   public static enum BloomType {
@@ -121,6 +125,10 @@ public class StoreFile {
   /** Key for Timerange information in metadata*/
   public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
 
+  /** Type of encoding used for data blocks in HFile. Stored in file info. */
+  public static final byte[] DATA_BLOCK_ENCODING =
+      Bytes.toBytes("DATA_BLOCK_ENCODING");
+
   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
   // Need to make it 8k for testing.
   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@@ -139,6 +147,9 @@ public class StoreFile {
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
+  // What kind of data block encoding will be used
+  private final HFileDataBlockEncoder dataBlockEncoder;
+
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
@@ -206,18 +217,23 @@ public class StoreFile {
    *          as the Bloom filter type actually present in the HFile, because
    *          column family configuration might change. If this is
    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
+   * @param dataBlockEncoder data block encoding algorithm.
    * @throws IOException When opening the reader fails.
    */
   StoreFile(final FileSystem fs,
             final Path p,
             final Configuration conf,
             final CacheConfig cacheConf,
-            final BloomType cfBloomType)
+            final BloomType cfBloomType,
+            final HFileDataBlockEncoder dataBlockEncoder)
       throws IOException {
     this.conf = conf;
     this.fs = fs;
     this.path = p;
     this.cacheConf = cacheConf;
+    this.dataBlockEncoder =
+        dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
+            : dataBlockEncoder;
     if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       this.referencePath = getReferredToFile(this.path);
@@ -400,10 +416,12 @@ public class StoreFile {
 
     if (isReference()) {
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
-          this.cacheConf, this.reference);
+          this.cacheConf, this.reference,
+          dataBlockEncoder.getEncodingInCache());
     } else {
       SchemaMetrics.configureGlobally(conf);
-      this.reader = new Reader(this.fs, this.path, this.cacheConf);
+      this.reader = new Reader(this.fs, this.path, this.cacheConf,
+          dataBlockEncoder.getEncodingInCache());
     }
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
@@ -492,6 +510,10 @@ public class StoreFile {
     return this.reader;
   }
 
+  /**
+   * @param evictOnClose whether to evict blocks belonging to this file
+   * @throws IOException
+   */
   public synchronized void closeReader(boolean evictOnClose)
       throws IOException {
     if (this.reader != null) {
@@ -568,8 +590,9 @@ public class StoreFile {
   public static Writer createWriter(final FileSystem fs, final Path dir,
       final int blocksize, Configuration conf, CacheConfig cacheConf)
   throws IOException {
-    return createWriter(fs, dir, blocksize, null, null, conf, cacheConf,
-        BloomType.NONE, 0);
+    return createWriter(fs, dir, blocksize, null,
+        NoOpDataBlockEncoder.INSTANCE, null, conf, cacheConf, BloomType.NONE,
+        0.0f, 0, null);
   }
 
   public static StoreFile.Writer createWriter(final FileSystem fs,
@@ -582,9 +605,9 @@ public class StoreFile {
           BloomType bloomType,
           long maxKeyCount)
   throws IOException {
-      return createWriter(fs, dir, blocksize, algorithm, c, conf, cacheConf,
-          bloomType, BloomFilterFactory.getErrorRate(conf), maxKeyCount,
-          null);
+    return createWriter(fs, dir, blocksize, algorithm,
+        NoOpDataBlockEncoder.INSTANCE, c, conf, cacheConf, bloomType,
+        BloomFilterFactory.getErrorRate(conf), maxKeyCount, null);
   }
 
   /**
@@ -594,8 +617,9 @@ public class StoreFile {
    * @param dir Path to family directory.  Makes the directory if doesn't exist.
    * Creates a file with a unique name in this directory.
    * @param blocksize
-   * @param algorithm Pass null to get default.
-   * @param c Pass null to get default.
+   * @param compressAlgo Compression algorithm. Pass null to get default.
+   * @param dataBlockEncoder Pass null to disable data block encoding.
+   * @param comparator Key-value comparator. Pass null to get default.
    * @param conf HBase system configuration. used with bloom filters
    * @param cacheConf Cache configuration and reference.
    * @param bloomType column family setting for bloom filters
@@ -606,16 +630,13 @@ public class StoreFile {
    * @throws IOException
    */
   public static StoreFile.Writer createWriter(final FileSystem fs,
-                                              final Path dir,
-                                              final int blocksize,
-                                              final Compression.Algorithm algorithm,
-                                              final KeyValue.KVComparator c,
-                                              final Configuration conf,
-                                              final CacheConfig cacheConf,
-                                              BloomType bloomType,
-                                              float bloomErrorRate,
-                                              long maxKeyCount,
-                                              InetSocketAddress[] favoredNodes)
+      final Path dir, final int blocksize,
+      Compression.Algorithm compressAlgo,
+      final HFileDataBlockEncoder dataBlockEncoder,
+      KeyValue.KVComparator comparator, final Configuration conf,
+      final CacheConfig cacheConf, BloomType bloomType, float bloomErrorRate,
+      long maxKeyCount,
+      InetSocketAddress[] favoredNodes)
       throws IOException {
 
     if (!fs.exists(dir)) {
@@ -626,10 +647,15 @@ public class StoreFile {
       bloomType = BloomType.NONE;
     }
 
-    return new Writer(fs, path, blocksize,
-        algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
-        conf, cacheConf, c == null ? KeyValue.COMPARATOR: c, bloomType,
-        bloomErrorRate, maxKeyCount, favoredNodes);
+    if (compressAlgo == null) {
+      compressAlgo = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    }
+    if (comparator == null) {
+      comparator = KeyValue.COMPARATOR;
+    }
+    return new Writer(fs, path, blocksize, compressAlgo, dataBlockEncoder,
+        conf, cacheConf, comparator, bloomType, bloomErrorRate,
+        maxKeyCount, favoredNodes);
   }
 
   /**
@@ -723,6 +749,8 @@ public class StoreFile {
     private KeyValue lastDeleteFamilyKV = null;
     private long deleteFamilyCnt = 0;
 
+    protected HFileDataBlockEncoder dataBlockEncoder;
+
     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
      * When flushing a memstore, we set TimeRange and use this variable to
@@ -735,13 +763,12 @@ public class StoreFile {
     protected HFile.Writer writer;
 
     public Writer(FileSystem fs, Path path, int blocksize,
-            Compression.Algorithm compress, final Configuration conf,
-            CacheConfig cacheConf,
-            final KVComparator comparator, BloomType bloomType,  long maxKeys)
-            throws IOException {
-	this(fs, path, blocksize, compress, conf, cacheConf, comparator,
-	  bloomType, BloomFilterFactory.getErrorRate(conf), maxKeys,
-        null);
+        Compression.Algorithm compress, final Configuration conf,
+        CacheConfig cacheConf, final KVComparator comparator,
+        BloomType bloomType, long maxKeys) throws IOException {
+      this(fs, path, blocksize, compress, NoOpDataBlockEncoder.INSTANCE, conf,
+          cacheConf, comparator, bloomType,
+          BloomFilterFactory.getErrorRate(conf), maxKeys, null);
     }
 
     /**
@@ -761,15 +788,18 @@ public class StoreFile {
      * @throws IOException problem writing to FS
      */
     public Writer(FileSystem fs, Path path, int blocksize,
-        Compression.Algorithm compress, final Configuration conf,
-        final CacheConfig cacheConf,
-        final KVComparator comparator, BloomType bloomType,
-        float bloomErrorRate, long maxKeys, InetSocketAddress[] favoredNodes)
+        Compression.Algorithm compress,
+        HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
+        CacheConfig cacheConf, final KVComparator comparator,
+        BloomType bloomType, float bloomErrorRate, long maxKeys,
+        InetSocketAddress[] favoredNodes)
         throws IOException {
-
+      this.dataBlockEncoder = dataBlockEncoder != null ?
+          dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
       writer = HFile.getWriterFactory(conf, cacheConf).createWriter(
           fs, path, blocksize, HFile.getBytesPerChecksum(conf, fs.getConf()),
-          compress, comparator.getRawComparator(), favoredNodes);
+          compress, this.dataBlockEncoder, comparator.getRawComparator(),
+          favoredNodes);
 
       this.kvComparator = comparator;
 
@@ -873,7 +903,8 @@ public class StoreFile {
             newKey = false;
             break;
           default:
-            throw new IOException("Invalid Bloom filter type: " + bloomType);
+            throw new IOException("Invalid Bloom filter type: " + bloomType +
+                " (ROW or ROWCOL expected)");
           }
         }
         if (newKey) {
@@ -1015,6 +1046,9 @@ public class StoreFile {
     }
 
     public void close() throws IOException {
+      // Save data block encoder metadata in the file info.
+      dataBlockEncoder.saveMetadata(this);
+
       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
 
@@ -1053,14 +1087,11 @@ public class StoreFile {
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
 
-    private Reader(HFile.Reader reader) {
-      super(reader);
-      this.reader = reader;
-    }
-
-    public Reader(FileSystem fs, Path path, CacheConfig cacheConf)
-        throws IOException {
-      this(HFile.createReader(fs, path, cacheConf));
+    public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
+        DataBlockEncoding preferredEncodingInCache) throws IOException {
+      super(null, path);
+      reader = HFile.createReaderWithEncoding(fs, path, cacheConf,
+          preferredEncodingInCache);
       bloomFilterType = BloomType.NONE;
     }
 
@@ -1562,4 +1593,4 @@ public class StoreFile {
       });
   }
 
-}
+}
\ No newline at end of file
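
The StoreFile changes give the writer an HFileDataBlockEncoder and give the reader a preferred in-cache encoding. A sketch of a call to the new full createWriter() signature, using nulls where defaults are accepted; the blocksize, bloom error rate, and key-count estimate here are placeholder values.

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
  import org.apache.hadoop.hbase.regionserver.StoreFile;

  public class EncodedWriterExample {
    /** Open a store file writer that runs data blocks through the given encoder. */
    static StoreFile.Writer open(FileSystem fs, Path familyDir, Configuration conf,
        HFileDataBlockEncoder encoder) throws IOException {
      return StoreFile.createWriter(fs, familyDir,
          StoreFile.DEFAULT_BLOCKSIZE_SMALL,
          null,                      // compression: null means the HFile default
          encoder,                   // null falls back to NoOpDataBlockEncoder.INSTANCE
          null,                      // comparator: null means KeyValue.COMPARATOR
          conf, new CacheConfig(conf),
          StoreFile.BloomType.NONE,  // no Bloom filter for this sketch
          0.01f,                     // bloom error rate (unused with BloomType.NONE)
          0L,                        // estimated max key count
          null);                     // no favored nodes
    }
  }

On close(), the writer first asks the encoder to save its metadata (the DATA_BLOCK_ENCODING file-info key defined above), and the reader side now takes a preferredEncodingInCache argument instead of wrapping a pre-built HFile.Reader.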

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Fri Feb 17 01:56:33 2012
@@ -156,8 +156,8 @@ class StoreFileScanner implements KeyVal
       } finally {
         realSeekDone = true;
       }
-    } catch(IOException ioe) {
-      throw new IOException("Could not seek " + this, ioe);
+    } catch (IOException ioe) {
+      throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
   }
 
@@ -177,7 +177,8 @@ class StoreFileScanner implements KeyVal
         realSeekDone = true;
       }
     } catch (IOException ioe) {
-      throw new IOException("Could not seek " + this, ioe);
+      throw new IOException("Could not reseek " + this + " to key " + key,
+          ioe);
     }
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java?rev=1245291&r1=1245290&r2=1245291&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java Fri Feb 17 01:56:33 2012
@@ -306,7 +306,7 @@ public class RegionServerMetrics impleme
       addHLogMetric(HLog.getGSyncTime(), this.fsGroupSyncLatency);
       // HFile metrics
       int ops = HFile.getReadOps();
-      if (ops != 0) this.fsReadLatency.inc(ops, HFile.getReadTime());
+      if (ops != 0) this.fsReadLatency.inc(ops, HFile.getReadTimeMs());
       /* NOTE: removed HFile write latency.  2 reasons:
        * 1) Mixing HLog latencies are far higher priority since they're
        *      on-demand and HFile is used in background (compact/flush)


