From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Tue, 15 Aug 2017 15:06:26 -0000
Message-Id: <6022adc91b6d4459b8a15f419818b88d@git.apache.org>
In-Reply-To: <1727cd37fde541ada4a5e500bb67f85f@git.apache.org>
References: <1727cd37fde541ada4a5e500bb67f85f@git.apache.org>
Subject: [10/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/07e68d46/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.CacheOnWriteType.html
----------------------------------------------------------------------
diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.CacheOnWriteType.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.CacheOnWriteType.html
index 89a03a3..a5070e4 100644
--- a/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.CacheOnWriteType.html
+++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.CacheOnWriteType.html
@@ -49,435 +49,436 @@
 041import org.apache.hadoop.fs.Path;
 042import org.apache.hadoop.hbase.ArrayBackedTag;
 043import org.apache.hadoop.hbase.CellComparator;
-044import org.apache.hadoop.hbase.HBaseTestingUtility;
-045import org.apache.hadoop.hbase.HColumnDescriptor;
-046import org.apache.hadoop.hbase.HConstants;
-047import org.apache.hadoop.hbase.KeyValue;
-048import org.apache.hadoop.hbase.Tag;
-049import org.apache.hadoop.hbase.client.Durability;
-050import org.apache.hadoop.hbase.client.Put;
-051import org.apache.hadoop.hbase.fs.HFileSystem;
-052import org.apache.hadoop.hbase.io.compress.Compression;
-053import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-054import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-055import org.apache.hadoop.hbase.regionserver.BloomType;
-056import org.apache.hadoop.hbase.regionserver.HRegion;
-057import org.apache.hadoop.hbase.regionserver.Region;
-058import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-059import org.apache.hadoop.hbase.testclassification.IOTests;
-060import org.apache.hadoop.hbase.testclassification.MediumTests;
-061import org.apache.hadoop.hbase.util.BloomFilterFactory;
-062import org.apache.hadoop.hbase.util.Bytes;
-063import org.apache.hadoop.hbase.util.ChecksumType;
-064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-065import org.junit.After;
-066import org.junit.AfterClass;
-067import org.junit.Before;
-068import org.junit.Test;
-069import org.junit.experimental.categories.Category;
-070import org.junit.runner.RunWith;
-071import org.junit.runners.Parameterized;
-072import org.junit.runners.Parameterized.Parameters;
-073
-074import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-075
[removed lines 076-472: the previous rendering of the TestCacheOnWrite class body, re-listed in full only because the added import shifts every rendered line number by one; apart from that renumbering, the only removed line whose content actually changes is:]
-182      for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
+044import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+045import org.apache.hadoop.hbase.HBaseTestingUtility;
+046import org.apache.hadoop.hbase.HColumnDescriptor;
+047import org.apache.hadoop.hbase.HConstants;
+048import org.apache.hadoop.hbase.KeyValue;
+049import org.apache.hadoop.hbase.Tag;
+050import org.apache.hadoop.hbase.client.Durability;
+051import org.apache.hadoop.hbase.client.Put;
+052import org.apache.hadoop.hbase.fs.HFileSystem;
+053import org.apache.hadoop.hbase.io.compress.Compression;
+054import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+056import org.apache.hadoop.hbase.regionserver.BloomType;
+057import org.apache.hadoop.hbase.regionserver.HRegion;
+058import org.apache.hadoop.hbase.regionserver.Region;
+059import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+060import org.apache.hadoop.hbase.testclassification.IOTests;
+061import org.apache.hadoop.hbase.testclassification.MediumTests;
+062import org.apache.hadoop.hbase.util.BloomFilterFactory;
+063import org.apache.hadoop.hbase.util.Bytes;
+064import org.apache.hadoop.hbase.util.ChecksumType;
+065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+066import org.junit.After;
+067import org.junit.AfterClass;
+068import org.junit.Before;
+069import org.junit.Test;
+070import org.junit.experimental.categories.Category;
+071import org.junit.runner.RunWith;
+072import org.junit.runners.Parameterized;
+073import org.junit.runners.Parameterized.Parameters;
+074
+075import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+076
+077/**
+078 * Tests {@link HFile} cache-on-write functionality for the following block
+079 * types: data blocks, non-root index blocks, and Bloom filter blocks.
+080 */
+081@RunWith(Parameterized.class)
+082@Category({IOTests.class, MediumTests.class})
+083public class TestCacheOnWrite {
[added lines 084-473: the remainder of the class re-rendered with each line number one higher than before -- the CacheOnWriteType enum, the parameterized constructor, getBlockCaches()/getParameters(), clearBlockCache(), setUp()/tearDown(), readStoreFile(), generateKeyType(), writeStoreFile(), testNotCachingDataBlocksDuringCompactionInternals(), and the @Test methods testStoreFileCacheOnWrite() and testNotCachingDataBlocksDuringCompaction(); the only other substantive change is:]
+183      for (Compression.Algorithm compress : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
+473}
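The CacheOnWriteType enum referenced above drives the test by flipping CacheConfig's three cache-on-write keys (CACHE_BLOCKS_ON_WRITE_KEY, CACHE_INDEX_BLOCKS_ON_WRITE_KEY, CACHE_BLOOM_BLOCKS_ON_WRITE_KEY) so that exactly one block type is cached at write time. For reference, a minimal sketch -- not part of this commit, and assuming an hbase-server classpath of this vintage where CacheConfig exposes these constants and predicates -- of toggling the same switches on a plain Configuration and reading them back:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class CacheOnWriteConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Cache data blocks as they are written (what the DATA_BLOCKS case exercises).
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
    // Leave index and Bloom chunk cache-on-write off, mirroring modifyConf()
    // when a single CacheOnWriteType is selected.
    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false);

    // CacheConfig reads the three keys back as predicates.
    CacheConfig cacheConf = new CacheConfig(conf);
    System.out.println("data blocks cached on write:  " + cacheConf.shouldCacheDataOnWrite());
    System.out.println("index blocks cached on write: " + cacheConf.shouldCacheIndexesOnWrite());
    System.out.println("bloom chunks cached on write: " + cacheConf.shouldCacheBloomsOnWrite());
  }
}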