From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Tue, 15 Aug 2017 15:06:19 -0000
Message-Id: <060add70efa44ee086a2001588b11360@git.apache.org>
In-Reply-To: <1727cd37fde541ada4a5e500bb67f85f@git.apache.org>
Subject: [03/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/07e68d46/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.html
----------------------------------------------------------------------
diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.html
index 9e93548..f770d03 100644
--- a/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.html
+++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.html
@@ -27,264 +27,376 @@

Updated TestBucketCache source, reflowed from the post-change side of the diff (the removed side
is the same file before the new BucketCache configuration tests were added):

package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
 * Basic test of BucketCache. Puts and gets.
 * <p>
 * Tests ensure that block data remains correct under concurrent access by several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, SmallTests.class })
public class TestBucketCache {

  private static final Random RAND = new Random();

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
        { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
        {
            16 * 1024,
            new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
                28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
                128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  String ioEngineName = "heap";
  String persistencePath = null;

  private class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
        int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
        IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
          persistencePath);
      super.wait_when_cache = true;
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
        boolean cacheDataInL1) {
      if (super.getBlock(cacheKey, true, false, true) != null) {
        throw new RuntimeException("Cached an already cached block");
      }
      super.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      if (super.getBlock(cacheKey, true, false, true) != null) {
        throw new RuntimeException("Cached an already cached block");
      }
      super.cacheBlock(cacheKey, buf);
    }
  }
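  // Editor's note (an assumption about BucketCache internals, not text from this commit):
  // setting wait_when_cache = true in the mock's constructor appears to make cacheBlock wait
  // briefly for writer-queue space instead of silently dropping blocks when the queue is full,
  // so the tests observe every block they cache.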
  @Before
  public void setup() throws FileNotFoundException, IOException {
    cache =
        new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, persistencePath);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(RAND.nextInt(a.size()));
  }
  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Long> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting blocks until
    // the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(mAllocator.allocateBlock(blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
    }

    for (long offset : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }
  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  // BucketCache.cacheBlock is async: it first adds the block to ramCache and the writeQueue;
  // writer threads then flush it to a bucket and put a reference entry in backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
      Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    while (!cache.backingMap.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
  }

  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
        new byte[10]));
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {

      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }

    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
        new byte[10]));
    lock.writeLock().unlock();
    evictThread.join();
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }
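  // Editor's note on testMemoryLeak (a reading of the code above, not text from the commit):
  // holding the write lock for the block's offset parks the "evict-block" thread inside
  // evictBlock(). The main thread then simulates a competing eviction (blockEvicted plus
  // backingMap.remove) and re-caches the same key, which may reuse the freed offset. Once the
  // lock is released, a correct evictBlock() must notice that the entry it was waiting on is
  // gone and must not free the newly cached block; the final assertions (one block, non-zero
  // size, non-empty iterator) fail if that block's bucket space was wrongly reclaimed.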
  @Test
  public void testRetrieveFromFile() throws Exception {
    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);

    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
            + "/bucket.persistence");
    long usedSize = bucketCache.getAllocator().getUsedSize();
    assertTrue(usedSize == 0);

    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
    // Add blocks
    for (HFileBlockPair block : blocks) {
      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
    }
    for (HFileBlockPair block : blocks) {
      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
    }
    usedSize = bucketCache.getAllocator().getUsedSize();
    assertTrue(usedSize != 0);
    // persist cache to file
    bucketCache.shutdown();

    // restore cache from file
    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
            + "/bucket.persistence");
    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    // persist cache to file again
    bucketCache.shutdown();

    // Reconfigure the bucket sizes: the biggest bucket is now smaller than constructedBlockSize
    // (8k or 16k), so the cache cannot be restored from the file.
    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, smallBucketSizes, writeThreads,
        writerQLen, testDir + "/bucket.persistence");
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());

    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
        BucketCache.DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }
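  // Editor's note (derived from validateGetPartitionSize at the bottom of this file, not from
  // the commit text): getPartitionSize(f) is expected to equal
  // floor(allocator.getTotalSize() * f * minFactor). So with minFactor = 0.5f, the three calls
  // above check partitions of 5% (0.1 * 0.5), 35% (0.7 * 0.5) and 10% (0.2 * 0.5) of the
  // allocator's total size.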
  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);

    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.",
        cache.getAcceptableFactor(), 0.9f, 0);
    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.",
        cache.getMinFactor(), 0.5f, 0);
    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.",
        cache.getExtraFreeFactor(), 0.5f, 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.",
        cache.getSingleFactor(), 0.1f, 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.",
        cache.getMultiFactor(), 0.7f, 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.",
        cache.getMemoryFactor(), 0.2f, 0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to < 0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to < 0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // All configs add up to 1.0 and are between 0 and 1.0; configs don't add to 1.0; configs
    // can't be negative; configs don't add to 1.0
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }
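  // Editor's note (a worked check of the arrays in the invalid-config tests above, not text
  // from the commit): in testInvalidCacheSplitFactorConfig, column 0 passes because
  // 0.2 + 0.4 + 0.4 == 1.0 with every factor positive; column 1 fails because 0 + 0 + 0 != 1.0;
  // column 2 fails because the single factor is negative; column 3 fails because
  // 1.0 + 0.05 + 0.5 == 1.55 != 1.0. In testInvalidAcceptFactorConfig, the 0.2 value presumably
  // fails because it falls below the default minimum factor, while 0.86 stays within range.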
  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
      boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
            + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
            + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
      float minFactor) {
    long expectedOutput =
        (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }
}
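----------------------------------------------------------------------
Editor's appendix: a minimal, hedged sketch (not part of the commit) of the configuration
pattern the new tests exercise. It uses only constructor arguments and constants that appear in
the test code above; reading the ninth constructor argument (the literal 100 the tests pass) as
an I/O-error toleration duration is an assumption, as are the factor semantics in the comments.
The three split factors must sum to 1.0 and the minimum factor must stay at or below the
acceptable factor, or the constructor is expected to throw IllegalArgumentException.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

public class BucketCacheConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Factor values mirror testValidBucketCacheConfigs above.
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f); // start evicting at ~90% full (assumed semantics)
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);    // evict down to ~50% used (assumed semantics)
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); // single-access partition
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);  // multi-access partition
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); // in-memory partition; 0.1 + 0.7 + 0.2 == 1.0
    // Same constructor shape the tests use: a 32 MB heap-backed cache with an 8 KB block size,
    // default bucket sizes, default writer threads/queue length, and no persistence file.
    BucketCache cache = new BucketCache("heap", 32 * 1024 * 1024, 8192, null,
        BucketCache.DEFAULT_WRITER_THREADS, BucketCache.DEFAULT_WRITER_QUEUE_ITEMS, null,
        100, conf);
    System.out.println("single-access partition size: " + cache.getPartitionSize(0.1f));
    cache.shutdown();
  }
}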