hbase-commits mailing list archives

From: mbau...@apache.org
Subject: svn commit: r1236031 [3/7] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...
Date: Thu, 26 Jan 2012 02:59:00 GMT
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Performs data block encoding according to the column family options,
+ * distinguishing the on-disk encoding from the in-cache encoding.
+ */
+public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
+  private final DataBlockEncoding onDisk;
+  private final DataBlockEncoding inCache;
+
+  public HFileDataBlockEncoderImpl(DataBlockEncoding encoding) {
+    this(encoding, encoding);
+  }
+
+  /**
+   * Do data block encoding with specified options.
+   * @param onDisk What kind of data block encoding will be used before writing
+   *          HFileBlock to disk. This must be either the same as inCache or
+   *          {@link DataBlockEncoding#NONE}.
+   * @param inCache What kind of data block encoding will be used in block
+   *          cache.
+   */
+  public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk,
+      DataBlockEncoding inCache) {
+    this.onDisk = onDisk != null ?
+        onDisk : DataBlockEncoding.NONE;
+    this.inCache = inCache != null ?
+        inCache : DataBlockEncoding.NONE;
+    Preconditions.checkArgument(onDisk == DataBlockEncoding.NONE ||
+        onDisk == inCache, "on-disk encoding (" + onDisk + ") must be " +
+        "either the same as in-cache encoding (" + inCache + ") or " +
+        DataBlockEncoding.NONE);
+  }
+
+  public static HFileDataBlockEncoder createFromFileInfo(
+      FileInfo fileInfo, DataBlockEncoding preferredEncodingInCache)
+      throws IOException {
+    byte[] dataBlockEncodingType =
+        fileInfo.get(StoreFile.DATA_BLOCK_ENCODING);
+    if (dataBlockEncodingType == null) {
+      return NoOpDataBlockEncoder.INSTANCE;
+    }
+
+    String dataBlockEncodingStr = Bytes.toString(dataBlockEncodingType);
+    DataBlockEncoding onDisk;
+    try {
+      onDisk = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+    } catch (IllegalArgumentException ex) {
+      throw new IOException("Invalid data block encoding type in file info: " +
+          dataBlockEncodingStr, ex);
+    }
+
+    DataBlockEncoding inCache;
+    if (onDisk == DataBlockEncoding.NONE) {
+      // This is an "in-cache-only" encoding or fully-unencoded scenario.
+      // Either way, we use the given encoding (possibly NONE) specified by
+      // the column family in cache.
+      inCache = preferredEncodingInCache;
+    } else {
+      // Leave blocks in cache encoded the same way as they are on disk.
+      // If we switch encoding type for the CF or the in-cache-only encoding
+      // flag, old files will keep their encoding both on disk and in cache,
+      // but new files will be generated with the new encoding.
+      inCache = onDisk;
+    }
+    return new HFileDataBlockEncoderImpl(onDisk, inCache);
+  }
+
+  @Override
+  public void saveMetadata(StoreFile.Writer storeFileWriter)
+      throws IOException {
+    storeFileWriter.appendFileInfo(StoreFile.DATA_BLOCK_ENCODING,
+        onDisk.getNameInBytes());
+  }
+
+  @Override
+  public DataBlockEncoding getEncodingOnDisk() {
+    return onDisk;
+  }
+
+  @Override
+  public DataBlockEncoding getEncodingInCache() {
+    return inCache;
+  }
+
+  @Override
+  public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
+    if (!useEncodedScanner(isCompaction)) {
+      return DataBlockEncoding.NONE;
+    }
+    return inCache;
+  }
+
+  @Override
+  public HFileBlock diskToCacheFormat(HFileBlock block, boolean isCompaction) {
+    if (block.getBlockType() == BlockType.DATA) {
+      if (!useEncodedScanner(isCompaction)) {
+        // Unencoded block, and we don't want to encode in cache.
+        return block;
+      }
+      // Encode the unencoded block with the in-cache encoding.
+      return encodeDataBlock(block, inCache, block.doesIncludeMemstoreTS());
+    }
+
+    if (block.getBlockType() == BlockType.ENCODED_DATA) {
+      if (block.getDataBlockEncodingId() == onDisk.getId()) {
+        // The block is already in the desired in-cache encoding.
+        return block;
+      }
+      // We don't want to re-encode a block in a different encoding. The HFile
+      // reader should have been instantiated in such a way that we would not
+      // have to do this.
+      throw new AssertionError("Expected on-disk data block encoding " +
+          onDisk + ", got " + block.getDataBlockEncoding());
+    }
+    return block;
+  }
+
+  /**
+   * Precondition: a non-encoded buffer.
+   * Postcondition: on-disk encoding.
+   */
+  @Override
+  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
+      boolean includesMemstoreTS) {
+    if (onDisk == DataBlockEncoding.NONE) {
+      // there is no need to encode the block before writing it to disk
+      return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+    }
+
+    ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in,
+        onDisk, includesMemstoreTS);
+    return new Pair<ByteBuffer, BlockType>(encodedBuffer,
+        BlockType.ENCODED_DATA);
+  }
+
+  @Override
+  public boolean useEncodedScanner(boolean isCompaction) {
+    if (isCompaction && onDisk == DataBlockEncoding.NONE) {
+      return false;
+    }
+    return inCache != DataBlockEncoding.NONE;
+  }
+
+  private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in,
+      DataBlockEncoding algo, boolean includesMemstoreTS) {
+    ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(encodedStream);
+    DataBlockEncoder encoder = algo.getEncoder();
+    try {
+      encodedStream.write(HFileBlock.DUMMY_HEADER);
+      algo.writeIdInBytes(dataOut);
+      encoder.compressKeyValues(dataOut, in,
+          includesMemstoreTS);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format("Bug in data block encoder " +
+          "'%s', it probably requested too much data", algo.toString()), e);
+    }
+    return ByteBuffer.wrap(encodedStream.toByteArray());
+  }
+
+  private HFileBlock encodeDataBlock(HFileBlock block,
+      DataBlockEncoding algo, boolean includesMemstoreTS) {
+    ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer(
+        block.getBufferWithoutHeader(), algo, includesMemstoreTS);
+    int sizeWithoutHeader = compressedBuffer.limit() - HFileBlock.HEADER_SIZE;
+    HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA,
+        block.getOnDiskSizeWithoutHeader(),
+        sizeWithoutHeader, block.getPrevBlockOffset(),
+        compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(),
+        includesMemstoreTS);
+    block.passSchemaMetricsTo(encodedBlock);
+    return encodedBlock;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(onDisk=" + onDisk + ", inCache=" +
+        inCache + ")";
+  }
+
+}
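A brief usage sketch of the new encoder configuration (not part of the patch). It assumes DataBlockEncoding.PREFIX and DataBlockEncoding.DIFF are among the encodings provided by the io.encoding package; the class and constructors used are the ones added above.

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

public class EncoderConfigSketch {
  public static void main(String[] args) {
    // Same encoding both on disk and in the block cache.
    HFileDataBlockEncoder both =
        new HFileDataBlockEncoderImpl(DataBlockEncoding.PREFIX);

    // "In-cache-only": blocks stay unencoded on disk but are encoded in cache.
    HFileDataBlockEncoder cacheOnly = new HFileDataBlockEncoderImpl(
        DataBlockEncoding.NONE, DataBlockEncoding.PREFIX);

    System.out.println(both);      // HFileDataBlockEncoderImpl(onDisk=PREFIX, inCache=PREFIX)
    System.out.println(cacheOnly); // HFileDataBlockEncoderImpl(onDisk=NONE, inCache=PREFIX)

    // Two different non-NONE encodings violate the constructor precondition
    // and throw IllegalArgumentException:
    // new HFileDataBlockEncoderImpl(DataBlockEncoding.PREFIX, DataBlockEncoding.DIFF);
  }
}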

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Thu Jan 26 02:58:57 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.ByteBloomFilter;
@@ -170,6 +171,7 @@ public class HFilePrettyPrinter {
         conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
     conf.set("fs.default.name",
         conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+    SchemaMetrics.configureGlobally(conf);
     try {
       if (!parseOptions(args))
         return 1;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Thu Jan 26 02:58:57 2012
@@ -30,9 +30,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -42,7 +42,9 @@ import org.apache.hadoop.io.RawComparato
 import com.google.common.base.Preconditions;
 
 /**
- * {@link HFile} reader for version 1.
+ * {@link HFile} reader for version 1. Does not support data block encoding,
+ * not even in-cache-only encoding: HFile v1 blocks are always brought into
+ * the cache unencoded.
  */
 public class HFileReaderV1 extends AbstractHFileReader {
   private static final Log LOG = LogFactory.getLog(HFileReaderV1.class);
@@ -212,7 +214,8 @@ public class HFileReaderV1 extends Abstr
 
     long startTimeNs = System.nanoTime();
 
-    BlockCacheKey cacheKey = HFile.getBlockCacheKey(name, offset);
+    BlockCacheKey cacheKey = new BlockCacheKey(name, offset,
+        DataBlockEncoding.NONE, BlockType.META);
 
     BlockCategory effectiveCategory = BlockCategory.META;
     if (metaBlockName.equals(HFileWriterV1.BLOOM_FILTER_META_KEY) ||
@@ -280,7 +283,7 @@ public class HFileReaderV1 extends Abstr
     }
 
     long offset = dataBlockIndexReader.getRootBlockOffset(block);
-    BlockCacheKey cacheKey = HFile.getBlockCacheKey(name, offset);
+    BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
 
     // For any given block from any given file, synchronize reads for said
     // block.
@@ -297,8 +300,8 @@ public class HFileReaderV1 extends Abstr
               cacheConf.shouldCacheDataOnRead());
         if (cachedBlock != null) {
           cacheHits.incrementAndGet();
-          getSchemaMetrics().updateOnCacheHit(cachedBlock.getBlockType().getCategory(),
-              isCompaction);
+          getSchemaMetrics().updateOnCacheHit(
+              cachedBlock.getBlockType().getCategory(), isCompaction);
           return cachedBlock.getBufferWithoutHeader();
         }
         // Carry on, please load.
@@ -322,7 +325,6 @@ public class HFileReaderV1 extends Abstr
           - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
       passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.DATA);
-      ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
 
       long delta = System.nanoTime() - startTimeNs;
       if (pread) {
@@ -341,8 +343,7 @@ public class HFileReaderV1 extends Abstr
         cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
             cacheConf.isInMemory());
       }
-
-      return buf;
+      return hfileBlock.getBufferWithoutHeader();
     }
   }
 
@@ -382,9 +383,12 @@ public class HFileReaderV1 extends Abstr
     if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
       int numEvicted = 0;
       for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) {
-        if (cacheConf.getBlockCache().evictBlock(HFile.getBlockCacheKey(name,
-            dataBlockIndexReader.getRootBlockOffset(i))))
+        if (cacheConf.getBlockCache().evictBlock(
+            new BlockCacheKey(name,
+                dataBlockIndexReader.getRootBlockOffset(i),
+                DataBlockEncoding.NONE, BlockType.DATA))) {
           numEvicted++;
+        }
       }
       LOG.debug("On close of file " + name + " evicted " + numEvicted
           + " block(s) of " + dataBlockIndexReader.getRootBlockCount()
@@ -396,16 +400,106 @@ public class HFileReaderV1 extends Abstr
     }
   }
 
+  protected abstract static class AbstractScannerV1
+      extends AbstractHFileReader.Scanner {
+    protected int currBlock;
+
+    /**
+     * This masks a field with the same name in the superclass and saves us the
+     * runtime overhead of casting from abstract reader to reader V1.
+     */
+    protected HFileReaderV1 reader;
+
+    public AbstractScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      super(reader, cacheBlocks, pread, isCompaction);
+      this.reader = (HFileReaderV1) reader;
+    }
+
+    /**
+     * Within a loaded block, seek looking for the first key
+     * that is smaller than (or equal to?) the key we are interested in.
+     *
+     * A note on the seekBefore - if you have seekBefore = true, AND the
+     * first key in the block = key, then you'll get thrown exceptions.
+     * @param key to find
+     * @param seekBefore find the key before the exact match.
+     * @return
+     */
+    protected abstract int blockSeek(byte[] key, int offset, int length,
+        boolean seekBefore);
+
+    protected abstract void loadBlock(int bloc, boolean rewind)
+        throws IOException;
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      int b = reader.blockContainingKey(key, offset, length);
+      if (b < 0) return -1; // falls before the beginning of the file! :-(
+      // Avoid re-reading the same block (that'd be dumb).
+      loadBlock(b, true);
+      return blockSeek(key, offset, length, false);
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length)
+        throws IOException {
+      if (blockBuffer != null && currKeyLen != 0) {
+        ByteBuffer bb = getKey();
+        int compared = reader.getComparator().compare(key, offset,
+            length, bb.array(), bb.arrayOffset(), bb.limit());
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        }
+      }
+
+      int b = reader.blockContainingKey(key, offset, length);
+      if (b < 0) {
+        return -1;
+      }
+      loadBlock(b, false);
+      return blockSeek(key, offset, length, false);
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length)
+        throws IOException {
+      int b = reader.blockContainingKey(key, offset, length);
+      if (b < 0)
+        return false; // key is before the start of the file.
+
+      // Question: does this block begin with 'key'?
+      byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
+      if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
+          key, offset, length) == 0) {
+        // Ok the key we're interested in is the first of the block, so go back
+        // by one.
+        if (b == 0) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+        b--;
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      loadBlock(b, true);
+      blockSeek(key, offset, length, true);
+      return true;
+    }
+  }
+
   /**
    * Implementation of {@link HFileScanner} interface.
    */
-  protected static class ScannerV1 extends AbstractHFileReader.Scanner {
-    private final HFileReaderV1 reader;
-    private int currBlock;
+
+  protected static class ScannerV1 extends AbstractScannerV1 {
+    private HFileReaderV1 reader;
 
     public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(cacheBlocks, pread, isCompaction);
+      super(reader, cacheBlocks, pread, isCompaction);
       this.reader = reader;
     }
 
@@ -486,57 +580,7 @@ public class HFileReaderV1 extends Abstr
     }
 
     @Override
-    public int seekTo(byte[] key) throws IOException {
-      return seekTo(key, 0, key.length);
-    }
-
-    @Override
-    public int seekTo(byte[] key, int offset, int length) throws IOException {
-      int b = reader.blockContainingKey(key, offset, length);
-      if (b < 0) return -1; // falls before the beginning of the file! :-(
-      // Avoid re-reading the same block (that'd be dumb).
-      loadBlock(b, true);
-      return blockSeek(key, offset, length, false);
-    }
-
-    @Override
-    public int reseekTo(byte[] key) throws IOException {
-      return reseekTo(key, 0, key.length);
-    }
-
-    @Override
-    public int reseekTo(byte[] key, int offset, int length)
-        throws IOException {
-      if (blockBuffer != null && currKeyLen != 0) {
-        ByteBuffer bb = getKey();
-        int compared = reader.getComparator().compare(key, offset,
-            length, bb.array(), bb.arrayOffset(), bb.limit());
-        if (compared <= 0) {
-          // If the required key is less than or equal to current key, then
-          // don't do anything.
-          return compared;
-        }
-      }
-
-      int b = reader.blockContainingKey(key, offset, length);
-      if (b < 0) {
-        return -1;
-      }
-      loadBlock(b, false);
-      return blockSeek(key, offset, length, false);
-    }
-
-    /**
-     * Within a loaded block, seek looking for the first key
-     * that is smaller than (or equal to?) the key we are interested in.
-     *
-     * A note on the seekBefore - if you have seekBefore = true, AND the
-     * first key in the block = key, then you'll get thrown exceptions.
-     * @param key to find
-     * @param seekBefore find the key before the exact match.
-     * @return
-     */
-    private int blockSeek(byte[] key, int offset, int length,
+    protected int blockSeek(byte[] key, int offset, int length,
         boolean seekBefore) {
       int klen, vlen;
       int lastLen = 0;
@@ -578,37 +622,6 @@ public class HFileReaderV1 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key) throws IOException {
-      return seekBefore(key, 0, key.length);
-    }
-
-    @Override
-    public boolean seekBefore(byte[] key, int offset, int length)
-    throws IOException {
-      int b = reader.blockContainingKey(key, offset, length);
-      if (b < 0)
-        return false; // key is before the start of the file.
-
-      // Question: does this block begin with 'key'?
-      byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
-      if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
-          key, offset, length) == 0) {
-        // Ok the key we're interested in is the first of the block, so go back
-        // by one.
-        if (b == 0) {
-          // we have a 'problem', the key we want is the first of the file.
-          return false;
-        }
-        b--;
-        // TODO shortcut: seek forward in this block to the last key of the
-        // block.
-      }
-      loadBlock(b, true);
-      blockSeek(key, offset, length, true);
-      return true;
-    }
-
-    @Override
     public String getKeyString() {
       return Bytes.toStringBinary(blockBuffer.array(),
           blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
@@ -621,11 +634,6 @@ public class HFileReaderV1 extends Abstr
     }
 
     @Override
-    public Reader getReader() {
-      return reader;
-    }
-
-    @Override
     public boolean seekTo() throws IOException {
       if (reader.getDataBlockIndexReader().isEmpty()) {
         return false;
@@ -645,7 +653,8 @@ public class HFileReaderV1 extends Abstr
       return true;
     }
 
-    private void loadBlock(int bloc, boolean rewind) throws IOException {
+    @Override
+    protected void loadBlock(int bloc, boolean rewind) throws IOException {
       if (blockBuffer == null) {
         blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
             isCompaction);
@@ -674,7 +683,8 @@ public class HFileReaderV1 extends Abstr
 
   @Override
   public HFileBlock readBlock(long offset, long onDiskBlockSize,
-      boolean cacheBlock, boolean pread, boolean isCompaction) {
+      boolean cacheBlock, boolean pread, boolean isCompaction,
+      BlockType expectedBlockType) {
     throw new UnsupportedOperationException();
   }
 

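The scanner plumbing moved up into AbstractScannerV1 includes the reseekTo() short-circuit: if the requested key is not ahead of the current position, the comparison result is returned without another index lookup. A minimal, self-contained sketch of that decision (illustrative only; it uses plain byte[] keys and Bytes.compareTo instead of the reader's comparator):

import org.apache.hadoop.hbase.util.Bytes;

public class ReseekShortCircuit {
  /** True when a reseek can return immediately, without consulting the block index. */
  static boolean canShortCircuit(byte[] currentKey, byte[] requestedKey) {
    // reseekTo only ever moves forward: if the requested key is less than or
    // equal to the key the scanner is already positioned at, nothing is done
    // and the comparison result itself is handed back to the caller.
    return Bytes.compareTo(requestedKey, currentKey) < 1;
  }

  public static void main(String[] args) {
    byte[] current = Bytes.toBytes("row-0500");
    System.out.println(canShortCircuit(current, Bytes.toBytes("row-0100"))); // true
    System.out.println(canShortCircuit(current, Bytes.toBytes("row-0500"))); // true
    System.out.println(canShortCircuit(current, Bytes.toBytes("row-0900"))); // false
  }
}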
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Thu Jan 26 02:58:57 2012
@@ -30,6 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -80,17 +82,20 @@ public class HFileReaderV2 extends Abstr
    * @param size Length of the stream.
    * @param closeIStream Whether to close the stream.
    * @param cacheConf Cache configuration.
-   * @throws IOException
+   * @param preferredEncodingInCache the encoding to use in cache in case we
+   *          have a choice. If the file is already encoded on disk, we will
+   *          still use its on-disk encoding in cache.
    */
   public HFileReaderV2(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long size,
-      final boolean closeIStream, final CacheConfig cacheConf)
-  throws IOException {
+      final boolean closeIStream, final CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache)
+      throws IOException {
     super(path, trailer, fsdis, size, closeIStream, cacheConf);
-
     trailer.expectVersion(2);
-    fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo,
-        fileSize);
+    HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
+        compressAlgo, fileSize);
+    this.fsBlockReader = fsBlockReaderV2; // upcast
 
     // Comparator class name is stored in the trailer in version 2.
     comparator = trailer.createComparator();
@@ -101,7 +106,7 @@ public class HFileReaderV2 extends Abstr
 
     // Parse load-on-open data.
 
-    HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
+    HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
         trailer.getLoadOnOpenDataOffset(),
         fileSize - trailer.getTrailerSize());
 
@@ -122,9 +127,17 @@ public class HFileReaderV2 extends Abstr
     lastKey = fileInfo.get(FileInfo.LASTKEY);
     avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
     avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
-    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
-    includesMemstoreTS = (keyValueFormatVersion != null &&
-        Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE);
+    byte [] keyValueFormatVersion =
+        fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+    includesMemstoreTS = keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) ==
+            HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
+    fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
+
+    // Read data block encoding algorithm name from file info.
+    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo,
+        preferredEncodingInCache);
+    fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);
 
     // Store all other load-on-open blocks for further consumption.
     HFileBlock b;
@@ -145,9 +158,15 @@ public class HFileReaderV2 extends Abstr
    * @param isCompaction is scanner being used for a compaction?
    * @return Scanner on this file.
    */
-  @Override
-  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+   @Override
+   public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
       final boolean isCompaction) {
+    // check if we want to use data block encoding in memory
+    if (dataBlockEncoder.useEncodedScanner(isCompaction)) {
+      return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
+          includesMemstoreTS);
+    }
+
     return new ScannerV2(this, cacheBlocks, pread, isCompaction);
   }
 
@@ -183,7 +202,8 @@ public class HFileReaderV2 extends Abstr
 
       // Check cache for block. If found return.
       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
-      BlockCacheKey cacheKey = HFile.getBlockCacheKey(name, metaBlockOffset);
+      BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
+          DataBlockEncoding.NONE, BlockType.META);
 
       cacheBlock &= cacheConf.shouldCacheDataOnRead();
       if (cacheConf.isBlockCacheEnabled()) {
@@ -220,19 +240,23 @@ public class HFileReaderV2 extends Abstr
 
   /**
    * Read in a file block.
-   *
    * @param dataBlockOffset offset to read.
    * @param onDiskBlockSize size of the block
    * @param cacheBlock
-   * @param pread Use positional read instead of seek+read (positional is better
-   *          doing random reads whereas seek+read is better scanning).
+   * @param pread Use positional read instead of seek+read (positional is
+   *          better doing random reads whereas seek+read is better scanning).
    * @param isCompaction is this block being read as part of a compaction
+   * @param expectedBlockType the block type we are expecting to read with this
+   *          read operation, or null to read whatever block type is available
+   *          and avoid checking (that might reduce caching efficiency of
+   *          encoded data blocks)
    * @return Block wrapped in a ByteBuffer.
    * @throws IOException
    */
   @Override
   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
-      final boolean cacheBlock, boolean pread, final boolean isCompaction)
+      final boolean cacheBlock, boolean pread, final boolean isCompaction,
+      BlockType expectedBlockType)
       throws IOException {
     if (dataBlockIndexReader == null) {
       throw new IOException("Block index not loaded");
@@ -249,15 +273,18 @@ public class HFileReaderV2 extends Abstr
     // the other choice is to duplicate work (which the cache would prevent you
     // from doing).
 
-    BlockCacheKey cacheKey = HFile.getBlockCacheKey(name, dataBlockOffset);
+    BlockCacheKey cacheKey =
+        new BlockCacheKey(name, dataBlockOffset,
+            dataBlockEncoder.getEffectiveEncodingInCache(isCompaction),
+            expectedBlockType);
     IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
     try {
       blockLoads.incrementAndGet();
 
       // Check cache for block. If found return.
       if (cacheConf.isBlockCacheEnabled()) {
-        HFileBlock cachedBlock =
-          (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock);
+        HFileBlock cachedBlock = (HFileBlock)
+            cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock);
         if (cachedBlock != null) {
           BlockCategory blockCategory =
               cachedBlock.getBlockType().getCategory();
@@ -265,8 +292,21 @@ public class HFileReaderV2 extends Abstr
 
           getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);
 
-          if (cachedBlock.getBlockType() == BlockType.DATA)
+          if (cachedBlock.getBlockType() == BlockType.DATA) {
             HFile.dataBlockReadCnt.incrementAndGet();
+          }
+
+          validateBlockType(cachedBlock, expectedBlockType);
+
+          // Validate encoding type for encoded blocks. We include encoding
+          // type in the cache key, and we expect it to match on a cache hit.
+          if (cachedBlock.getBlockType() == BlockType.ENCODED_DATA &&
+              cachedBlock.getDataBlockEncoding() !=
+              dataBlockEncoder.getEncodingInCache()) {
+            throw new IOException("Cached block under key " + cacheKey + " " +
+                "has wrong encoding: " + cachedBlock.getDataBlockEncoding() +
+                " (expected: " + dataBlockEncoder.getEncodingInCache() + ")");
+          }
           return cachedBlock;
         }
         // Carry on, please load.
@@ -276,6 +316,9 @@ public class HFileReaderV2 extends Abstr
       long startTimeNs = System.nanoTime();
       HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
           onDiskBlockSize, -1, pread);
+      hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock,
+          isCompaction);
+      validateBlockType(hfileBlock, expectedBlockType);
       passSchemaMetricsTo(hfileBlock);
       BlockCategory blockCategory = hfileBlock.getBlockType().getCategory();
 
@@ -307,6 +350,33 @@ public class HFileReaderV2 extends Abstr
   }
 
   /**
+   * Compares the actual type of a block retrieved from cache or disk with its
+   * expected type and throws an exception in case of a mismatch. Expected
+   * block type of {@link BlockType#DATA} is considered to match the actual
+ * block type {@link BlockType#ENCODED_DATA} as well.
+   * @param block a block retrieved from cache or disk
+   * @param expectedBlockType the expected block type, or null to skip the
+   *          check
+   */
+  private void validateBlockType(HFileBlock block,
+      BlockType expectedBlockType) throws IOException {
+    if (expectedBlockType == null) {
+      return;
+    }
+    BlockType actualBlockType = block.getBlockType();
+    if (actualBlockType == BlockType.ENCODED_DATA &&
+        expectedBlockType == BlockType.DATA) {
+      // We consider DATA to match ENCODED_DATA for the purpose of this
+      // verification.
+      return;
+    }
+    if (actualBlockType != expectedBlockType) {
+      throw new IOException("Expected block type " + expectedBlockType + ", " +
+          "but got " + actualBlockType + ": " + block);
+    }
+  }
+
+  /**
    * @return Last key in the file. May be null if file has no entries. Note that
    *         this is not the last row key, but rather the byte form of the last
    *         KeyValue.
@@ -345,31 +415,163 @@ public class HFileReaderV2 extends Abstr
     }
   }
 
+  protected abstract static class AbstractScannerV2
+      extends AbstractHFileReader.Scanner {
+    protected HFileBlock block;
+
+    public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      super(r, cacheBlocks, pread, isCompaction);
+    }
+
+    /**
+     * An internal API function. Seek to the given key, optionally rewinding to
+     * the first key of the block before doing the seek.
+     *
+     * @param key key byte array
+     * @param offset key offset in the key byte array
+     * @param length key length
+     * @param rewind whether to rewind to the first key of the block before
+     *        doing the seek. If this is false, we are assuming we never go
+     *        back, otherwise the result is undefined.
+     * @return -1 if the key is earlier than the first key of the file,
+     *         0 if we are at the given key, and 1 if we are past the given key
+     * @throws IOException
+     */
+    protected int seekTo(byte[] key, int offset, int length, boolean rewind)
+        throws IOException {
+      HFileBlockIndex.BlockIndexReader indexReader =
+          reader.getDataBlockIndexReader();
+      HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
+          block, cacheBlocks, pread, isCompaction);
+      if (seekToBlock == null) {
+        // This happens if the key e.g. falls before the beginning of the file.
+        return -1;
+      }
+      return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
+          false);
+    }
+
+    protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
+
+    protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock,
+        boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
+        throws IOException;
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      // Always rewind to the first key of the block, because the given key
+      // might be before or after the current key.
+      return seekTo(key, offset, length, true);
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length) throws IOException {
+      if (isSeeked()) {
+        ByteBuffer bb = getKey();
+        int compared = reader.getComparator().compare(key, offset,
+            length, bb.array(), bb.arrayOffset(), bb.limit());
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        }
+      }
+
+      // Don't rewind on a reseek operation, because reseek implies that we are
+      // always going forward in the file.
+      return seekTo(key, offset, length, false);
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length)
+        throws IOException {
+      HFileBlock seekToBlock =
+          reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
+              block, cacheBlocks, pread, isCompaction);
+      if (seekToBlock == null) {
+        return false;
+      }
+      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+      if (reader.getComparator().compare(firstKey.array(),
+          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
+      {
+        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+        // The key we are interested in
+        if (previousBlockOffset == -1) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+
+        // It is important that we compute and pass onDiskSize to the block
+        // reader so that it does not have to read the header separately to
+        // figure out the size.
+        seekToBlock = reader.readBlock(previousBlockOffset,
+            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
+            pread, isCompaction, BlockType.DATA);
+
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+      return true;
+    }
+
+
+    /**
+     * Scans blocks in the "scanned" section of the {@link HFile} until the next
+     * data block is found.
+     *
+     * @return the next block, or null if there are no more data blocks
+     * @throws IOException
+     */
+    protected HFileBlock readNextDataBlock() throws IOException {
+      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+      if (block == null)
+        return null;
+
+      HFileBlock curBlock = block;
+
+      do {
+        if (curBlock.getOffset() >= lastDataBlockOffset)
+          return null;
+
+        if (curBlock.getOffset() < 0) {
+          throw new IOException("Invalid block file offset: " + block);
+        }
+
+        // We are reading the next block without block type validation, because
+        // it might turn out to be a non-data block.
+        curBlock = reader.readBlock(curBlock.getOffset()
+            + curBlock.getOnDiskSizeWithHeader(),
+            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+            isCompaction, null);
+      } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
+          curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));
+
+      return curBlock;
+    }
+  }
+
   /**
    * Implementation of {@link HFileScanner} interface.
    */
-  protected static class ScannerV2 extends AbstractHFileReader.Scanner {
-    private HFileBlock block;
+  protected static class ScannerV2 extends AbstractScannerV2 {
     private HFileReaderV2 reader;
 
     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(cacheBlocks, pread, isCompaction);
+      super(r, cacheBlocks, pread, isCompaction);
       this.reader = r;
     }
 
     @Override
-    public HFileReaderV2 getReader() {
-      return reader;
-    }
-
-    @Override
     public KeyValue getKeyValue() {
       if (!isSeeked())
         return null;
 
-      KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position());
+      KeyValue ret = new KeyValue(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position());
       if (this.reader.shouldIncludeMemstoreTS()) {
         ret.setMemstoreTS(currMemstoreTS);
       }
@@ -452,36 +654,6 @@ public class HFileReaderV2 extends Abstr
     }
 
     /**
-     * Scans blocks in the "scanned" section of the {@link HFile} until the next
-     * data block is found.
-     *
-     * @return the next block, or null if there are no more data blocks
-     * @throws IOException
-     */
-    private HFileBlock readNextDataBlock() throws IOException {
-      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
-      if (block == null)
-        return null;
-
-      HFileBlock curBlock = block;
-
-      do {
-        if (curBlock.getOffset() >= lastDataBlockOffset)
-          return null;
-
-        if (curBlock.getOffset() < 0) {
-          throw new IOException("Invalid block file offset: " + block);
-        }
-        curBlock = reader.readBlock(curBlock.getOffset()
-            + curBlock.getOnDiskSizeWithHeader(),
-            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
-            isCompaction);
-      } while (!curBlock.getBlockType().equals(BlockType.DATA));
-
-      return curBlock;
-    }
-
-    /**
      * Positions this scanner at the start of the file.
      *
      * @return false if empty file; i.e. a call to next would return false and
@@ -508,7 +680,7 @@ public class HFileReaderV2 extends Abstr
       }
 
       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
-          isCompaction);
+          isCompaction, BlockType.DATA);
       if (block.getOffset() < 0) {
         throw new IOException("Invalid block offset: " + block.getOffset());
       }
@@ -517,70 +689,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public int seekTo(byte[] key) throws IOException {
-      return seekTo(key, 0, key.length);
-    }
-
-    /**
-     * An internal API function. Seek to the given key, optionally rewinding to
-     * the first key of the block before doing the seek.
-     *
-     * @param key key byte array
-     * @param offset key offset in the key byte array
-     * @param length key length
-     * @param rewind whether to rewind to the first key of the block before
-     *        doing the seek. If this is false, we are assuming we never go
-     *        back, otherwise the result is undefined.
-     * @return -1 if the key is earlier than the first key of the file,
-     *         0 if we are at the given key, and 1 if we are past the given key
-     * @throws IOException
-     */
-    private int seekTo(byte[] key, int offset, int length, boolean rewind)
-        throws IOException {
-      HFileBlockIndex.BlockIndexReader indexReader =
-          reader.getDataBlockIndexReader();
-      HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
-          block, cacheBlocks, pread, isCompaction);
-
-      if (seekToBlock == null) {
-        // This happens if the key e.g. falls before the beginning of the file.
-        return -1;
-      }
-      return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
-          false);
-    }
-
-    @Override
-    public int seekTo(byte[] key, int offset, int length) throws IOException {
-      // Always rewind to the first key of the block, because the given key
-      // might be before or after the current key.
-      return seekTo(key, offset, length, true);
-    }
-
-    @Override
-    public int reseekTo(byte[] key) throws IOException {
-      return reseekTo(key, 0, key.length);
-    }
-
-    @Override
-    public int reseekTo(byte[] key, int offset, int length) throws IOException {
-      if (isSeeked()) {
-        ByteBuffer bb = getKey();
-        int compared = reader.getComparator().compare(key, offset,
-            length, bb.array(), bb.arrayOffset(), bb.limit());
-        if (compared < 1) {
-          // If the required key is less than or equal to current key, then
-          // don't do anything.
-          return compared;
-        }
-      }
-
-      // Don't rewind on a reseek operation, because reseek implies that we are
-      // always going forward in the file.
-      return seekTo(key, offset, length, false);
-    }
-
-    private int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
         byte[] key, int offset, int length, boolean seekBefore)
         throws IOException {
       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
@@ -599,6 +708,16 @@ public class HFileReaderV2 extends Abstr
      */
     private void updateCurrBlock(HFileBlock newBlock) {
       block = newBlock;
+
+      // sanity check
+      if (block.getBlockType() != BlockType.DATA) {
+        throw new IllegalStateException("ScannerV2 works only on data " +
+            "blocks, got " + block.getBlockType() + "; " +
+            "fileName=" + reader.name + ", " +
+            "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
+            "isCompaction=" + isCompaction);
+      }
+
       blockBuffer = block.getBufferWithoutHeader();
       readKeyValueLen();
       blockFetches++;
@@ -611,12 +730,14 @@ public class HFileReaderV2 extends Abstr
       blockBuffer.reset();
       if (this.reader.shouldIncludeMemstoreTS()) {
         try {
-          int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
-                                  + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
-          currMemstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+          int memstoreTSOffset = blockBuffer.arrayOffset()
+              + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
+              + currValueLen;
+          currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
+              memstoreTSOffset);
           currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
         } catch (Exception e) {
-          throw new RuntimeException("Error reading memstoreTS. " + e);
+          throw new RuntimeException("Error reading memstore timestamp", e);
         }
       }
 
@@ -631,7 +752,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     /**
-     * Within a loaded block, seek looking for the first key that is smaller
+     * Within a loaded block, seek looking for the last key that is smaller
      * than (or equal to?) the key we are interested in.
      *
      * A note on the seekBefore: if you have seekBefore = true, AND the first
@@ -656,12 +777,13 @@ public class HFileReaderV2 extends Abstr
         blockBuffer.reset();
         if (this.reader.shouldIncludeMemstoreTS()) {
           try {
-            int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
-                                  + KEY_VALUE_LEN_SIZE + klen + vlen;
-            memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+            int memstoreTSOffset = blockBuffer.arrayOffset()
+                + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
+            memstoreTS = Bytes.readVLong(blockBuffer.array(),
+                memstoreTSOffset);
             memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
           } catch (Exception e) {
-            throw new RuntimeException("Error reading memstoreTS. " + e);
+            throw new RuntimeException("Error reading memstore timestamp", e);
           }
         }
 
@@ -713,11 +835,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key) throws IOException {
-      return seekBefore(key, 0, key.length);
-    }
-
-    private ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
       // It is safe to manipulate this buffer because we own the buffer object.
       buffer.rewind();
@@ -730,53 +848,174 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key, int offset, int length)
-        throws IOException {
-      HFileBlock seekToBlock =
-          reader.getDataBlockIndexReader().seekToDataBlock(key, offset,
-              length, block, cacheBlocks, pread, isCompaction);
-      if (seekToBlock == null) {
+    public String getKeyString() {
+      return Bytes.toStringBinary(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE, currKeyLen);
+    }
+
+    @Override
+    public String getValueString() {
+      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+          currValueLen);
+    }
+  }
+
+  /**
+   * ScannerV2 that operates on encoded data blocks.
+   */
+  protected static class EncodedScannerV2 extends AbstractScannerV2 {
+    private DataBlockEncoder.EncodedSeeker seeker = null;
+    private DataBlockEncoder dataBlockEncoder = null;
+    private final boolean includesMemstoreTS;
+
+    public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
+        boolean pread, boolean isCompaction, boolean includesMemstoreTS) {
+      super(reader, cacheBlocks, pread, isCompaction);
+      this.includesMemstoreTS = includesMemstoreTS;
+    }
+
+    private void setDataBlockEncoder(DataBlockEncoder dataBlockEncoder) {
+      this.dataBlockEncoder = dataBlockEncoder;
+      seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
+          includesMemstoreTS);
+    }
+
+    /**
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the first key/value pair.
+     *
+     * @param newBlock the block to make current
+     */
+    private void updateCurrentBlock(HFileBlock newBlock) {
+      block = newBlock;
+
+      // sanity checks
+      if (block.getBlockType() != BlockType.ENCODED_DATA) {
+        throw new IllegalStateException(
+            "EncodedScannerV2 works only on encoded data blocks");
+      }
+
+      short dataBlockEncoderId = block.getDataBlockEncodingId();
+      if (dataBlockEncoder == null ||
+          !DataBlockEncoding.isCorrectEncoder(dataBlockEncoder,
+              dataBlockEncoderId)) {
+        DataBlockEncoder encoder =
+            DataBlockEncoding.getDataBlockEncoderById(dataBlockEncoderId);
+        setDataBlockEncoder(encoder);
+      }
+
+      seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
+      blockFetches++;
+    }
+
+    private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
+      ByteBuffer origBlock = newBlock.getBufferReadOnly();
+      ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
+          origBlock.arrayOffset() + HFileBlock.HEADER_SIZE +
+          DataBlockEncoding.ID_SIZE,
+          origBlock.limit() - HFileBlock.HEADER_SIZE -
+          DataBlockEncoding.ID_SIZE).slice();
+      return encodedBlock;
+    }
+
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader == null) {
         return false;
       }
-      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
-      if (reader.getComparator().compare(firstKey.array(),
-          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
-      {
-        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
-        // The key we are interested in
-        if (previousBlockOffset == -1) {
-          // we have a 'problem', the key we want is the first of the file.
-          return false;
-        }
 
-        // It is important that we compute and pass onDiskSize to the block
-        // reader so that it does not have to read the header separately to
-        // figure out the size.
-        seekToBlock = reader.readBlock(previousBlockOffset,
-            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
-            pread, isCompaction);
+      if (reader.getTrailer().getEntryCount() == 0) {
+        // No data blocks.
+        return false;
+      }
 
-        // TODO shortcut: seek forward in this block to the last key of the
-        // block.
+      long firstDataBlockOffset =
+          reader.getTrailer().getFirstDataBlockOffset();
+      if (block != null && block.getOffset() == firstDataBlockOffset) {
+        seeker.rewind();
+        return true;
       }
-      loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+
+      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+          isCompaction, BlockType.DATA);
+      if (block.getOffset() < 0) {
+        throw new IOException("Invalid block offset: " + block.getOffset());
+      }
+      updateCurrentBlock(block);
       return true;
     }
 
     @Override
+    public boolean next() throws IOException {
+      boolean isValid = seeker.next();
+      if (!isValid) {
+        block = readNextDataBlock();
+        isValid = block != null;
+        if (isValid) {
+          updateCurrentBlock(block);
+        }
+      }
+      return isValid;
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      assertValidSeek();
+      return seeker.getKeyDeepCopy();
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      assertValidSeek();
+      return seeker.getValueShallowCopy();
+    }
+
+    @Override
+    public KeyValue getKeyValue() {
+      if (block == null) {
+        return null;
+      }
+      return seeker.getKeyValue();
+    }
+
+    @Override
     public String getKeyString() {
-      return Bytes.toStringBinary(blockBuffer.array(),
-          blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE, currKeyLen);
+      ByteBuffer keyBuffer = getKey();
+      return Bytes.toStringBinary(keyBuffer.array(),
+          keyBuffer.arrayOffset(), keyBuffer.limit());
     }
 
     @Override
     public String getValueString() {
-      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
-          currValueLen);
+      ByteBuffer valueBuffer = getValue();
+      return Bytes.toStringBinary(valueBuffer.array(),
+          valueBuffer.arrayOffset(), valueBuffer.limit());
     }
 
+    private void assertValidSeek() {
+      if (block == null) {
+        throw new NotSeekedException();
+      }
+    }
+
+    @Override
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+      return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
+    }
+
+    @Override
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+        byte[] key, int offset, int length, boolean seekBefore)
+        throws IOException  {
+      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+        updateCurrentBlock(seekToBlock);
+      } else if (rewind) {
+        seeker.rewind();
+      }
+      return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
+    }
   }
 
   /**

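EncodedScannerV2 hands the per-encoding seeker a buffer that starts right after the block header and the two-byte encoding id, as done in getEncodedBuffer() above. A small stand-alone sketch of that slicing (illustrative only; HEADER_SIZE and ID_SIZE are stand-in constants for HFileBlock.HEADER_SIZE and DataBlockEncoding.ID_SIZE, with values assumed for the example):

import java.nio.ByteBuffer;

public class EncodedBufferSlice {
  static final int HEADER_SIZE = 24; // assumed stand-in for HFileBlock.HEADER_SIZE
  static final int ID_SIZE = 2;      // assumed stand-in for DataBlockEncoding.ID_SIZE

  /** Returns only the encoded key/value payload of an on-heap block buffer. */
  static ByteBuffer encodedPayload(ByteBuffer blockWithHeader) {
    return ByteBuffer.wrap(blockWithHeader.array(),
        blockWithHeader.arrayOffset() + HEADER_SIZE + ID_SIZE,
        blockWithHeader.limit() - HEADER_SIZE - ID_SIZE).slice();
  }

  public static void main(String[] args) {
    byte[] raw = new byte[HEADER_SIZE + ID_SIZE + 100];
    ByteBuffer block = ByteBuffer.wrap(raw);
    System.out.println(encodedPayload(block).remaining()); // 100
  }
}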
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Thu Jan 26 02:58:57 2012
@@ -35,8 +35,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +47,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.Compressor;
 
 /**
- * Writes version 1 HFiles. Mainly used for testing backwards-compatibilty.
+ * Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
  */
 public class HFileWriterV1 extends AbstractHFileWriter {
 
@@ -91,16 +94,17 @@ public class HFileWriterV1 extends Abstr
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
-        Compression.Algorithm compressAlgo, final KeyComparator comparator)
+        Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
+        KeyComparator comparator)
         throws IOException {
       return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
-          compressAlgo, comparator);
+          compressAlgo, dataBlockEncoder, comparator);
     }
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
-        String compressAlgoName,
-        final KeyComparator comparator) throws IOException {
+        String compressAlgoName, KeyComparator comparator)
+        throws IOException {
       return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
           compressAlgoName, comparator);
     }
@@ -117,7 +121,8 @@ public class HFileWriterV1 extends Abstr
     public Writer createWriter(final FSDataOutputStream ostream,
         final int blockSize, final Compression.Algorithm compress,
         final KeyComparator c) throws IOException {
-      return new HFileWriterV1(cacheConf, ostream, blockSize, compress, c);
+      return new HFileWriterV1(cacheConf, ostream, blockSize, compress,
+          NoOpDataBlockEncoder.INSTANCE, c);
     }
   }
 
@@ -127,7 +132,7 @@ public class HFileWriterV1 extends Abstr
       throws IOException {
     this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
         HFile.DEFAULT_COMPRESSION_ALGORITHM,
-        null);
+        NoOpDataBlockEncoder.INSTANCE, null);
   }
 
   /**
@@ -138,15 +143,18 @@ public class HFileWriterV1 extends Abstr
       Path path, int blockSize, String compressAlgoName,
       final KeyComparator comparator) throws IOException {
     this(conf, cacheConf, fs, path, blockSize,
-        compressionByName(compressAlgoName), comparator);
+        compressionByName(compressAlgoName), NoOpDataBlockEncoder.INSTANCE,
+        comparator);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
-  public HFileWriterV1(Configuration conf, CacheConfig cacheConf, FileSystem fs,
-      Path path, int blockSize, Compression.Algorithm compress,
+  public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
+      FileSystem fs, Path path,
+      int blockSize, Compression.Algorithm compress,
+      HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator) throws IOException {
     super(cacheConf, createOutputStream(conf, fs, path), path,
-        blockSize, compress, comparator);
+        blockSize, compress, blockEncoder, comparator);
     SchemaMetrics.configureGlobally(conf);
   }
 
@@ -157,15 +165,17 @@ public class HFileWriterV1 extends Abstr
       throws IOException {
     this(cacheConf, outputStream, blockSize,
         Compression.getCompressionAlgorithmByName(compressAlgoName),
-        comparator);
+        NoOpDataBlockEncoder.INSTANCE, comparator);
   }
 
   /** Constructor that takes a stream. */
   public HFileWriterV1(CacheConfig cacheConf,
       final FSDataOutputStream outputStream, final int blockSize,
-      final Compression.Algorithm compress, final KeyComparator comparator)
+      final Compression.Algorithm compress,
+      HFileDataBlockEncoder blockEncoder, final KeyComparator comparator)
       throws IOException {
-    super(cacheConf, outputStream, null, blockSize, compress, comparator);
+    super(cacheConf, outputStream, null, blockSize, compress,
+        blockEncoder, comparator);
   }
 
   /**
@@ -202,13 +212,17 @@ public class HFileWriterV1 extends Abstr
 
     if (cacheConf.shouldCacheDataOnWrite()) {
       baosDos.flush();
+      // we do not do data block encoding on disk for HFile v1
       byte[] bytes = baos.toByteArray();
-      HFileBlock cBlock = new HFileBlock(BlockType.DATA,
+      HFileBlock block = new HFileBlock(BlockType.DATA,
           (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
-          ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin);
-      passSchemaMetricsTo(cBlock);
+          ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
+          blockBegin, MemStore.NO_PERSISTENT_TS);
+      block = blockEncoder.diskToCacheFormat(block, false);
+      passSchemaMetricsTo(block);
       cacheConf.getBlockCache().cacheBlock(
-          HFile.getBlockCacheKey(name, blockBegin), cBlock);
+          new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE,
+              block.getBlockType()), block);
       baosDos.close();
     }
     blockNumber++;
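
A note on the HFileWriterV1 changes above: every v1 writer constructor now threads an HFileDataBlockEncoder down to AbstractHFileWriter, and the call sites that previously passed nothing default to NoOpDataBlockEncoder.INSTANCE, because version 1 files are never block-encoded on disk. A minimal, illustrative construction under that assumption; conf, cacheConf and fs are whatever the caller already has, and the output path and compression choice are placeholders, not part of the patch:

    // Hypothetical v1 writer construction after this patch (imports as in the file above).
    HFile.Writer writer = new HFileWriterV1(conf, cacheConf, fs,
        new Path("/tmp/example-v1.hfile"),   // placeholder output path
        HFile.DEFAULT_BLOCKSIZE,
        Compression.Algorithm.NONE,          // placeholder compression
        NoOpDataBlockEncoder.INSTANCE,       // v1 data blocks are never encoded on disk
        null);                               // null comparator, as the simplest constructor above also passes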

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Thu Jan 26 02:58:57 2012
@@ -49,9 +49,13 @@ public class HFileWriterV2 extends Abstr
   static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
 
   /** Max memstore (mvcc) timestamp in FileInfo */
-  public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+  public static final byte [] MAX_MEMSTORE_TS_KEY =
+      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+
   /** KeyValue version in FileInfo */
-  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
+  public static final byte [] KEY_VALUE_VERSION =
+      Bytes.toBytes("KEY_VALUE_VERSION");
+
   /** Version for KeyValue which includes memstore timestamp */
   public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
 
@@ -92,10 +96,10 @@ public class HFileWriterV2 extends Abstr
 
     @Override
     public Writer createWriter(FileSystem fs, Path path, int blockSize,
-        Compression.Algorithm compress,
+        Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
         final KeyComparator comparator) throws IOException {
       return new HFileWriterV2(conf, cacheConf, fs, path, blockSize,
-          compress, comparator);
+          compress, blockEncoder, comparator);
     }
 
     @Override
@@ -128,7 +132,7 @@ public class HFileWriterV2 extends Abstr
       FileSystem fs, Path path)
       throws IOException {
     this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
-        HFile.DEFAULT_COMPRESSION_ALGORITHM, null);
+        HFile.DEFAULT_COMPRESSION_ALGORITHM, null, null);
   }
 
   /**
@@ -139,15 +143,16 @@ public class HFileWriterV2 extends Abstr
       Path path, int blockSize, String compressAlgoName,
       final KeyComparator comparator) throws IOException {
     this(conf, cacheConf, fs, path, blockSize,
-        compressionByName(compressAlgoName), comparator);
+        compressionByName(compressAlgoName), null, comparator);
   }
 
   /** Constructor that takes a path, creates and closes the output stream. */
   public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs,
       Path path, int blockSize, Compression.Algorithm compressAlgo,
+      HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator) throws IOException {
     super(cacheConf, createOutputStream(conf, fs, path), path,
-        blockSize, compressAlgo, comparator);
+        blockSize, compressAlgo, blockEncoder, comparator);
     SchemaMetrics.configureGlobally(conf);
     finishInit(conf);
   }
@@ -167,7 +172,8 @@ public class HFileWriterV2 extends Abstr
       final FSDataOutputStream outputStream, final int blockSize,
       final Compression.Algorithm compress, final KeyComparator comparator)
       throws IOException {
-    super(cacheConf, outputStream, null, blockSize, compress, comparator);
+    super(cacheConf, outputStream, null, blockSize, compress, null,
+        comparator);
     finishInit(conf);
   }
 
@@ -177,7 +183,8 @@ public class HFileWriterV2 extends Abstr
       throw new IllegalStateException("finishInit called twice");
 
     // HFile filesystem-level (non-caching) block writer
-    fsBlockWriter = new HFileBlock.Writer(compressAlgo);
+    fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
+        includeMemstoreTS);
 
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
@@ -225,8 +232,9 @@ public class HFileWriterV2 extends Abstr
     long startTimeNs = System.nanoTime();
 
     // Update the first data block offset for scanning.
-    if (firstDataBlockOffset == -1)
+    if (firstDataBlockOffset == -1) {
       firstDataBlockOffset = outputStream.getPos();
+    }
 
     // Update the last data block offset
     lastDataBlockOffset = outputStream.getPos();
@@ -253,7 +261,7 @@ public class HFileWriterV2 extends Abstr
         long offset = outputStream.getPos();
         boolean cacheThisBlock = ibw.cacheOnWrite();
         ibw.writeInlineBlock(fsBlockWriter.startWriting(
-            ibw.getInlineBlockType(), cacheThisBlock));
+            ibw.getInlineBlockType()));
         fsBlockWriter.writeHeaderAndData(outputStream);
         ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
             fsBlockWriter.getUncompressedSizeWithoutHeader());
@@ -272,11 +280,15 @@ public class HFileWriterV2 extends Abstr
    *          the cache key.
    */
   private void doCacheOnWrite(long offset) {
-    // Cache this block on write.
-    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
+    // We don't cache-on-write data blocks on compaction, so assume this is not
+    // a compaction.
+    final boolean isCompaction = false;
+    HFileBlock cacheFormatBlock = blockEncoder.diskToCacheFormat(
+        fsBlockWriter.getBlockForCaching(), isCompaction);
     passSchemaMetricsTo(cacheFormatBlock);
     cacheConf.getBlockCache().cacheBlock(
-        HFile.getBlockCacheKey(name, offset), cacheFormatBlock);
+        new BlockCacheKey(name, offset, blockEncoder.getEncodingInCache(),
+            cacheFormatBlock.getBlockType()), cacheFormatBlock);
   }
 
   /**
@@ -286,8 +298,7 @@ public class HFileWriterV2 extends Abstr
    */
   private void newBlock() throws IOException {
     // This is where the next block begins.
-    fsBlockWriter.startWriting(BlockType.DATA,
-        cacheConf.shouldCacheDataOnWrite());
+    fsBlockWriter.startWriting(BlockType.DATA);
     firstKeyInBlock = null;
   }
 
@@ -419,8 +430,7 @@ public class HFileWriterV2 extends Abstr
         // store the beginning offset
         long offset = outputStream.getPos();
         // write the metadata content
-        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META,
-            cacheConf.shouldCacheDataOnWrite());
+        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
         metaData.get(i).write(dos);
 
         fsBlockWriter.writeHeaderAndData(outputStream);
@@ -446,7 +456,7 @@ public class HFileWriterV2 extends Abstr
 
     // Meta block index.
     metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
-        BlockType.ROOT_INDEX, false), "meta");
+        BlockType.ROOT_INDEX), "meta");
     fsBlockWriter.writeHeaderAndData(outputStream);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
 
@@ -456,8 +466,7 @@ public class HFileWriterV2 extends Abstr
     }
 
     // File info
-    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
-        false));
+    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
     fsBlockWriter.writeHeaderAndData(outputStream);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
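
The doCacheOnWrite() change above is the write-side half of a contract: the freshly written block is converted with blockEncoder.diskToCacheFormat(...) and cached under a key that carries the in-cache encoding, so the read path has to derive the same encoding from the same HFileDataBlockEncoder when it builds its lookup key. A small illustrative helper, not part of the patch, that isolates the key construction; only the HBase types are real, the helper itself is invented:

    // Writer and reader must build block cache keys the same way for cache hits to happen.
    static BlockCacheKey cacheKeyFor(String hfileName, long blockOffset,
        HFileDataBlockEncoder encoder, HFileBlock cacheFormatBlock) {
      return new BlockCacheKey(hfileName, blockOffset,
          encoder.getEncodingInCache(),        // DataBlockEncoding.NONE for NoOpDataBlockEncoder
          cacheFormatBlock.getBlockType());    // DATA, or ENCODED_DATA after diskToCacheFormat
    }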
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Jan 26 02:58:57 2012
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -753,4 +754,16 @@ public class LruBlockCache implements Bl
     }
     return counts;
   }
+
+  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
+    Map<DataBlockEncoding, Integer> counts =
+        new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
+    for (BlockCacheKey cacheKey : map.keySet()) {
+      DataBlockEncoding encoding = cacheKey.getDataBlockEncoding();
+      Integer count = counts.get(encoding);
+      counts.put(encoding, (count == null ? 0 : count) + 1);
+    }
+    return counts;
+  }
+
 }
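
getEncodingCountsForTest() walks the cache's key set and tallies cached blocks per DataBlockEncoding, giving tests a cheap way to verify that encoded blocks really ended up in the cache. A sketch of how a test might use it; the cache variable, the PREFIX encoding and the JUnit assertion are assumptions for illustration only:

    Map<DataBlockEncoding, Integer> counts = cache.getEncodingCountsForTest();
    Integer encodedCount = counts.get(DataBlockEncoding.PREFIX);  // encoding under test (assumed)
    assertTrue("expected PREFIX-encoded blocks in the block cache",
        encodedCount != null && encodedCount > 0);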

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Does not perform any kind of encoding/decoding.
+ */
+public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
+
+  public static final NoOpDataBlockEncoder INSTANCE =
+      new NoOpDataBlockEncoder();
+
+  /** Cannot be instantiated. Use {@link #INSTANCE} instead. */
+  private NoOpDataBlockEncoder() {
+  }
+
+  @Override
+  public HFileBlock diskToCacheFormat(HFileBlock block, boolean isCompaction) {
+    if (block.getBlockType() == BlockType.ENCODED_DATA) {
+      throw new IllegalStateException("Unexpected encoded block");
+    }
+    return block;
+  }
+
+  @Override
+  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
+      ByteBuffer in, boolean includesMemstoreTS) {
+    return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+  }
+
+  @Override
+  public boolean useEncodedScanner(boolean isCompaction) {
+    return false;
+  }
+
+  @Override
+  public void saveMetadata(StoreFile.Writer storeFileWriter) {
+  }
+
+  @Override
+  public DataBlockEncoding getEncodingOnDisk() {
+    return DataBlockEncoding.NONE;
+  }
+
+  @Override
+  public DataBlockEncoding getEncodingInCache() {
+    return DataBlockEncoding.NONE;
+  }
+
+  @Override
+  public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
+    return DataBlockEncoding.NONE;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}
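
NoOpDataBlockEncoder is the do-nothing counterpart to HFileDataBlockEncoderImpl: call sites that never encode, such as the HFile v1 writer and the store-file validation path later in this commit, pass the shared INSTANCE instead of null. A hedged sketch of how an encoder could be chosen from column family settings, mirroring the Store and LoadIncrementalHFiles changes below; it assumes the family descriptor (HColumnDescriptor) exposes getDataBlockEncodingOnDisk() and getDataBlockEncoding() as used there:

    HFileDataBlockEncoder pickEncoder(HColumnDescriptor family) {
      if (family.getDataBlockEncodingOnDisk() == DataBlockEncoding.NONE
          && family.getDataBlockEncoding() == DataBlockEncoding.NONE) {
        return NoOpDataBlockEncoder.INSTANCE;   // nothing to encode, on disk or in cache
      }
      return new HFileDataBlockEncoderImpl(
          family.getDataBlockEncodingOnDisk(),  // may stay NONE while the cache is encoded
          family.getDataBlockEncoding());
    }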

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Jan 26 02:58:57 2012
@@ -63,8 +63,11 @@ import org.apache.hadoop.hbase.client.Se
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -528,9 +531,12 @@ public class LoadIncrementalHFiles exten
     CacheConfig cacheConf = new CacheConfig(conf);
     HalfStoreFileReader halfReader = null;
     StoreFile.Writer halfWriter = null;
+    HFileDataBlockEncoder dataBlockEncoder = new HFileDataBlockEncoderImpl(
+        familyDescriptor.getDataBlockEncodingOnDisk(),
+        familyDescriptor.getDataBlockEncoding());
     try {
       halfReader = new HalfStoreFileReader(fs, inFile, cacheConf,
-          reference);
+          reference, DataBlockEncoding.NONE);
       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
 
       int blocksize = familyDescriptor.getBlocksize();
@@ -538,7 +544,8 @@ public class LoadIncrementalHFiles exten
       BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
 
       halfWriter = new StoreFile.Writer(
-          fs, outFile, blocksize, compression, conf, cacheConf,
+          fs, outFile, blocksize, compression, dataBlockEncoder,
+          conf, cacheConf,
           KeyValue.COMPARATOR, bloomFilterType, 0);
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
@@ -638,7 +645,6 @@ public class LoadIncrementalHFiles exten
       Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
       for (Path hfile : hfiles) {
         if (hfile.getName().startsWith("_")) continue;
-        
         HFile.Reader reader = HFile.createReader(fs, hfile,
             new CacheConfig(getConf()));
         final byte[] first, last;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Thu Jan 26 02:58:57 2012
@@ -79,7 +79,7 @@ public class CompactSplitThread implemen
           "hbase.regionserver.thread.compaction.throttle", 0);
     } else {
       // we have a complicated default. see HBASE-3877
-      long flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
+      long flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
           HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
       long splitSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
           HConstants.DEFAULT_MAX_FILE_SIZE);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jan 26 02:58:57 2012
@@ -490,7 +490,7 @@ public class HRegion implements HeapSize
     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
 
     if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
-      flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
+      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
          HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
     }
     this.memstoreFlushSize = flushSize;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Jan 26 02:58:57 2012
@@ -879,6 +879,9 @@ public class MemStore implements HeapSiz
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
       (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
 
+  /** Used for readability when we don't store the memstore timestamp in the HFile */
+  public static final boolean NO_PERSISTENT_TS = false;
+
   /*
    * Calculate how the MemStore size has changed.  Includes overhead of the
    * backing Map.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Jan 26 02:58:57 2012
@@ -54,9 +54,12 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
+import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
@@ -150,6 +153,7 @@ public class Store extends SchemaConfigu
   private final Compression.Algorithm compression;
   /** Compression algorithm for major compaction */
   private final Compression.Algorithm compactionCompression;
+  private HFileDataBlockEncoder dataBlockEncoder;
 
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
@@ -187,6 +191,11 @@ public class Store extends SchemaConfigu
     this.compactionCompression =
       (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
         family.getCompactionCompression() : this.compression;
+
+    this.dataBlockEncoder =
+        new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
+            family.getDataBlockEncoding());
+
     this.comparator = info.getComparator();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
@@ -276,6 +285,21 @@ public class Store extends SchemaConfigu
   public Path getHomedir() {
     return homedir;
   }
+  
+  /**
+   * @return the data block encoder
+   */
+  public HFileDataBlockEncoder getDataBlockEncoder() {
+    return dataBlockEncoder;
+  }
+
+  /**
+   * Should be used only in tests.
+   * @param blockEncoder the block delta encoder to use
+   */
+  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
+    this.dataBlockEncoder = blockEncoder;
+  }
 
   /**
    * Creates an unsorted list of StoreFile loaded in parallel
@@ -314,7 +338,7 @@ public class Store extends SchemaConfigu
       completionService.submit(new Callable<StoreFile>() {
         public StoreFile call() throws IOException {
           StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
-              family.getBloomFilterType());
+              family.getBloomFilterType(), dataBlockEncoder);
           passSchemaMetricsTo(storeFile);
           storeFile.createReader();
           return storeFile;
@@ -488,8 +512,9 @@ public class Store extends SchemaConfigu
     StoreFile.rename(fs, srcPath, dstPath);
 
     StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
-        this.family.getBloomFilterType());
+        this.family.getBloomFilterType(), this.dataBlockEncoder);
     passSchemaMetricsTo(sf);
+
     sf.createReader();
 
     LOG.info("Moved hfile " + srcPath + " into store directory " +
@@ -624,7 +649,6 @@ public class Store extends SchemaConfigu
       MonitoredTask status)
       throws IOException {
     StoreFile.Writer writer;
-    String fileName;
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = region.getSmallestReadPoint();
     long flushed = 0;
@@ -720,8 +744,9 @@ public class Store extends SchemaConfigu
 
     status.setStatus("Flushing " + this + ": reopening flushed file");
     StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
-        this.family.getBloomFilterType());
+        this.family.getBloomFilterType(), this.dataBlockEncoder);
     passSchemaMetricsTo(sf);
+
     StoreFile.Reader r = sf.createReader();
     this.storeSize += r.length();
     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@@ -768,8 +793,8 @@ public class Store extends SchemaConfigu
       writerCacheConf = cacheConf;
     }
     StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
-        blocksize, compression, comparator, conf, writerCacheConf,
-        family.getBloomFilterType(), maxKeyCount);
+        blocksize, compression, dataBlockEncoder, comparator, conf,
+        writerCacheConf, family.getBloomFilterType(), maxKeyCount);
     // The store file writer's path does not include the CF name, so we need
     // to configure the HFile writer directly.
     SchemaConfigured sc = (SchemaConfigured) w.writer;
@@ -1377,7 +1402,8 @@ public class Store extends SchemaConfigu
           LOG.debug("Compacting " + file +
             ", keycount=" + keyCount +
             ", bloomtype=" + r.getBloomFilterType().toString() +
-            ", size=" + StringUtils.humanReadableInt(r.length()) );
+            ", size=" + StringUtils.humanReadableInt(r.length()) +
+            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
         }
       }
       // For major compactions calculate the earliest put timestamp
@@ -1494,7 +1520,8 @@ public class Store extends SchemaConfigu
     StoreFile storeFile = null;
     try {
       storeFile = new StoreFile(this.fs, path, this.conf,
-          this.cacheConf, this.family.getBloomFilterType());
+          this.cacheConf, this.family.getBloomFilterType(),
+          NoOpDataBlockEncoder.INSTANCE);
       passSchemaMetricsTo(storeFile);
       storeFile.createReader();
     } catch (IOException e) {
@@ -1546,7 +1573,7 @@ public class Store extends SchemaConfigu
             " to " + destPath);
       }
       result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
-          this.family.getBloomFilterType());
+          this.family.getBloomFilterType(), this.dataBlockEncoder);
       passSchemaMetricsTo(result);
       result.createReader();
     }
@@ -1641,7 +1668,7 @@ public class Store extends SchemaConfigu
 
   /**
    * Find the key that matches <i>row</i> exactly, or the one that immediately
-   * preceeds it. WARNING: Only use this method on a table where writes occur
+   * precedes it. WARNING: Only use this method on a table where writes occur
    * with strictly increasing timestamps. This method assumes this pattern of
    * writes in order to make it reasonably performant.  Also our search is
    * dependent on the axiom that deletes are for cells that are in the container
@@ -2134,8 +2161,8 @@ public class Store extends SchemaConfigu
   }
 
   public static final long FIXED_OVERHEAD = 
-      ClassSize.align(new SchemaConfigured().heapSize()
-          + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+      ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE
+          + (19 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
           + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
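
Store now owns the HFileDataBlockEncoder built from its column family schema (see the constructor hunk above), hands it to every StoreFile and StoreFile.Writer it creates, and exposes the package-private setDataBlockEncoderInTest(...) so tests can substitute their own. A hypothetical test-only override; the PREFIX in-cache encoding is a placeholder, and the test must live in org.apache.hadoop.hbase.regionserver to reach the package-private setter:

    // Keep blocks unencoded on disk but encode them in the block cache.
    store.setDataBlockEncoderInTest(new HFileDataBlockEncoderImpl(
        DataBlockEncoding.NONE, DataBlockEncoding.PREFIX));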


