From: misty@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Tue, 23 Feb 2016 17:08:25 -0000
Subject: [17/51] [partial] hbase-site git commit: Published site at 58283fa1b1b10beec62cefa40babff6a1424b06c.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/d02dd5db/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntryGroup.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntryGroup.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntryGroup.html
index 3a9a7f7..d857fe6 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntryGroup.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntryGroup.html
@@ -311,1174 +311,1177 @@
303 */
304 private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
305 throws IOException {
-306 if (ioEngineName.startsWith("file:"))
+306 if (ioEngineName.startsWith("file:")) {
307 return new FileIOEngine(ioEngineName.substring(5), capacity);
-308 else if (ioEngineName.startsWith("offheap"))
+308 } else if (ioEngineName.startsWith("offheap")) {
309 return new ByteBufferIOEngine(capacity, true);
-310 else if (ioEngineName.startsWith("heap"))
+310 } else if (ioEngineName.startsWith("heap")) {
311 return new ByteBufferIOEngine(capacity, false);
-312 else
-313 throw new IllegalArgumentException(
-314 "Don't understand io engine name for cache - prefix with file:, heap or offheap");
-315 }
-316
-317 /**
-318 * Cache the block with the specified name and buffer.
-319 * @param cacheKey block's cache key
-320 * @param buf block buffer
-321 */
-322 @Override
-323 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
-324 cacheBlock(cacheKey, buf, false, false);
-325 }
-326
-327 /**
-328 * Cache the block with the specified name and buffer.
-329 * @param cacheKey block's cache key -330 * @param cachedItem block buffer -331 * @param inMemory if block is in-memory -332 * @param cacheDataInL1 -333 */ -334 @Override -335 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, -336 final boolean cacheDataInL1) { -337 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); -338 } -339 -340 /** -341 * Cache the block to ramCache -342 * @param cacheKey block's cache key -343 * @param cachedItem block buffer -344 * @param inMemory if block is in-memory -345 * @param wait if true, blocking wait when queue is full -346 */ -347 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, -348 boolean wait) { -349 if (!cacheEnabled) { -350 return; -351 } -352 -353 if (backingMap.containsKey(cacheKey)) { -354 return; -355 } -356 -357 /* -358 * Stuff the entry into the RAM cache so it can get drained to the persistent store -359 */ -360 RAMQueueEntry re = -361 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); -362 if (ramCache.putIfAbsent(cacheKey, re) != null) { -363 return; -364 } -365 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); -366 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); -367 boolean successfulAddition = false; -368 if (wait) { -369 try { -370 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); -371 } catch (InterruptedException e) { -372 Thread.currentThread().interrupt(); -373 } -374 } else { -375 successfulAddition = bq.offer(re); -376 } -377 if (!successfulAddition) { -378 ramCache.remove(cacheKey); -379 cacheStats.failInsert(); -380 } else { -381 this.blockNumber.incrementAndGet(); -382 this.heapSize.addAndGet(cachedItem.heapSize()); -383 blocksByHFile.put(cacheKey.getHfileName(), cacheKey); -384 } -385 } -386 -387 /** -388 * Get the buffer of the block with the specified key. -389 * @param key block's cache key -390 * @param caching true if the caller caches blocks on cache misses -391 * @param repeat Whether this is a repeat lookup for the same block -392 * @param updateCacheMetrics Whether we should update cache metrics or not -393 * @return buffer of specified cache key, or null if not in cache -394 */ -395 @Override -396 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, -397 boolean updateCacheMetrics) { -398 if (!cacheEnabled) { -399 return null; -400 } -401 RAMQueueEntry re = ramCache.get(key); -402 if (re != null) { -403 if (updateCacheMetrics) { -404 cacheStats.hit(caching, key.isPrimary()); -405 } -406 re.access(accessCount.incrementAndGet()); -407 return re.getData(); -408 } -409 BucketEntry bucketEntry = backingMap.get(key); -410 if (bucketEntry != null) { -411 long start = System.nanoTime(); -412 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); -413 try { -414 lock.readLock().lock(); -415 // We can not read here even if backingMap does contain the given key because its offset -416 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check -417 // existence here. 
-418 if (bucketEntry.equals(backingMap.get(key))) { -419 // TODO : change this area - should be removed after server cells and -420 // 12295 are available -421 int len = bucketEntry.getLength(); -422 Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, -423 bucketEntry.deserializerReference(this.deserialiserMap)); -424 long timeTaken = System.nanoTime() - start; -425 if (updateCacheMetrics) { -426 cacheStats.hit(caching, key.isPrimary()); -427 cacheStats.ioHit(timeTaken); -428 } -429 if (cachedBlock.getMemoryType() == MemoryType.SHARED) { -430 bucketEntry.refCount.incrementAndGet(); +312 } else if (ioEngineName.startsWith("mmap:")) { +313 return new FileMmapEngine(ioEngineName.substring(5), capacity); +314 } else { +315 throw new IllegalArgumentException( +316 "Don't understand io engine name for cache - prefix with file:, heap or offheap"); +317 } +318 } +319 +320 /** +321 * Cache the block with the specified name and buffer. +322 * @param cacheKey block's cache key +323 * @param buf block buffer +324 */ +325 @Override +326 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { +327 cacheBlock(cacheKey, buf, false, false); +328 } +329 +330 /** +331 * Cache the block with the specified name and buffer. +332 * @param cacheKey block's cache key +333 * @param cachedItem block buffer +334 * @param inMemory if block is in-memory +335 * @param cacheDataInL1 +336 */ +337 @Override +338 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, +339 final boolean cacheDataInL1) { +340 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); +341 } +342 +343 /** +344 * Cache the block to ramCache +345 * @param cacheKey block's cache key +346 * @param cachedItem block buffer +347 * @param inMemory if block is in-memory +348 * @param wait if true, blocking wait when queue is full +349 */ +350 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, +351 boolean wait) { +352 if (!cacheEnabled) { +353 return; +354 } +355 +356 if (backingMap.containsKey(cacheKey)) { +357 return; +358 } +359 +360 /* +361 * Stuff the entry into the RAM cache so it can get drained to the persistent store +362 */ +363 RAMQueueEntry re = +364 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); +365 if (ramCache.putIfAbsent(cacheKey, re) != null) { +366 return; +367 } +368 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); +369 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); +370 boolean successfulAddition = false; +371 if (wait) { +372 try { +373 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); +374 } catch (InterruptedException e) { +375 Thread.currentThread().interrupt(); +376 } +377 } else { +378 successfulAddition = bq.offer(re); +379 } +380 if (!successfulAddition) { +381 ramCache.remove(cacheKey); +382 cacheStats.failInsert(); +383 } else { +384 this.blockNumber.incrementAndGet(); +385 this.heapSize.addAndGet(cachedItem.heapSize()); +386 blocksByHFile.put(cacheKey.getHfileName(), cacheKey); +387 } +388 } +389 +390 /** +391 * Get the buffer of the block with the specified key. 
+392 * @param key block's cache key +393 * @param caching true if the caller caches blocks on cache misses +394 * @param repeat Whether this is a repeat lookup for the same block +395 * @param updateCacheMetrics Whether we should update cache metrics or not +396 * @return buffer of specified cache key, or null if not in cache +397 */ +398 @Override +399 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, +400 boolean updateCacheMetrics) { +401 if (!cacheEnabled) { +402 return null; +403 } +404 RAMQueueEntry re = ramCache.get(key); +405 if (re != null) { +406 if (updateCacheMetrics) { +407 cacheStats.hit(caching, key.isPrimary()); +408 } +409 re.access(accessCount.incrementAndGet()); +410 return re.getData(); +411 } +412 BucketEntry bucketEntry = backingMap.get(key); +413 if (bucketEntry != null) { +414 long start = System.nanoTime(); +415 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); +416 try { +417 lock.readLock().lock(); +418 // We can not read here even if backingMap does contain the given key because its offset +419 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check +420 // existence here. +421 if (bucketEntry.equals(backingMap.get(key))) { +422 // TODO : change this area - should be removed after server cells and +423 // 12295 are available +424 int len = bucketEntry.getLength(); +425 Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, +426 bucketEntry.deserializerReference(this.deserialiserMap)); +427 long timeTaken = System.nanoTime() - start; +428 if (updateCacheMetrics) { +429 cacheStats.hit(caching, key.isPrimary()); +430 cacheStats.ioHit(timeTaken); 431 } -432 bucketEntry.access(accessCount.incrementAndGet()); -433 if (this.ioErrorStartTime > 0) { -434 ioErrorStartTime = -1; -435 } -436 return cachedBlock; -437 } -438 } catch (IOException ioex) { -439 LOG.error("Failed reading block " + key + " from bucket cache", ioex); -440 checkIOErrorIsTolerated(); -441 } finally { -442 lock.readLock().unlock(); -443 } -444 } -445 if (!repeat && updateCacheMetrics) { -446 cacheStats.miss(caching, key.isPrimary()); +432 if (cachedBlock.getMemoryType() == MemoryType.SHARED) { +433 bucketEntry.refCount.incrementAndGet(); +434 } +435 bucketEntry.access(accessCount.incrementAndGet()); +436 if (this.ioErrorStartTime > 0) { +437 ioErrorStartTime = -1; +438 } +439 return cachedBlock; +440 } +441 } catch (IOException ioex) { +442 LOG.error("Failed reading block " + key + " from bucket cache", ioex); +443 checkIOErrorIsTolerated(); +444 } finally { +445 lock.readLock().unlock(); +446 } 447 } -448 return null; -449 } -450 -451 @VisibleForTesting -452 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { -453 bucketAllocator.freeBlock(bucketEntry.offset()); -454 realCacheSize.addAndGet(-1 * bucketEntry.getLength()); -455 blocksByHFile.remove(cacheKey.getHfileName(), cacheKey); -456 if (decrementBlockNumber) { -457 this.blockNumber.decrementAndGet(); -458 } -459 } -460 -461 @Override -462 public boolean evictBlock(BlockCacheKey cacheKey) { -463 return evictBlock(cacheKey, true); -464 } -465 -466 // does not check for the ref count. 
Just tries to evict it if found in the -467 // bucket map -468 private boolean forceEvict(BlockCacheKey cacheKey) { -469 if (!cacheEnabled) { -470 return false; -471 } -472 RAMQueueEntry removedBlock = checkRamCache(cacheKey); -473 BucketEntry bucketEntry = backingMap.get(cacheKey); -474 if (bucketEntry == null) { -475 if (removedBlock != null) { -476 cacheStats.evicted(0, cacheKey.isPrimary()); -477 return true; -478 } else { -479 return false; -480 } -481 } -482 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); -483 try { -484 lock.writeLock().lock(); -485 if (backingMap.remove(cacheKey, bucketEntry)) { -486 blockEvicted(cacheKey, bucketEntry, removedBlock == null); -487 } else { -488 return false; -489 } -490 } finally { -491 lock.writeLock().unlock(); -492 } -493 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); -494 return true; -495 } -496 -497 private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { -498 RAMQueueEntry removedBlock = ramCache.remove(cacheKey); -499 if (removedBlock != null) { -500 this.blockNumber.decrementAndGet(); -501 this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); -502 } -503 return removedBlock; -504 } -505 -506 public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { -507 if (!cacheEnabled) { -508 return false; -509 } -510 RAMQueueEntry removedBlock = checkRamCache(cacheKey); -511 BucketEntry bucketEntry = backingMap.get(cacheKey); -512 if (bucketEntry == null) { -513 if (removedBlock != null) { -514 cacheStats.evicted(0, cacheKey.isPrimary()); -515 return true; -516 } else { -517 return false; -518 } -519 } -520 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); -521 try { -522 lock.writeLock().lock(); -523 int refCount = bucketEntry.refCount.get(); -524 if(refCount == 0) { -525 if (backingMap.remove(cacheKey, bucketEntry)) { -526 blockEvicted(cacheKey, bucketEntry, removedBlock == null); -527 } else { -528 return false; -529 } -530 } else { -531 if(!deletedBlock) { -532 if (LOG.isDebugEnabled()) { -533 LOG.debug("This block " + cacheKey + " is still referred by " + refCount -534 + " readers. Can not be freed now"); -535 } -536 return false; -537 } else { -538 if (LOG.isDebugEnabled()) { -539 LOG.debug("This block " + cacheKey + " is still referred by " + refCount -540 + " readers. Can not be freed now. Hence will mark this" -541 + " for evicting at a later point"); -542 } -543 bucketEntry.markedForEvict = true; -544 } -545 } -546 } finally { -547 lock.writeLock().unlock(); -548 } -549 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); -550 return true; -551 } -552 -553 /* -554 * Statistics thread. Periodically output cache statistics to the log. 
-555 */ -556 private static class StatisticsThread extends Thread { -557 private final BucketCache bucketCache; -558 -559 public StatisticsThread(BucketCache bucketCache) { -560 super("BucketCacheStatsThread"); -561 setDaemon(true); -562 this.bucketCache = bucketCache; -563 } -564 -565 @Override -566 public void run() { -567 bucketCache.logStats(); -568 } -569 } -570 -571 public void logStats() { -572 long totalSize = bucketAllocator.getTotalSize(); -573 long usedSize = bucketAllocator.getUsedSize(); -574 long freeSize = totalSize - usedSize; -575 long cacheSize = getRealCacheSize(); -576 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + -577 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " + -578 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + -579 "usedSize=" + StringUtils.byteDesc(usedSize) +", " + -580 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + -581 "accesses=" + cacheStats.getRequestCount() + ", " + -582 "hits=" + cacheStats.getHitCount() + ", " + -583 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + -584 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + -585 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : -586 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + -587 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + -588 "cachingHits=" + cacheStats.getHitCachingCount() + ", " + -589 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : -590 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + -591 "evictions=" + cacheStats.getEvictionCount() + ", " + -592 "evicted=" + cacheStats.getEvictedCount() + ", " + -593 "evictedPerRun=" + cacheStats.evictedPerEviction()); -594 cacheStats.reset(); -595 } -596 -597 public long getRealCacheSize() { -598 return this.realCacheSize.get(); -599 } -600 -601 private long acceptableSize() { -602 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR); -603 } -604 -605 private long singleSize() { -606 return (long) Math.floor(bucketAllocator.getTotalSize() -607 * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR); -608 } -609 -610 private long multiSize() { -611 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR -612 * DEFAULT_MIN_FACTOR); -613 } -614 -615 private long memorySize() { -616 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR -617 * DEFAULT_MIN_FACTOR); -618 } -619 -620 /** -621 * Free the space if the used size reaches acceptableSize() or one size block -622 * couldn't be allocated. When freeing the space, we use the LRU algorithm and -623 * ensure there must be some blocks evicted -624 * @param why Why we are being called -625 */ -626 private void freeSpace(final String why) { -627 // Ensure only one freeSpace progress at a time -628 if (!freeSpaceLock.tryLock()) return; -629 try { -630 freeInProgress = true; -631 long bytesToFreeWithoutExtra = 0; -632 // Calculate free byte for each bucketSizeinfo -633 StringBuffer msgBuffer = LOG.isDebugEnabled()? 
new StringBuffer(): null; -634 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); -635 long[] bytesToFreeForBucket = new long[stats.length]; -636 for (int i = 0; i < stats.length; i++) { -637 bytesToFreeForBucket[i] = 0; -638 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); -639 freeGoal = Math.max(freeGoal, 1); -640 if (stats[i].freeCount() < freeGoal) { -641 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); -642 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; -643 if (msgBuffer != null) { -644 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" -645 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); -646 } -647 } -648 } -649 if (msgBuffer != null) { -650 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); +448 if (!repeat && updateCacheMetrics) { +449 cacheStats.miss(caching, key.isPrimary()); +450 } +451 return null; +452 } +453 +454 @VisibleForTesting +455 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { +456 bucketAllocator.freeBlock(bucketEntry.offset()); +457 realCacheSize.addAndGet(-1 * bucketEntry.getLength()); +458 blocksByHFile.remove(cacheKey.getHfileName(), cacheKey); +459 if (decrementBlockNumber) { +460 this.blockNumber.decrementAndGet(); +461 } +462 } +463 +464 @Override +465 public boolean evictBlock(BlockCacheKey cacheKey) { +466 return evictBlock(cacheKey, true); +467 } +468 +469 // does not check for the ref count. Just tries to evict it if found in the +470 // bucket map +471 private boolean forceEvict(BlockCacheKey cacheKey) { +472 if (!cacheEnabled) { +473 return false; +474 } +475 RAMQueueEntry removedBlock = checkRamCache(cacheKey); +476 BucketEntry bucketEntry = backingMap.get(cacheKey); +477 if (bucketEntry == null) { +478 if (removedBlock != null) { +479 cacheStats.evicted(0, cacheKey.isPrimary()); +480 return true; +481 } else { +482 return false; +483 } +484 } +485 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); +486 try { +487 lock.writeLock().lock(); +488 if (backingMap.remove(cacheKey, bucketEntry)) { +489 blockEvicted(cacheKey, bucketEntry, removedBlock == null); +490 } else { +491 return false; +492 } +493 } finally { +494 lock.writeLock().unlock(); +495 } +496 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); +497 return true; +498 } +499 +500 private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { +501 RAMQueueEntry removedBlock = ramCache.remove(cacheKey); +502 if (removedBlock != null) { +503 this.blockNumber.decrementAndGet(); +504 this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); +505 } +506 return removedBlock; +507 } +508 +509 public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { +510 if (!cacheEnabled) { +511 return false; +512 } +513 RAMQueueEntry removedBlock = checkRamCache(cacheKey); +514 BucketEntry bucketEntry = backingMap.get(cacheKey); +515 if (bucketEntry == null) { +516 if (removedBlock != null) { +517 cacheStats.evicted(0, cacheKey.isPrimary()); +518 return true; +519 } else { +520 return false; +521 } +522 } +523 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); +524 try { +525 lock.writeLock().lock(); +526 int refCount = bucketEntry.refCount.get(); +527 if(refCount == 0) { +528 if (backingMap.remove(cacheKey, bucketEntry)) { +529 blockEvicted(cacheKey, bucketEntry, removedBlock == null); +530 } else { +531 return 
false; +532 } +533 } else { +534 if(!deletedBlock) { +535 if (LOG.isDebugEnabled()) { +536 LOG.debug("This block " + cacheKey + " is still referred by " + refCount +537 + " readers. Can not be freed now"); +538 } +539 return false; +540 } else { +541 if (LOG.isDebugEnabled()) { +542 LOG.debug("This block " + cacheKey + " is still referred by " + refCount +543 + " readers. Can not be freed now. Hence will mark this" +544 + " for evicting at a later point"); +545 } +546 bucketEntry.markedForEvict = true; +547 } +548 } +549 } finally { +550 lock.writeLock().unlock(); +551 } +552 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); +553 return true; +554 } +555 +556 /* +557 * Statistics thread. Periodically output cache statistics to the log. +558 */ +559 private static class StatisticsThread extends Thread { +560 private final BucketCache bucketCache; +561 +562 public StatisticsThread(BucketCache bucketCache) { +563 super("BucketCacheStatsThread"); +564 setDaemon(true); +565 this.bucketCache = bucketCache; +566 } +567 +568 @Override +569 public void run() { +570 bucketCache.logStats(); +571 } +572 } +573 +574 public void logStats() { +575 long totalSize = bucketAllocator.getTotalSize(); +576 long usedSize = bucketAllocator.getUsedSize(); +577 long freeSize = totalSize - usedSize; +578 long cacheSize = getRealCacheSize(); +579 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + +580 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " + +581 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + +582 "usedSize=" + StringUtils.byteDesc(usedSize) +", " + +583 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + +584 "accesses=" + cacheStats.getRequestCount() + ", " + +585 "hits=" + cacheStats.getHitCount() + ", " + +586 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + +587 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + +588 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : +589 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + +590 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + +591 "cachingHits=" + cacheStats.getHitCachingCount() + ", " + +592 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : +593 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + +594 "evictions=" + cacheStats.getEvictionCount() + ", " + +595 "evicted=" + cacheStats.getEvictedCount() + ", " + +596 "evictedPerRun=" + cacheStats.evictedPerEviction()); +597 cacheStats.reset(); +598 } +599 +600 public long getRealCacheSize() { +601 return this.realCacheSize.get(); +602 } +603 +604 private long acceptableSize() { +605 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR); +606 } +607 +608 private long singleSize() { +609 return (long) Math.floor(bucketAllocator.getTotalSize() +610 * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR); +611 } +612 +613 private long multiSize() { +614 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR +615 * DEFAULT_MIN_FACTOR); +616 } +617 +618 private long memorySize() { +619 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR +620 * DEFAULT_MIN_FACTOR); +621 } +622 +623 /** +624 * Free the space if the used size reaches acceptableSize() or one size block +625 * couldn't be allocated. 
When freeing the space, we use the LRU algorithm and +626 * ensure there must be some blocks evicted +627 * @param why Why we are being called +628 */ +629 private void freeSpace(final String why) { +630 // Ensure only one freeSpace progress at a time +631 if (!freeSpaceLock.tryLock()) return; +632 try { +633 freeInProgress = true; +634 long bytesToFreeWithoutExtra = 0; +635 // Calculate free byte for each bucketSizeinfo +636 StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null; +637 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); +638 long[] bytesToFreeForBucket = new long[stats.length]; +639 for (int i = 0; i < stats.length; i++) { +640 bytesToFreeForBucket[i] = 0; +641 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); +642 freeGoal = Math.max(freeGoal, 1); +643 if (stats[i].freeCount() < freeGoal) { +644 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); +645 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; +646 if (msgBuffer != null) { +647 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" +648 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); +649 } +650 } 651 } -652 -653 if (bytesToFreeWithoutExtra <= 0) { -654 return; -655 } -656 long currentSize = bucketAllocator.getUsedSize(); -657 long totalSize=bucketAllocator.getTotalSize(); -658 if (LOG.isDebugEnabled() && msgBuffer != null) { -659 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + -660 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + -661 StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize)); -662 } -663 -664 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra -665 * (1 + DEFAULT_EXTRA_FREE_FACTOR)); +652 if (msgBuffer != null) { +653 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); +654 } +655 +656 if (bytesToFreeWithoutExtra <= 0) { +657 return; +658 } +659 long currentSize = bucketAllocator.getUsedSize(); +660 long totalSize=bucketAllocator.getTotalSize(); +661 if (LOG.isDebugEnabled() && msgBuffer != null) { +662 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + +663 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + +664 StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize)); +665 } 666 -667 // Instantiate priority buckets -668 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, -669 blockSize, singleSize()); -670 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, -671 blockSize, multiSize()); -672 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, -673 blockSize, memorySize()); -674 -675 // Scan entire map putting bucket entry into appropriate bucket entry -676 // group -677 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { -678 switch (bucketEntryWithKey.getValue().getPriority()) { -679 case SINGLE: { -680 bucketSingle.add(bucketEntryWithKey); -681 break; -682 } -683 case MULTI: { -684 bucketMulti.add(bucketEntryWithKey); -685 break; -686 } -687 case MEMORY: { -688 bucketMemory.add(bucketEntryWithKey); -689 break; -690 } -691 } -692 } -693 -694 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3); -695 -696 bucketQueue.add(bucketSingle); -697 bucketQueue.add(bucketMulti); -698 
bucketQueue.add(bucketMemory); -699 -700 int remainingBuckets = 3; -701 long bytesFreed = 0; +667 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra +668 * (1 + DEFAULT_EXTRA_FREE_FACTOR)); +669 +670 // Instantiate priority buckets +671 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, +672 blockSize, singleSize()); +673 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, +674 blockSize, multiSize()); +675 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, +676 blockSize, memorySize()); +677 +678 // Scan entire map putting bucket entry into appropriate bucket entry +679 // group +680 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { +681 switch (bucketEntryWithKey.getValue().getPriority()) { +682 case SINGLE: { +683 bucketSingle.add(bucketEntryWithKey); +684 break; +685 } +686 case MULTI: { +687 bucketMulti.add(bucketEntryWithKey); +688 break; +689 } +690 case MEMORY: { +691 bucketMemory.add(bucketEntryWithKey); +692 break; +693 } +694 } +695 } +696 +697 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3); +698 +699 bucketQueue.add(bucketSingle); +700 bucketQueue.add(bucketMulti); +701 bucketQueue.add(bucketMemory); 702 -703 BucketEntryGroup bucketGroup; -704 while ((bucketGroup = bucketQueue.poll()) != null) { -705 long overflow = bucketGroup.overflow(); -706 if (overflow > 0) { -707 long bucketBytesToFree = Math.min(overflow, -708 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); -709 bytesFreed += bucketGroup.free(bucketBytesToFree); -710 } -711 remainingBuckets--; -712 } -713 -714 /** -715 * Check whether need extra free because some bucketSizeinfo still needs -716 * free space -717 */ -718 stats = bucketAllocator.getIndexStatistics(); -719 boolean needFreeForExtra = false; -720 for (int i = 0; i < stats.length; i++) { -721 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); -722 freeGoal = Math.max(freeGoal, 1); -723 if (stats[i].freeCount() < freeGoal) { -724 needFreeForExtra = true; -725 break; -726 } -727 } -728 -729 if (needFreeForExtra) { -730 bucketQueue.clear(); -731 remainingBuckets = 2; -732 -733 bucketQueue.add(bucketSingle); -734 bucketQueue.add(bucketMulti); +703 int remainingBuckets = 3; +704 long bytesFreed = 0; +705 +706 BucketEntryGroup bucketGroup; +707 while ((bucketGroup = bucketQueue.poll()) != null) { +708 long overflow = bucketGroup.overflow(); +709 if (overflow > 0) { +710 long bucketBytesToFree = Math.min(overflow, +711 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); +712 bytesFreed += bucketGroup.free(bucketBytesToFree); +713 } +714 remainingBuckets--; +715 } +716 +717 /** +718 * Check whether need extra free because some bucketSizeinfo still needs +719 * free space +720 */ +721 stats = bucketAllocator.getIndexStatistics(); +722 boolean needFreeForExtra = false; +723 for (int i = 0; i < stats.length; i++) { +724 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); +725 freeGoal = Math.max(freeGoal, 1); +726 if (stats[i].freeCount() < freeGoal) { +727 needFreeForExtra = true; +728 break; +729 } +730 } +731 +732 if (needFreeForExtra) { +733 bucketQueue.clear(); +734 remainingBuckets = 2; 735 -736 while ((bucketGroup = bucketQueue.poll()) != null) { -737 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; -738 bytesFreed += bucketGroup.free(bucketBytesToFree); -739 remainingBuckets--; -740 
} -741 } -742 -743 if (LOG.isDebugEnabled()) { -744 long single = bucketSingle.totalSize(); -745 long multi = bucketMulti.totalSize(); -746 long memory = bucketMemory.totalSize(); -747 if (LOG.isDebugEnabled()) { -748 LOG.debug("Bucket cache free space completed; " + "freed=" -749 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" -750 + StringUtils.byteDesc(totalSize) + ", " + "single=" -751 + StringUtils.byteDesc(single) + ", " + "multi=" -752 + StringUtils.byteDesc(multi) + ", " + "memory=" -753 + StringUtils.byteDesc(memory)); -754 } -755 } -756 -757 } catch (Throwable t) { -758 LOG.warn("Failed freeing space", t); -759 } finally { -760 cacheStats.evict(); -761 freeInProgress = false; -762 freeSpaceLock.unlock(); -763 } -764 } -765 -766 // This handles flushing the RAM cache to IOEngine. -767 @VisibleForTesting -768 class WriterThread extends HasThread { -769 private final BlockingQueue<RAMQueueEntry> inputQueue; -770 private volatile boolean writerEnabled = true; -771 -772 WriterThread(BlockingQueue<RAMQueueEntry> queue) { -773 this.inputQueue = queue; -774 } -775 -776 // Used for test -777 @VisibleForTesting -778 void disableWriter() { -779 this.writerEnabled = false; -780 } -781 -782 public void run() { -783 List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>(); -784 try { -785 while (cacheEnabled && writerEnabled) { -786 try { -787 try { -788 // Blocks -789 entries = getRAMQueueEntries(inputQueue, entries); -790 } catch (InterruptedException ie) { -791 if (!cacheEnabled) break; -792 } -793 doDrain(entries); -794 } catch (Exception ioe) { -795 LOG.error("WriterThread encountered error", ioe); -796 } -797 }