hbase-commits mailing list archives

From: ndimi...@apache.org
Subject: [1/2] HBASE-11331 [blockcache] lazy block decompression
Date: Wed, 10 Sep 2014 22:50:30 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 17b2cea0b -> 4d51cf0ee
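
The diffs below rework the HFile block tests around the new packed/unpacked block lifecycle: a block read off disk may stay in its on-disk (compressed and/or encrypted) form until a caller asks for the payload. As orientation, here is a minimal sketch of the read-then-unpack pattern the tests exercise. It reuses only calls that appear in this patch (FSReaderV2, readBlockData, isUnpacked, unpack); the class and method names are illustrative, and it assumes it sits in org.apache.hadoop.hbase.io.hfile so the test-visible internals are reachable.

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

// Sketch only: mirrors the read-then-unpack pattern used throughout the tests below.
class LazyUnpackSketch {
  static ByteBuffer readPayload(FSDataInputStream is, long fileSize, HFileContext meta,
      long offset) throws IOException {
    HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, fileSize, meta);
    HFileBlock b = hbr.readBlockData(offset, -1, -1, /* pread */ false);
    b.sanityCheck();
    if (meta.isCompressedOrEncrypted()) {
      // Straight off disk the block is still "packed"; decompress/decrypt only on demand.
      b = b.unpack(meta, hbr);
    }
    return b.getBufferWithoutHeader();
  }
}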


http://git-wip-us.apache.org/repos/asf/hbase/blob/4d51cf0e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index bc09c88..96247c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -69,6 +67,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
 
 @Category(MediumTests.class)
 @RunWith(Parameterized.class)
@@ -234,8 +233,14 @@ public class TestHFileBlock {
 
   @Test
   public void testNoCompression() throws IOException {
-    assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS, false).
-                 getBlockForCaching().getUncompressedSizeWithoutHeader());
+    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
+    Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);
+
+    HFileBlock block =
+      createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
+    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
+    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
+    assertTrue(block.isUnpacked());
   }
 
   @Test
@@ -316,14 +321,14 @@ public class TestHFileBlock {
         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
         assertEquals(algo == GZ ? 2173 : 4936,
                      b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
-        String blockStr = b.toString();
+        HFileBlock expected = b;
 
         if (algo == GZ) {
           is = fs.open(path);
           hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
                                 b.totalChecksumBytes(), -1, pread);
-          assertEquals(blockStr, b.toString());
+          assertEquals(expected, b);
           int wrongCompressedSize = 2172;
           try {
             b = hbr.readBlockData(0, wrongCompressedSize
@@ -409,20 +414,35 @@ public class TestHFileBlock {
           HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
           hbr.setDataBlockEncoder(dataBlockEncoder);
           hbr.setIncludesMemstoreTS(includesMemstoreTS);
-          HFileBlock b;
+          HFileBlock blockFromHFile, blockUnpacked;
           int pos = 0;
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
-            b = hbr.readBlockData(pos, -1, -1, pread);
+            blockFromHFile = hbr.readBlockData(pos, -1, -1, pread);
             assertEquals(0, HFile.getChecksumFailuresCount());
-            b.sanityCheck();
-            pos += b.getOnDiskSizeWithHeader();
+            blockFromHFile.sanityCheck();
+            pos += blockFromHFile.getOnDiskSizeWithHeader();
             assertEquals((int) encodedSizes.get(blockId),
-                b.getUncompressedSizeWithoutHeader());
-            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
+              blockFromHFile.getUncompressedSizeWithoutHeader());
+            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
+            long packedHeapsize = blockFromHFile.heapSize();
+            blockUnpacked = blockFromHFile.unpack(meta, hbr);
+            assertTrue(blockUnpacked.isUnpacked());
+            if (meta.isCompressedOrEncrypted()) {
+              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize=" + blockUnpacked
+                .heapSize());
+              assertFalse(packedHeapsize == blockUnpacked.heapSize());
+              assertTrue("Packed heapSize should be < unpacked heapSize",
+                packedHeapsize < blockUnpacked.heapSize());
+            }
+            ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader();
             if (encoding != DataBlockEncoding.NONE) {
               // We expect a two-byte big-endian encoding id.
-              assertEquals(0, actualBuffer.get(0));
-              assertEquals(encoding.getId(), actualBuffer.get(1));
+              assertEquals(
+                "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
+                Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
+              assertEquals(
+                "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
+                Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
               actualBuffer.position(2);
               actualBuffer = actualBuffer.slice();
             }
@@ -432,6 +452,22 @@ public class TestHFileBlock {
 
             // test if content matches, produce nice message
             assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);
+
+            // test serialized blocks
+            for (boolean reuseBuffer : new boolean[] { false, true }) {
+              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
+              blockFromHFile.serialize(serialized);
+              HFileBlock deserialized =
+                (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
+              assertEquals(
+                "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
+                blockFromHFile, deserialized);
+              // intentional reference comparison
+              if (blockFromHFile != blockUnpacked) {
+                assertEquals("Deserialized block cannot be unpacked correctly.",
+                  blockUnpacked, deserialized.unpack(meta, hbr));
+              }
+            }
           }
           is.close();
         }
@@ -439,6 +475,11 @@ public class TestHFileBlock {
     }
   }
 
+  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
+      boolean pread) {
+    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
+  }
+
   static void assertBuffersEqual(ByteBuffer expectedBuffer,
       ByteBuffer actualBuffer, Compression.Algorithm compression,
       DataBlockEncoding encoding, boolean pread) {
@@ -451,9 +492,8 @@ public class TestHFileBlock {
       }
 
       fail(String.format(
-          "Content mismath for compression %s, encoding %s, " +
-          "pread %s, commonPrefix %d, expected %s, got %s",
-          compression, encoding, pread, prefix,
+          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
+          buildMessageDetails(compression, encoding, pread), prefix,
           nextBytesToStr(expectedBuffer, prefix),
           nextBytesToStr(actualBuffer, prefix)));
     }
@@ -476,6 +516,7 @@ public class TestHFileBlock {
   }
 
   protected void testPreviousOffsetInternals() throws IOException {
+    // TODO: parameterize these nested loops.
     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
       for (boolean pread : BOOLEAN_VALUES) {
         for (boolean cacheOnWrite : BOOLEAN_VALUES) {
@@ -545,8 +586,10 @@ public class TestHFileBlock {
             curOffset += b.getOnDiskSizeWithHeader();
 
             if (cacheOnWrite) {
-              // In the cache-on-write mode we store uncompressed bytes so we
-              // can compare them to what was read by the block reader.
+              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
+              // verifies that the unpacked value read back off disk matches the unpacked value
+              // generated before writing to disk.
+              b = b.unpack(meta, hbr);
               // b's buffer has header + data + checksum while
               // expectedContents have header + data only
               ByteBuffer bufRead = b.getBufferWithHeader();
@@ -565,11 +608,10 @@ public class TestHFileBlock {
                     + algo + ", pread=" + pread
                     + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                 wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
-                    bufExpected.arrayOffset(), Math.min(32,
-                        bufExpected.limit()))
+                  bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                     + ", actual:\n"
                     + Bytes.toStringBinary(bufRead.array(),
-                        bufRead.arrayOffset(), Math.min(32, bufRead.limit()));
+                  bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                 if (detailedLogging) {
                   LOG.warn("expected header" +
                            HFileBlock.toStringHeader(bufExpected) +
@@ -758,6 +800,7 @@ public class TestHFileBlock {
       if (detailedLogging) {
         LOG.info("Written block #" + i + " of type " + bt
             + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
+            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
             + " at offset " + pos);
       }
     }
@@ -806,7 +849,4 @@ public class TestHFileBlock {
           block.heapSize());
     }
   }
-
-
 }
-
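
The new assertions in TestHFileBlock above also cover what a packed block looks like after a trip through the block cache's serialization path. A minimal sketch of that round trip, under the same assumptions as the earlier sketch (illustrative names, same package as the tests), using only the calls shown in the hunk: getSerializedLength, serialize, getDeserializer().deserialize, and unpack.

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch only: the cache-serialization round trip asserted in TestHFileBlock above.
class BlockSerializationSketch {
  static void roundTrip(HFileBlock block, HFileContext meta, HFileBlock.FSReaderV2 hbr)
      throws IOException {
    ByteBuffer serialized = ByteBuffer.allocate(block.getSerializedLength());
    block.serialize(serialized);
    HFileBlock deserialized =
        (HFileBlock) block.getDeserializer().deserialize(serialized, /* reuseBuffer */ false);
    // Equality must hold on the (possibly still packed) form read from disk...
    assertEquals(block, deserialized);
    // ...and the deserialized copy must still unpack to the same contents.
    assertEquals(block.unpack(meta, hbr), deserialized.unpack(meta, hbr));
  }
}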

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d51cf0e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
index 88fdb77..166e2cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -205,7 +203,7 @@ public class TestHFileBlockCompatibility {
         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
         assertEquals(algo == GZ ? 2173 : 4936,
                      b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
-        String blockStr = b.toString();
+        HFileBlock expected = b;
 
         if (algo == GZ) {
           is = fs.open(path);
@@ -213,7 +211,7 @@ public class TestHFileBlockCompatibility {
               meta);
           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
                                 b.totalChecksumBytes(), -1, pread);
-          assertEquals(blockStr, b.toString());
+          assertEquals(expected, b);
           int wrongCompressedSize = 2172;
           try {
             b = hbr.readBlockData(0, wrongCompressedSize
@@ -301,6 +299,10 @@ public class TestHFileBlockCompatibility {
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
             b = hbr.readBlockData(pos, -1, -1, pread);
             b.sanityCheck();
+            if (meta.isCompressedOrEncrypted()) {
+              assertFalse(b.isUnpacked());
+              b = b.unpack(meta, hbr);
+            }
             pos += b.getOnDiskSizeWithHeader();
 
             assertEquals((int) encodedSizes.get(blockId),
@@ -335,7 +337,7 @@ public class TestHFileBlockCompatibility {
    * in this class but the code in HFileBlock.Writer will continually
    * evolve.
    */
-  public static final class Writer extends HFileBlock.Writer{
+  public static final class Writer extends HFileBlock.Writer {
 
     // These constants are as they were in minorVersion 0.
     private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
@@ -416,10 +418,6 @@ public class TestHFileBlockCompatibility {
 
     private int unencodedDataSizeWritten;
 
-    /**
-     * @param compressionAlgorithm compression algorithm to use
-     * @param dataBlockEncoderAlgo data block encoding algorithm to use
-     */
     public Writer(Compression.Algorithm compressionAlgorithm,
         HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
       this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d51cf0e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index 31546e2..6ec45a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -53,6 +48,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.*;
+
 @Category(SmallTests.class)
 public class TestHFileEncryption {
   private static final Log LOG = LogFactory.getLog(TestHFileEncryption.class);
@@ -95,11 +92,13 @@ public class TestHFileEncryption {
     return hbw.getOnDiskSizeWithHeader();
   }
 
-  private long readAndVerifyBlock(long pos, HFileBlock.FSReaderV2 hbr, int size)
+  private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderV2 hbr, int size)
       throws IOException {
     HFileBlock b = hbr.readBlockData(pos, -1, -1, false);
     assertEquals(0, HFile.getChecksumFailuresCount());
     b.sanityCheck();
+    assertFalse(b.isUnpacked());
+    b = b.unpack(ctx, hbr);
     LOG.info("Read a block at " + pos + " with" +
         " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() +
         " uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() +
@@ -142,7 +141,7 @@ public class TestHFileEncryption {
         HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, fileContext);
         long pos = 0;
         for (int i = 0; i < blocks; i++) {
-          pos += readAndVerifyBlock(pos, hbr, blockSizes[i]);
+          pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]);
         }
       } finally {
         is.close();
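
readAndVerifyBlock now takes the file's HFileContext because an encrypted block also comes back packed, and unpack() needs that same context (which carries the encryption details) to restore the payload. A compact sketch of the check-and-unpack step, under the same assumptions as the earlier sketches:

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;

// Sketch only: an encrypted (or compressed) block must be unpacked with the same
// HFileContext the reader was opened with.
class EncryptedBlockReadSketch {
  static HFileBlock readAndUnpack(HFileBlock.FSReaderV2 hbr, HFileContext fileContext, long pos)
      throws IOException {
    HFileBlock b = hbr.readBlockData(pos, -1, -1, false);
    b.sanityCheck();
    assertFalse(b.isUnpacked());       // still packed straight off disk
    b = b.unpack(fileContext, hbr);    // decrypt/decompress using the file's context
    assertTrue(b.isUnpacked());
    return b;
  }
}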

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d51cf0e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
index 27e7051..b27f5b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -170,8 +171,8 @@ public class TestHFileWriterV2 {
     
     // Meta index.
     metaBlockIndexReader.readRootIndex(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
-        trailer.getMetaIndexCount());
+        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
+          .getByteStream(), trailer.getMetaIndexCount());
     // File info
     FileInfo fileInfo = new FileInfo();
     fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
@@ -191,6 +192,10 @@ public class TestHFileWriterV2 {
     while (curBlockPos <= trailer.getLastDataBlockOffset()) {
       HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
       assertEquals(BlockType.DATA, block.getBlockType());
+      if (meta.isCompressedOrEncrypted()) {
+        assertFalse(block.isUnpacked());
+        block = block.unpack(meta, blockReader);
+      }
       ByteBuffer buf = block.getBufferWithoutHeader();
       while (buf.hasRemaining()) {
         int keyLen = buf.getInt();
@@ -232,7 +237,8 @@ public class TestHFileWriterV2 {
     while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
       LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
           trailer.getLoadOnOpenDataOffset());
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
+        .unpack(meta, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
       ByteBuffer buf = block.getBufferWithoutHeader();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d51cf0e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index 8b92c56..b19efff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -191,8 +191,7 @@ public class TestHFileWriterV3 {
     // Data index. We also read statistics about the block index written after
     // the root level.
     dataBlockIndexReader.readMultiLevelIndexRoot(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
-        trailer.getDataIndexCount());
+        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
     
     if (findMidKey) {
       byte[] midkey = dataBlockIndexReader.midkey();
@@ -201,8 +200,8 @@ public class TestHFileWriterV3 {
     
     // Meta index.
     metaBlockIndexReader.readRootIndex(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
-        trailer.getMetaIndexCount());
+        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
+          .getByteStream(), trailer.getMetaIndexCount());
     // File info
     FileInfo fileInfo = new FileInfo();
     fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
@@ -220,7 +219,8 @@ public class TestHFileWriterV3 {
     fsdis.seek(0);
     long curBlockPos = 0;
     while (curBlockPos <= trailer.getLastDataBlockOffset()) {
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
+        .unpack(context, blockReader);
       assertEquals(BlockType.DATA, block.getBlockType());
       ByteBuffer buf = block.getBufferWithoutHeader();
       int keyLen = -1;
@@ -278,7 +278,8 @@ public class TestHFileWriterV3 {
     while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
       LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
           trailer.getLoadOnOpenDataOffset());
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
+        .unpack(context, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
       ByteBuffer buf = block.getBufferWithoutHeader();
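
TestHFileWriterV2 and TestHFileWriterV3 now chain unpack() directly onto readBlockData() while walking a file's blocks. Below is a compact sketch of that scan loop under the same assumptions as the earlier sketches; advancing by getOnDiskSizeWithHeader() mirrors what the tests do, since the unpacked block keeps its on-disk size metadata.

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;

// Sketch only: walk every data block of a file, unpacking each block as it is read,
// using the chained form the writer tests switch to above.
class BlockScanSketch {
  static int countDataBlocks(HFileBlock.FSReaderV2 blockReader, HFileContext context,
      long lastDataBlockOffset) throws IOException {
    int count = 0;
    long curBlockPos = 0;
    while (curBlockPos <= lastDataBlockOffset) {
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
          .unpack(context, blockReader);
      curBlockPos += block.getOnDiskSizeWithHeader();
      count++;
    }
    return count;
  }
}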

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d51cf0e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
new file mode 100644
index 0000000..e752dd2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import com.google.common.collect.Iterables;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig},
+ * and {@link LruBlockCache}.
+ */
+@Category(SmallTests.class)
+@RunWith(Parameterized.class)
+public class TestLazyDataBlockDecompression {
+  private static final Log LOG = LogFactory.getLog(TestLazyDataBlockDecompression.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private FileSystem fs;
+
+  @Parameterized.Parameter(0)
+  public boolean cacheOnWrite;
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+      { false },
+      { true }
+    });
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
+    fs = FileSystem.get(TEST_UTIL.getConfiguration());
+  }
+
+  @After
+  public void tearDown() {
+    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
+    fs = null;
+  }
+
+  /**
+   * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. The KeyValues
+   * are written in the order they are generated.
+   */
+  private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
+      HFileContext cxt, int entryCount) throws IOException {
+    HFileWriterV2 writer = (HFileWriterV2)
+      new HFileWriterV2.WriterFactoryV2(conf, cc)
+        .withPath(fs, path)
+        .withFileContext(cxt)
+        .create();
+
+    // write a bunch of random kv's
+    Random rand = new Random(9713312); // some seed.
+    final byte[] family = Bytes.toBytes("f");
+    final byte[] qualifier = Bytes.toBytes("q");
+
+    for (int i = 0; i < entryCount; i++) {
+      byte[] keyBytes = TestHFileWriterV2.randomOrderedKey(rand, i);
+      byte[] valueBytes = TestHFileWriterV2.randomValue(rand);
+      // make a real keyvalue so that hfile tool can examine it
+      writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes));
+    }
+    writer.close();
+  }
+
+  /**
+   * Read all blocks from {@code path} to populate {@code blockCache}.
+   */
+  private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
+      Path path, HFileContext cxt) throws IOException {
+    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
+    long fileSize = fs.getFileStatus(path).getLen();
+    FixedFileTrailer trailer =
+      FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
+    HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig,
+      fsdis.getHfs(), conf);
+    reader.loadFileInfo();
+    long offset = trailer.getFirstDataBlockOffset(),
+      max = trailer.getLastDataBlockOffset();
+    List<HFileBlock> blocks = new ArrayList<HFileBlock>(4);
+    HFileBlock block;
+    while (offset <= max) {
+      block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false,
+      /* isCompaction */ false, /* updateCacheMetrics */ true, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      blocks.add(block);
+    }
+    LOG.info("read " + Iterables.toString(blocks));
+  }
+
+  @Test
+  public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception {
+    // enough room for 2 uncompressed blocks
+    int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1);
+    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
+      "testCompressionIncreasesEffectiveBlockcacheSize");
+    HFileContext context = new HFileContextBuilder()
+      .withCompression(Compression.Algorithm.GZ)
+      .build();
+    LOG.info("context=" + context);
+
+    // setup cache with lazy-decompression disabled.
+    Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
+    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
+    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
+    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
+    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
+      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled);
+    CacheConfig cc = new CacheConfig(lazyCompressDisabled);
+    assertFalse(cc.shouldCacheDataCompressed());
+    assertTrue(cc.getBlockCache() instanceof LruBlockCache);
+    LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache();
+    LOG.info("disabledBlockCache=" + disabledBlockCache);
+    assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
+    assertTrue("eviction thread spawned unintentionally.",
+      disabledBlockCache.getEvictionThread() == null);
+    assertEquals("freshly created blockcache contains blocks.",
+      0, disabledBlockCache.getBlockCount());
+
+    // 2000 kv's is ~3.6 full unencoded data blocks.
+    // Requires a conf and CacheConfig but should not be specific to this instance's cache settings
+    writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000);
+
+    // populate the cache
+    cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context);
+    long disabledBlockCount = disabledBlockCache.getBlockCount();
+    assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount,
+      disabledBlockCount > 0);
+    long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount();
+    for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
+      disabledBlockCache.getMapForTests().entrySet()) {
+      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
+      assertTrue("found a packed block, block=" + block, block.isUnpacked());
+    }
+
+    // count blocks with lazy decompression
+    Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
+    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
+    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
+    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
+    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
+      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled);
+    cc = new CacheConfig(lazyCompressEnabled);
+    assertTrue("test improperly configured.", cc.shouldCacheDataCompressed());
+    assertTrue(cc.getBlockCache() instanceof LruBlockCache);
+    LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache();
+    LOG.info("enabledBlockCache=" + enabledBlockCache);
+    assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize());
+    assertTrue("eviction thread spawned unintentionally.",
+      enabledBlockCache.getEvictionThread() == null);
+    assertEquals("freshly created blockcache contains blocks.",
+      0, enabledBlockCache.getBlockCount());
+
+    cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context);
+    long enabledBlockCount = enabledBlockCache.getBlockCount();
+    assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount,
+      enabledBlockCount > 0);
+    long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount();
+    int candidatesFound = 0;
+    for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
+        enabledBlockCache.getMapForTests().entrySet()) {
+      candidatesFound++;
+      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
+      if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) {
+        assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" +
+          block.getBufferWithoutHeader().capacity(), block.isUnpacked());
+      }
+    }
+    assertTrue("did not find any candidates for compressed caching. Invalid test.",
+      candidatesFound > 0);
+
+    LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
+      enabledBlockCount);
+    assertTrue("enabling compressed data blocks should increase the effective cache size. " +
+      "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
+      enabledBlockCount, disabledBlockCount < enabledBlockCount);
+
+    LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
+      enabledEvictedCount);
+    assertTrue("enabling compressed data blocks should reduce the number of evictions. " +
+      "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
+      enabledEvictedCount, enabledEvictedCount < disabledEvictedCount);
+  }
+}
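
For reference, the switch the new test flips is the ordinary CacheConfig flag for caching DATA blocks in their on-disk form. A minimal sketch of enabling it from configuration, assuming the CacheConfig constant is visible from the caller's package (the test above reads it from org.apache.hadoop.hbase.io.hfile); everything else follows from the test.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

// Sketch only: cache DATA blocks in packed (compressed) form, i.e. the lazy block
// decompression behaviour measured by the test above.
class LazyDecompressionConfigSketch {
  static CacheConfig compressedCachingConfig() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    CacheConfig cc = new CacheConfig(conf);
    assert cc.shouldCacheDataCompressed(); // same check the test makes
    return cc;
  }
}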

