From: stack@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Wed, 23 Mar 2016 01:39:47 -0000
Subject: [1/4] hbase git commit: Revert "HBASE-15477 Purge 'next block header' from cached blocks"

Repository: hbase
Updated Branches:
  refs/heads/master 3f3613a23 -> a13d6e000

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 69c42c3..66aced0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -1317,22 +1317,25 @@ public class BucketCache implements BlockCache, HeapSize {
         final AtomicLong realCacheSize) throws CacheFullException, IOException,
         BucketAllocatorException {
       int len = data.getSerializedLength();
-      // This cacheable thing can't be serialized
+      // This cacheable thing can't be serialized...
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
       BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
-          // If an instance of HFileBlock, save on some allocations.
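          // For reference, the layout written by the reverted code path below (the tail of
          // the allocated bucket slot holds the block's extra serialization metadata, which
          // the deserializer registered above reads back when the entry is fetched):
          //
          //   offset                          offset + len - EXTRA_SERIALIZATION_SPACE
          //   |<-------- block bytes -------->|<-------- extra info -------->|
          //   |<------------------------- len bytes ------------------------>|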
- HFileBlock block = (HFileBlock)data; - ByteBuff sliceBuf = block.getBufferReadOnly(); - ByteBuffer metadata = block.getMetaData(); + HFileBlock block = (HFileBlock) data; + ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader(); + sliceBuf.rewind(); + assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE || + len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE; + ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE); + block.serializeExtraInfo(extraInfoBuffer); if (LOG.isTraceEnabled()) { LOG.trace("Write offset=" + offset + ", len=" + len); } ioEngine.write(sliceBuf, offset); - ioEngine.write(metadata, offset + len - metadata.limit()); + ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE); } else { ByteBuffer bb = ByteBuffer.allocate(len); data.serialize(bb); http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index ed86a83..e26022e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.Closeable; import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -33,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan; // TODO: Change name from KeyValueScanner to CellScanner only we already have a simple CellScanner // so this should be something else altogether, a decoration on our base CellScanner. TODO. // This class shows in CPs so do it all in one swell swoop. HBase-2.0.0. -public interface KeyValueScanner extends Shipper, Closeable { +public interface KeyValueScanner extends Shipper { /** * The byte array represents for NO_NEXT_INDEXED_KEY; * The actual value is irrelevant because this is always compared by reference. @@ -75,7 +74,6 @@ public interface KeyValueScanner extends Shipper, Closeable { * The default implementation for this would be to return 0. A file having * lower sequence id will be considered to be the older one. */ - // TODO: Implement SequenceId Interface instead. long getSequenceID(); /** @@ -139,11 +137,11 @@ public interface KeyValueScanner extends Shipper, Closeable { * peek KeyValue of scanner has the same row with specified Cell, * otherwise seek the scanner at the first Cell of the row which is the * previous row of specified KeyValue - * + * * @param key seek KeyValue * @return true if the scanner is at the valid KeyValue, false if such * KeyValue does not exist - * + * */ public boolean backwardSeek(Cell key) throws IOException; @@ -158,7 +156,7 @@ public interface KeyValueScanner extends Shipper, Closeable { /** * Seek the scanner at the first KeyValue of last row - * + * * @return true if scanner has values left, false if the underlying data is * empty * @throws IOException @@ -171,4 +169,4 @@ public interface KeyValueScanner extends Shipper, Closeable { * see HFileWriterImpl#getMidpoint, or null if not known. 
*/ public Cell getNextIndexedKey(); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index b6164b2..61eb9b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -1271,7 +1271,7 @@ public class StoreFile { } /** - * @deprecated Do not write further code which depends on this call. Instead + * Warning: Do not write further code which depends on this call. Instead * use getStoreFileScanner() which uses the StoreFileScanner class/interface * which is the preferred way to scan a store with higher level concepts. * @@ -1285,7 +1285,7 @@ public class StoreFile { } /** - * @deprecated Do not write further code which depends on this call. Instead + * Warning: Do not write further code which depends on this call. Instead * use getStoreFileScanner() which uses the StoreFileScanner class/interface * which is the preferred way to scan a store with higher level concepts. * http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index 040685d..69671e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ChecksumType; public class CacheTestUtils { @@ -65,7 +66,6 @@ public class CacheTestUtils { /*Post eviction, heapsize should be the same */ assertEquals(heapSize, ((HeapSize) toBeTested).heapSize()); } - public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize, final int numThreads, final int numQueries, final double passingScore) throws Exception { @@ -339,16 +339,25 @@ public class CacheTestUtils { } - private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { + private static HFileBlockPair[] generateHFileBlocks(int blockSize, + int numBlocks) { HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; Random rand = new Random(); HashSet usedStrings = new HashSet(); for (int i = 0; i < numBlocks; i++) { - ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize); + + // The buffer serialized size needs to match the size of BlockSize. So we + // declare our data size to be smaller than it by the serialization space + // required. 
+ + SingleByteBuff cachedBuffer = new SingleByteBuff(ByteBuffer.allocate(blockSize + - HFileBlock.EXTRA_SERIALIZATION_SPACE)); rand.nextBytes(cachedBuffer.array()); cachedBuffer.rewind(); - int onDiskSizeWithoutHeader = blockSize; - int uncompressedSizeWithoutHeader = blockSize; + int onDiskSizeWithoutHeader = blockSize + - HFileBlock.EXTRA_SERIALIZATION_SPACE; + int uncompressedSizeWithoutHeader = blockSize + - HFileBlock.EXTRA_SERIALIZATION_SPACE; long prevBlockOffset = rand.nextLong(); BlockType.DATA.write(cachedBuffer); cachedBuffer.putInt(onDiskSizeWithoutHeader); @@ -367,7 +376,7 @@ public class CacheTestUtils { onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER, blockSize, - onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta); + onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta); String strKey; /* No conflicting keys */ @@ -386,4 +395,4 @@ public class CacheTestUtils { BlockCacheKey blockName; HFileBlock block; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index a9d8258..5158e35 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -259,6 +259,7 @@ public class TestCacheOnWrite { assertTrue(testDescription, scanner.seekTo()); long offset = 0; + HFileBlock prevBlock = null; EnumMap blockCountByType = new EnumMap(BlockType.class); @@ -266,10 +267,14 @@ public class TestCacheOnWrite { List cachedBlocksOffset = new ArrayList(); Map cachedBlocks = new HashMap(); while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + long onDiskSize = -1; + if (prevBlock != null) { + onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); + } // Flags: don't cache the block, use pread, this is not a compaction. // Also, pass null for expected block type to avoid checking it. 
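      // The onDiskSize seeded above comes from the previous block's header area
      // (getNextBlockOnDiskSizeWithHeader()); -1 means the size is unknown, in which case
      // the reader has to fetch the header first to learn how much to read.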
- HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, - encodingInCache); + HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, + false, true, null, encodingInCache); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); @@ -302,6 +307,7 @@ public class TestCacheOnWrite { assertEquals( block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); } + prevBlock = block; offset += block.getOnDiskSizeWithHeader(); BlockType bt = block.getBlockType(); Integer count = blockCountByType.get(bt); http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index d91a811..91ab8c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -94,7 +94,7 @@ public class TestChecksum { meta = new HFileContextBuilder().withHBaseCheckSum(true).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl( is, totalSize, (HFileSystem) fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, false); + HFileBlock b = hbr.readBlockData(0, -1, -1, false); assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode()); } @@ -108,14 +108,12 @@ public class TestChecksum { ChecksumType cktype = itr.next(); Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName()); FSDataOutputStream os = fs.create(path); - HFileContext meta = new HFileContextBuilder(). - withChecksumType(cktype). - build(); + HFileContext meta = new HFileContextBuilder() + .withChecksumType(cktype).build(); HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta); DataOutputStream dos = hbw.startWriting(BlockType.DATA); - for (int i = 0; i < 1000; ++i) { + for (int i = 0; i < 1000; ++i) dos.writeInt(i); - } hbw.writeHeaderAndData(os); int totalSize = hbw.getOnDiskSizeWithHeader(); os.close(); @@ -127,7 +125,7 @@ public class TestChecksum { meta = new HFileContextBuilder().withHBaseCheckSum(true).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl( is, totalSize, (HFileSystem) fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, false); + HFileBlock b = hbr.readBlockData(0, -1, -1, false); ByteBuff data = b.getBufferWithoutHeader(); for (int i = 0; i < 1000; i++) { assertEquals(i, data.getInt()); @@ -190,7 +188,7 @@ public class TestChecksum { .withHBaseCheckSum(true) .build(); HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread); b.sanityCheck(); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, @@ -211,17 +209,17 @@ public class TestChecksum { // requests. Verify that this is correct. 
for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) { - b = hbr.readBlockData(0, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread); assertEquals(0, HFile.getChecksumFailuresCount()); } // The next read should have hbase checksum verification reanabled, // we verify this by assertng that there was a hbase-checksum failure. - b = hbr.readBlockData(0, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread); assertEquals(1, HFile.getChecksumFailuresCount()); // Since the above encountered a checksum failure, we switch // back to not checking hbase checksums. - b = hbr.readBlockData(0, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread); assertEquals(0, HFile.getChecksumFailuresCount()); is.close(); @@ -232,7 +230,7 @@ public class TestChecksum { assertEquals(false, newfs.useHBaseChecksum()); is = new FSDataInputStreamWrapper(newfs, path); hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta); - b = hbr.readBlockData(0, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread); is.close(); b.sanityCheck(); b = b.unpack(meta, hbr); @@ -316,7 +314,7 @@ public class TestChecksum { .build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper( is, nochecksum), totalSize, hfs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread); is.close(); b.sanityCheck(); assertEquals(dataSize, b.getUncompressedSizeWithoutHeader()); @@ -356,4 +354,5 @@ public class TestChecksum { return false; // checksum validation failure } } -} \ No newline at end of file +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index eb87a0c..6748efc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -320,7 +320,7 @@ public class TestHFileBlock { .withIncludesTags(includesTag) .withCompression(algo).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta); - HFileBlock b = hbr.readBlockData(0, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread); is.close(); assertEquals(0, HFile.getChecksumFailuresCount()); @@ -334,15 +334,17 @@ public class TestHFileBlock { is = fs.open(path); hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + - b.totalChecksumBytes(), pread); + b.totalChecksumBytes(), -1, pread); assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize - + HConstants.HFILEBLOCK_HEADER_SIZE, pread); + + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread); fail("Exception expected"); } catch (IOException ex) { - String expectedPrefix = "Passed in onDiskSizeWithHeader="; + String expectedPrefix = "On-disk size without header provided is " + + wrongCompressedSize + ", but block header contains " + + b.getOnDiskSizeWithoutHeader() + "."; assertTrue("Invalid exception message: '" + ex.getMessage() + "'.\nMessage is expected to start with: '" + expectedPrefix + "'", ex.getMessage().startsWith(expectedPrefix)); @@ -422,7 +424,7 @@ public class TestHFileBlock { HFileBlock blockFromHFile, 
blockUnpacked; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - blockFromHFile = hbr.readBlockData(pos, -1, pread); + blockFromHFile = hbr.readBlockData(pos, -1, -1, pread); assertEquals(0, HFile.getChecksumFailuresCount()); blockFromHFile.sanityCheck(); pos += blockFromHFile.getOnDiskSizeWithHeader(); @@ -558,7 +560,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Reading block #" + i + " at offset " + curOffset); } - HFileBlock b = hbr.readBlockData(curOffset, -1, pread); + HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread); if (detailedLogging) { LOG.info("Block #" + i + ": " + b); } @@ -572,7 +574,8 @@ public class TestHFileBlock { // Now re-load this block knowing the on-disk size. This tests a // different branch in the loader. - HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread); + HFileBlock b2 = hbr.readBlockData(curOffset, + b.getOnDiskSizeWithHeader(), -1, pread); b2.sanityCheck(); assertEquals(b.getBlockType(), b2.getBlockType()); @@ -598,7 +601,7 @@ public class TestHFileBlock { b = b.unpack(meta, hbr); // b's buffer has header + data + checksum while // expectedContents have header + data only - ByteBuff bufRead = b.getBufferReadOnly(); + ByteBuff bufRead = b.getBufferWithHeader(); ByteBuffer bufExpected = expectedContents.get(i); boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(), bufRead.arrayOffset(), @@ -681,7 +684,7 @@ public class TestHFileBlock { HFileBlock b; try { long onDiskSizeArg = withOnDiskSize ? expectedSize : -1; - b = hbr.readBlockData(offset, onDiskSizeArg, pread); + b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread); } catch (IOException ex) { LOG.error("Error in client " + clientId + " trying to read block at " + offset + ", pread=" + pread + ", withOnDiskSize=" + @@ -716,7 +719,8 @@ public class TestHFileBlock { protected void testConcurrentReadingInternals() throws IOException, InterruptedException, ExecutionException { for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) { - Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading"); + Path path = + new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading"); Random rand = defaultRandom(); List offsets = new ArrayList(); List types = new ArrayList(); @@ -839,7 +843,8 @@ public class TestHFileBlock { .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM) .withChecksumType(ChecksumType.NULL).build(); HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, - HFileBlock.FILL_HEADER, -1, 0, -1, meta); + HFileBlock.FILL_HEADER, -1, + 0, meta); long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase( new MultiByteBuff(buf).getClass(), true) + HConstants.HFILEBLOCK_HEADER_SIZE + size); http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java new file mode 100644 index 0000000..16607b9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -0,0 +1,750 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; +import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; +import static org.junit.Assert.*; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.Compressor; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.base.Preconditions; + +/** + * This class has unit tests to prove that older versions of + * HFiles (without checksums) are compatible with current readers. 
+ */ +@Category({IOTests.class, SmallTests.class}) +@RunWith(Parameterized.class) +public class TestHFileBlockCompatibility { + + private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class); + private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { + NONE, GZ }; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private HFileSystem fs; + + private final boolean includesMemstoreTS; + private final boolean includesTag; + + public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) { + this.includesMemstoreTS = includesMemstoreTS; + this.includesTag = includesTag; + } + + @Parameters + public static Collection parameters() { + return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED; + } + + @Before + public void setUp() throws IOException { + fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration()); + } + + public byte[] createTestV1Block(Compression.Algorithm algo) + throws IOException { + Compressor compressor = algo.getCompressor(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStream os = algo.createCompressionStream(baos, compressor, 0); + DataOutputStream dos = new DataOutputStream(os); + BlockType.META.write(dos); // Let's make this a meta block. + TestHFileBlock.writeTestBlockContents(dos); + dos.flush(); + algo.returnCompressor(compressor); + return baos.toByteArray(); + } + + private Writer createTestV2Block(Compression.Algorithm algo) + throws IOException { + final BlockType blockType = BlockType.DATA; + Writer hbw = new Writer(algo, null, + includesMemstoreTS, includesTag); + DataOutputStream dos = hbw.startWriting(blockType); + TestHFileBlock.writeTestBlockContents(dos); + // make sure the block is ready by calling hbw.getHeaderAndData() + hbw.getHeaderAndData(); + assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); + hbw.releaseCompressor(); + return hbw; + } + + private String createTestBlockStr(Compression.Algorithm algo, + int correctLength) throws IOException { + Writer hbw = createTestV2Block(algo); + byte[] testV2Block = hbw.getHeaderAndData(); + int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9; + if (testV2Block.length == correctLength) { + // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid + // variations across operating systems. + // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format. + testV2Block[osOffset] = 3; + } + return Bytes.toStringBinary(testV2Block); + } + + @Test + public void testNoCompression() throws IOException { + assertEquals(4000, createTestV2Block(NONE).getBlockForCaching(). + getUncompressedSizeWithoutHeader()); + } + + @Test + public void testGzipCompression() throws IOException { + final String correctTestBlockStr = + "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF" + + "\\xFF\\xFF\\xFF\\xFF" + // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html + + "\\x1F\\x8B" // gzip magic signature + + "\\x08" // Compression method: 8 = "deflate" + + "\\x00" // Flags + + "\\x00\\x00\\x00\\x00" // mtime + + "\\x00" // XFL (extra flags) + // OS (0 = FAT filesystems, 3 = Unix). However, this field + // sometimes gets set to 0 on Linux and Mac, so we reset it to 3. 
+ + "\\x03" + + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa" + + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c" + + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"; + final int correctGzipBlockLength = 82; + + String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength); + assertEquals(correctTestBlockStr, returnedStr); + } + + @Test + public void testReaderV2() throws IOException { + if(includesTag) { + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + } + for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { + for (boolean pread : new boolean[] { false, true }) { + LOG.info("testReaderV2: Compression algorithm: " + algo + + ", pread=" + pread); + Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + + algo); + FSDataOutputStream os = fs.create(path); + Writer hbw = new Writer(algo, null, + includesMemstoreTS, includesTag); + long totalSize = 0; + for (int blockId = 0; blockId < 2; ++blockId) { + DataOutputStream dos = hbw.startWriting(BlockType.DATA); + for (int i = 0; i < 1234; ++i) + dos.writeInt(i); + hbw.writeHeaderAndData(os); + totalSize += hbw.getOnDiskSizeWithHeader(); + } + os.close(); + + FSDataInputStream is = fs.open(path); + HFileContext meta = new HFileContextBuilder() + .withHBaseCheckSum(false) + .withIncludesMvcc(includesMemstoreTS) + .withIncludesTags(includesTag) + .withCompression(algo) + .build(); + HFileBlock.FSReader hbr = + new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + is.close(); + + b.sanityCheck(); + assertEquals(4936, b.getUncompressedSizeWithoutHeader()); + assertEquals(algo == GZ ? 2173 : 4936, + b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); + HFileBlock expected = b; + + if (algo == GZ) { + is = fs.open(path); + hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, + meta); + b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + + b.totalChecksumBytes(), -1, pread); + assertEquals(expected, b); + int wrongCompressedSize = 2172; + try { + b = hbr.readBlockData(0, wrongCompressedSize + + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread); + fail("Exception expected"); + } catch (IOException ex) { + String expectedPrefix = "On-disk size without header provided is " + + wrongCompressedSize + ", but block header contains " + + b.getOnDiskSizeWithoutHeader() + "."; + assertTrue("Invalid exception message: '" + ex.getMessage() + + "'.\nMessage is expected to start with: '" + expectedPrefix + + "'", ex.getMessage().startsWith(expectedPrefix)); + } + is.close(); + } + } + } + } + + /** + * Test encoding/decoding data blocks. + * @throws IOException a bug or a problem with temporary files. + */ + @Test + public void testDataBlockEncoding() throws IOException { + if(includesTag) { + TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + } + final int numBlocks = 5; + for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { + for (boolean pread : new boolean[] { false, true }) { + for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + LOG.info("testDataBlockEncoding algo " + algo + + " pread = " + pread + + " encoding " + encoding); + Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + + algo + "_" + encoding.toString()); + FSDataOutputStream os = fs.create(path); + HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ? 
+ new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE; + TestHFileBlockCompatibility.Writer hbw = + new TestHFileBlockCompatibility.Writer(algo, + dataBlockEncoder, includesMemstoreTS, includesTag); + long totalSize = 0; + final List encodedSizes = new ArrayList(); + final List encodedBlocks = new ArrayList(); + for (int blockId = 0; blockId < numBlocks; ++blockId) { + hbw.startWriting(BlockType.DATA); + TestHFileBlock.writeTestKeyValues(hbw, blockId, pread, includesTag); + hbw.writeHeaderAndData(os); + int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; + byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader(); + final int encodedSize = encodedResultWithHeader.length - headerLen; + if (encoding != DataBlockEncoding.NONE) { + // We need to account for the two-byte encoding algorithm ID that + // comes after the 24-byte block header but before encoded KVs. + headerLen += DataBlockEncoding.ID_SIZE; + } + byte[] encodedDataSection = + new byte[encodedResultWithHeader.length - headerLen]; + System.arraycopy(encodedResultWithHeader, headerLen, + encodedDataSection, 0, encodedDataSection.length); + final ByteBuffer encodedBuf = + ByteBuffer.wrap(encodedDataSection); + encodedSizes.add(encodedSize); + encodedBlocks.add(encodedBuf); + totalSize += hbw.getOnDiskSizeWithHeader(); + } + os.close(); + + FSDataInputStream is = fs.open(path); + HFileContext meta = new HFileContextBuilder() + .withHBaseCheckSum(false) + .withIncludesMvcc(includesMemstoreTS) + .withIncludesTags(includesTag) + .withCompression(algo) + .build(); + HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), + totalSize, fs, path, meta); + hbr.setDataBlockEncoder(dataBlockEncoder); + hbr.setIncludesMemstoreTS(includesMemstoreTS); + + HFileBlock b; + int pos = 0; + for (int blockId = 0; blockId < numBlocks; ++blockId) { + b = hbr.readBlockData(pos, -1, -1, pread); + b.sanityCheck(); + if (meta.isCompressedOrEncrypted()) { + assertFalse(b.isUnpacked()); + b = b.unpack(meta, hbr); + } + pos += b.getOnDiskSizeWithHeader(); + + assertEquals((int) encodedSizes.get(blockId), + b.getUncompressedSizeWithoutHeader()); + ByteBuff actualBuffer = b.getBufferWithoutHeader(); + if (encoding != DataBlockEncoding.NONE) { + // We expect a two-byte big-endian encoding id. + assertEquals(0, actualBuffer.get(0)); + assertEquals(encoding.getId(), actualBuffer.get(1)); + actualBuffer.position(2); + actualBuffer = actualBuffer.slice(); + } + + ByteBuffer expectedBuffer = encodedBlocks.get(blockId); + expectedBuffer.rewind(); + + // test if content matches, produce nice message + TestHFileBlock.assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, + algo, encoding, pread); + } + is.close(); + } + } + } + } + /** + * This is the version of the HFileBlock.Writer that is used to + * create V2 blocks with minor version 0. These blocks do not + * have hbase-level checksums. The code is here to test + * backward compatibility. The reason we do not inherit from + * HFileBlock.Writer is because we never ever want to change the code + * in this class but the code in HFileBlock.Writer will continually + * evolve. + */ + public static final class Writer extends HFileBlock.Writer { + + // These constants are as they were in minorVersion 0. 
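    // For reference, the 24-byte minor-version-0 header that putHeader() below writes
    // (no checksum fields; sizes in bytes, layout inferred from the code that follows):
    //   8  block type magic (e.g. "DATABLK*")
    //   4  onDiskSizeWithoutHeader
    //   4  uncompressedSizeWithoutHeader
    //   8  prevBlockOffset (offset of the previous block of the same type)
    // 8 + 4 + 4 + 8 = 24 = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM.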
+ private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; + private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER; + private static final byte[] DUMMY_HEADER = HFileBlock.DUMMY_HEADER_NO_CHECKSUM; + + private enum State { + INIT, + WRITING, + BLOCK_READY + }; + + /** Writer state. Used to ensure the correct usage protocol. */ + private State state = State.INIT; + + /** Compression algorithm for all blocks this instance writes. */ + private final Compression.Algorithm compressAlgo; + + /** Data block encoder used for data blocks */ + private final HFileDataBlockEncoder dataBlockEncoder; + + private HFileBlockEncodingContext dataBlockEncodingCtx; + /** block encoding context for non-data blocks */ + private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; + + /** + * The stream we use to accumulate data in uncompressed format for each + * block. We reset this stream at the end of each block and reuse it. The + * header is written as the first {@link #HEADER_SIZE} bytes into this + * stream. + */ + private ByteArrayOutputStream baosInMemory; + + /** Compressor, which is also reused between consecutive blocks. */ + private Compressor compressor; + + /** + * Current block type. Set in {@link #startWriting(BlockType)}. Could be + * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} + * to {@link BlockType#ENCODED_DATA}. + */ + private BlockType blockType; + + /** + * A stream that we write uncompressed bytes to, which compresses them and + * writes them to {@link #baosInMemory}. + */ + private DataOutputStream userDataStream; + + /** + * Bytes to be written to the file system, including the header. Compressed + * if compression is turned on. + */ + private byte[] onDiskBytesWithHeader; + + /** + * Valid in the READY state. Contains the header and the uncompressed (but + * potentially encoded, if this is a data block) bytes, so the length is + * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}. + */ + private byte[] uncompressedBytesWithHeader; + + /** + * Current block's start offset in the {@link HFile}. Set in + * {@link #writeHeaderAndData(FSDataOutputStream)}. + */ + private long startOffset; + + /** + * Offset of previous block by block type. Updated when the next block is + * started. + */ + private long[] prevOffsetByType; + + /** The offset of the previous block of the same type */ + private long prevOffset; + + private int unencodedDataSizeWritten; + + public Writer(Compression.Algorithm compressionAlgorithm, + HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) { + this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false) + .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) + .withCompression(compressionAlgorithm).build()); + } + + public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) { + super(dataBlockEncoder, meta); + compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression(); + this.dataBlockEncoder = dataBlockEncoder != null ? 
dataBlockEncoder + : NoOpDataBlockEncoder.INSTANCE; + defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta); + dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta); + baosInMemory = new ByteArrayOutputStream(); + + prevOffsetByType = new long[BlockType.values().length]; + for (int i = 0; i < prevOffsetByType.length; ++i) + prevOffsetByType[i] = -1; + } + + /** + * Starts writing into the block. The previous block's data is discarded. + * + * @return the stream the user can write their data into + * @throws IOException + */ + public DataOutputStream startWriting(BlockType newBlockType) + throws IOException { + if (state == State.BLOCK_READY && startOffset != -1) { + // We had a previous block that was written to a stream at a specific + // offset. Save that offset as the last offset of a block of that type. + prevOffsetByType[blockType.getId()] = startOffset; + } + + startOffset = -1; + blockType = newBlockType; + + baosInMemory.reset(); + baosInMemory.write(DUMMY_HEADER); + + state = State.WRITING; + + // We will compress it later in finishBlock() + userDataStream = new DataOutputStream(baosInMemory); + if (newBlockType == BlockType.DATA) { + this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); + } + this.unencodedDataSizeWritten = 0; + return userDataStream; + } + + @Override + public void write(Cell c) throws IOException { + KeyValue kv = KeyValueUtil.ensureKeyValue(c); + expectState(State.WRITING); + this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream); + this.unencodedDataSizeWritten += kv.getLength(); + if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) { + this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getSequenceId()); + } + } + + /** + * Returns the stream for the user to write to. The block writer takes care + * of handling compression and buffering for caching on write. Can only be + * called in the "writing" state. + * + * @return the data output stream for the user to write to + */ + DataOutputStream getUserDataStream() { + expectState(State.WRITING); + return userDataStream; + } + + /** + * Transitions the block writer from the "writing" state to the "block + * ready" state. Does nothing if a block is already finished. + */ + void ensureBlockReady() throws IOException { + Preconditions.checkState(state != State.INIT, + "Unexpected state: " + state); + + if (state == State.BLOCK_READY) + return; + + // This will set state to BLOCK_READY. + finishBlock(); + } + + /** + * An internal method that flushes the compressing stream (if using + * compression), serializes the header, and takes care of the separate + * uncompressed stream for caching on write, if applicable. Sets block + * write state to "block ready". + */ + void finishBlock() throws IOException { + if (blockType == BlockType.DATA) { + this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, + baosInMemory.toByteArray(), blockType); + blockType = dataBlockEncodingCtx.getBlockType(); + } + userDataStream.flush(); + // This does an array copy, so it is safe to cache this byte array. + uncompressedBytesWithHeader = baosInMemory.toByteArray(); + prevOffset = prevOffsetByType[blockType.getId()]; + + // We need to set state before we can package the block up for + // cache-on-write. In a way, the block is ready, but not yet encoded or + // compressed. 
+ state = State.BLOCK_READY; + if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { + onDiskBytesWithHeader = dataBlockEncodingCtx + .compressAndEncrypt(uncompressedBytesWithHeader); + } else { + onDiskBytesWithHeader = defaultBlockEncodingCtx + .compressAndEncrypt(uncompressedBytesWithHeader); + } + + // put the header for on disk bytes + putHeader(onDiskBytesWithHeader, 0, + onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); + //set the header for the uncompressed bytes (for cache-on-write) + putHeader(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); + } + + /** + * Put the header into the given byte array at the given offset. + * @param onDiskSize size of the block on disk + * @param uncompressedSize size of the block after decompression (but + * before optional data block decoding) + */ + private void putHeader(byte[] dest, int offset, int onDiskSize, + int uncompressedSize) { + offset = blockType.put(dest, offset); + offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE); + offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE); + Bytes.putLong(dest, offset, prevOffset); + } + + /** + * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records + * the offset of this block so that it can be referenced in the next block + * of the same type. + * + * @param out + * @throws IOException + */ + public void writeHeaderAndData(FSDataOutputStream out) throws IOException { + long offset = out.getPos(); + if (startOffset != -1 && offset != startOffset) { + throw new IOException("A " + blockType + " block written to a " + + "stream twice, first at offset " + startOffset + ", then at " + + offset); + } + startOffset = offset; + + writeHeaderAndData((DataOutputStream) out); + } + + /** + * Writes the header and the compressed data of this block (or uncompressed + * data when not using compression) into the given stream. Can be called in + * the "writing" state or in the "block ready" state. If called in the + * "writing" state, transitions the writer to the "block ready" state. + * + * @param out the output stream to write the + * @throws IOException + */ + private void writeHeaderAndData(DataOutputStream out) throws IOException { + ensureBlockReady(); + out.write(onDiskBytesWithHeader); + } + + /** + * Returns the header or the compressed data (or uncompressed data when not + * using compression) as a byte array. Can be called in the "writing" state + * or in the "block ready" state. If called in the "writing" state, + * transitions the writer to the "block ready" state. + * + * @return header and data as they would be stored on disk in a byte array + * @throws IOException + */ + public byte[] getHeaderAndData() throws IOException { + ensureBlockReady(); + return onDiskBytesWithHeader; + } + + /** + * Releases the compressor this writer uses to compress blocks into the + * compressor pool. Needs to be called before the writer is discarded. + */ + public void releaseCompressor() { + if (compressor != null) { + compressAlgo.returnCompressor(compressor); + compressor = null; + } + } + + /** + * Returns the on-disk size of the data portion of the block. This is the + * compressed size if compression is enabled. Can only be called in the + * "block ready" state. Header is not compressed, and its size is not + * included in the return value. + * + * @return the on-disk size of the block, not including the header. 
+ */ + public int getOnDiskSizeWithoutHeader() { + expectState(State.BLOCK_READY); + return onDiskBytesWithHeader.length - HEADER_SIZE; + } + + /** + * Returns the on-disk size of the block. Can only be called in the + * "block ready" state. + * + * @return the on-disk size of the block ready to be written, including the + * header size + */ + public int getOnDiskSizeWithHeader() { + expectState(State.BLOCK_READY); + return onDiskBytesWithHeader.length; + } + + /** + * The uncompressed size of the block data. Does not include header size. + */ + public int getUncompressedSizeWithoutHeader() { + expectState(State.BLOCK_READY); + return uncompressedBytesWithHeader.length - HEADER_SIZE; + } + + /** + * The uncompressed size of the block data, including header size. + */ + public int getUncompressedSizeWithHeader() { + expectState(State.BLOCK_READY); + return uncompressedBytesWithHeader.length; + } + + /** @return true if a block is being written */ + public boolean isWriting() { + return state == State.WRITING; + } + + /** + * Returns the number of bytes written into the current block so far, or + * zero if not writing the block at the moment. Note that this will return + * zero in the "block ready" state as well. + * + * @return the number of bytes written + */ + public int blockSizeWritten() { + if (state != State.WRITING) + return 0; + return this.unencodedDataSizeWritten; + } + + /** + * Returns the header followed by the uncompressed data, even if using + * compression. This is needed for storing uncompressed blocks in the block + * cache. Can be called in the "writing" state or the "block ready" state. + * + * @return uncompressed block bytes for caching on write + */ + private byte[] getUncompressedDataWithHeader() { + expectState(State.BLOCK_READY); + + return uncompressedBytesWithHeader; + } + + private void expectState(State expectedState) { + if (state != expectedState) { + throw new IllegalStateException("Expected state: " + expectedState + + ", actual state: " + state); + } + } + + /** + * Similar to {@link #getUncompressedBufferWithHeader()} but returns a byte + * buffer. + * + * @return uncompressed block for caching on write in the form of a buffer + */ + public ByteBuffer getUncompressedBufferWithHeader() { + byte[] b = getUncompressedDataWithHeader(); + return ByteBuffer.wrap(b, 0, b.length); + } + + /** + * Takes the given {@link BlockWritable} instance, creates a new block of + * its appropriate type, writes the writable into this block, and flushes + * the block into the output stream. The writer is instructed not to buffer + * uncompressed bytes for cache-on-write. + * + * @param bw the block-writable object to write as a block + * @param out the file system output stream + * @throws IOException + */ + public void writeBlock(BlockWritable bw, FSDataOutputStream out) + throws IOException { + bw.writeToBlock(startWriting(bw.getBlockType())); + writeHeaderAndData(out); + } + + /** + * Creates a new HFileBlock. 
+ */ + public HFileBlock getBlockForCaching() { + HFileContext meta = new HFileContextBuilder() + .withHBaseCheckSum(false) + .withChecksumType(ChecksumType.NULL) + .withBytesPerCheckSum(0) + .build(); + return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), + getUncompressedSizeWithoutHeader(), prevOffset, + getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, + getOnDiskSizeWithoutHeader(), meta); + } + } + +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 470d483..687d3cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -185,7 +185,8 @@ public class TestHFileBlockIndex { } missCount += 1; - prevBlock = realReader.readBlockData(offset, onDiskSize, pread); + prevBlock = realReader.readBlockData(offset, onDiskSize, + -1, pread); prevOffset = offset; prevOnDiskSize = onDiskSize; prevPread = pread; http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index 387514e..6f434bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -92,7 +92,8 @@ public class TestHFileDataBlockEncoder { if (blockEncoder.getDataBlockEncoding() == DataBlockEncoding.NONE) { - assertEquals(block.getBufferReadOnly(), returnedBlock.getBufferReadOnly()); + assertEquals(block.getBufferWithHeader(), + returnedBlock.getBufferWithHeader()); } else { if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) { System.out.println(blockEncoder); @@ -126,7 +127,7 @@ public class TestHFileDataBlockEncoder { .build(); HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER, 0, - 0, -1, hfileContext); + 0, hfileContext); HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags); assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length); } @@ -197,7 +198,7 @@ public class TestHFileDataBlockEncoder { .build(); HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER, 0, - 0, -1, meta); + 0, meta); return b; } @@ -219,8 +220,7 @@ public class TestHFileDataBlockEncoder { byte[] encodedBytes = baos.toByteArray(); size = encodedBytes.length - block.getDummyHeaderForVersion().length; return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes), - HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1, - block.getHFileContext()); + HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext()); } private void writeBlock(List kvs, HFileContext fileContext, boolean useTags) 
http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java index 3264558..ba3a344 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java @@ -99,7 +99,7 @@ public class TestHFileEncryption { private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size) throws IOException { - HFileBlock b = hbr.readBlockData(pos, -1, false); + HFileBlock b = hbr.readBlockData(pos, -1, -1, false); assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); assertFalse(b.isUnpacked()); http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java index 983ec2f..c7eb11b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java @@ -218,7 +218,7 @@ public class TestHFileWriterV3 { fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false) + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) .unpack(context, blockReader); assertEquals(BlockType.DATA, block.getBlockType()); ByteBuff buf = block.getBufferWithoutHeader(); @@ -279,14 +279,13 @@ public class TestHFileWriterV3 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false) + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) .unpack(context, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuff buf = block.getBufferWithoutHeader(); if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) { - throw new IOException("Failed to deserialize block " + this + - " into a " + t.getClass().getSimpleName()); + throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName()); } Text expectedText = (metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? 
new Text( http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index d20ba2b..69a77bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -78,8 +78,14 @@ public class TestPrefetch { // Check that all of the data blocks were preloaded BlockCache blockCache = cacheConf.getBlockCache(); long offset = 0; + HFileBlock prevBlock = null; while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { - HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null); + long onDiskSize = -1; + if (prevBlock != null) { + onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); + } + HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null, + null); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; if (block.getBlockType() == BlockType.DATA || @@ -87,6 +93,7 @@ public class TestPrefetch { block.getBlockType() == BlockType.INTERMEDIATE_INDEX) { assertTrue(isCached); } + prevBlock = block; offset += block.getOnDiskSizeWithHeader(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index 2357bef..0916fe6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -227,10 +227,15 @@ public class TestCacheOnWriteInSchema { assertTrue(testDescription, scanner.seekTo()); // Cribbed from io.hfile.TestCacheOnWrite long offset = 0; + HFileBlock prevBlock = null; while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + long onDiskSize = -1; + if (prevBlock != null) { + onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); + } // Flags: don't cache the block, use pread, this is not a compaction. // Also, pass null for expected block type to avoid checking it. - HFileBlock block = reader.readBlock(offset, -1, false, true, + HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null, DataBlockEncoding.NONE); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); @@ -244,6 +249,7 @@ public class TestCacheOnWriteInSchema { "block: " + block + "\n" + "blockCacheKey: " + blockCacheKey); } + prevBlock = block; offset += block.getOnDiskSizeWithHeader(); } } finally {
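For reference, the test changes above (TestCacheOnWrite, TestPrefetch, TestCacheOnWriteInSchema) all reintroduce the same scan pattern: keep the block just read and let the "next block on-disk size" recorded in its header seed the following readBlock call, falling back to -1 (size unknown) for the first block. Below is a minimal sketch of that pattern; the Block and BlockSource types are made-up stand-ins for illustration, not the real HFile reader API.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-ins for the HFile reader and HFileBlock; these are NOT HBase APIs. */
interface Block {
  long onDiskSizeWithHeader();          // size this block occupies on disk
  long nextBlockOnDiskSizeWithHeader(); // size of the following block, taken from this block's header area
}

interface BlockSource {
  /** onDiskSize may be -1, meaning "unknown: read the header first to find out". */
  Block readBlock(long offset, long onDiskSize) throws IOException;
}

public class ScanAllBlocks {
  static List<Block> scan(BlockSource source, long endOffset) throws IOException {
    List<Block> blocks = new ArrayList<>();
    Block prev = null;
    long offset = 0;
    while (offset < endOffset) {
      // Let the previous block's header tell us how big the next block is; -1 on the
      // first iteration mirrors the "size unknown" case handled by the reader.
      long onDiskSize = (prev == null) ? -1 : prev.nextBlockOnDiskSizeWithHeader();
      Block b = source.readBlock(offset, onDiskSize);
      blocks.add(b);
      prev = b;
      offset += b.onDiskSizeWithHeader();
    }
    return blocks;
  }
}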