Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3BBE52004F3 for ; Tue, 15 Aug 2017 17:06:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3A15A166C02; Tue, 15 Aug 2017 15:06:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B0A60166BB5 for ; Tue, 15 Aug 2017 17:06:28 +0200 (CEST) Received: (qmail 10290 invoked by uid 500); 15 Aug 2017 15:06:21 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 8066 invoked by uid 99); 15 Aug 2017 15:06:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Aug 2017 15:06:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BFB83F5EED; Tue, 15 Aug 2017 15:06:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Tue, 15 Aug 2017 15:06:54 -0000 Message-Id: <0baa94ec6f5e4d5bbcf175c9cdd71fea@git.apache.org> In-Reply-To: <1727cd37fde541ada4a5e500bb67f85f@git.apache.org> References: <1727cd37fde541ada4a5e500bb67f85f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/51] [partial] hbase-site git commit: Published site at . archived-at: Tue, 15 Aug 2017 15:06:30 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/07e68d46/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntry.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntry.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntry.html index a614cd4..01801ca 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntry.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.BucketEntry.html @@ -60,520 +60,520 @@ 052import java.util.concurrent.locks.ReentrantLock; 053import java.util.concurrent.locks.ReentrantReadWriteLock; 054 -055import org.apache.commons.logging.Log; -056import org.apache.commons.logging.LogFactory; -057import org.apache.hadoop.hbase.classification.InterfaceAudience; -058import org.apache.hadoop.hbase.io.HeapSize; -059import org.apache.hadoop.hbase.io.hfile.BlockCache; -060import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; -061import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; -062import org.apache.hadoop.hbase.io.hfile.BlockPriority; -063import org.apache.hadoop.hbase.io.hfile.BlockType; -064import org.apache.hadoop.hbase.io.hfile.CacheStats; -065import org.apache.hadoop.hbase.io.hfile.Cacheable; -066import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; -067import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; -068import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; -069import org.apache.hadoop.hbase.io.hfile.CachedBlock; -070import org.apache.hadoop.hbase.io.hfile.HFileBlock; -071import org.apache.hadoop.hbase.nio.ByteBuff; -072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -073import org.apache.hadoop.hbase.util.HasThread; -074import org.apache.hadoop.hbase.util.IdReadWriteLock; -075import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; -076import org.apache.hadoop.util.StringUtils; -077 -078import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -079import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +055import com.google.common.base.Preconditions; +056import org.apache.commons.logging.Log; +057import org.apache.commons.logging.LogFactory; +058import org.apache.hadoop.conf.Configuration; +059import org.apache.hadoop.hbase.HBaseConfiguration; +060import org.apache.hadoop.hbase.classification.InterfaceAudience; +061import org.apache.hadoop.hbase.io.HeapSize; +062import org.apache.hadoop.hbase.io.hfile.BlockCache; +063import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +064import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; +065import org.apache.hadoop.hbase.io.hfile.BlockPriority; +066import org.apache.hadoop.hbase.io.hfile.BlockType; +067import org.apache.hadoop.hbase.io.hfile.CacheStats; +068import org.apache.hadoop.hbase.io.hfile.Cacheable; +069import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; +070import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; +071import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; +072import org.apache.hadoop.hbase.io.hfile.CachedBlock; +073import org.apache.hadoop.hbase.io.hfile.HFileBlock; +074import org.apache.hadoop.hbase.nio.ByteBuff; +075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +076import org.apache.hadoop.hbase.util.HasThread; +077import org.apache.hadoop.hbase.util.IdReadWriteLock; +078import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; +079import org.apache.hadoop.util.StringUtils; 080 -081/** -082 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses -083 * BucketCache#ramCache and BucketCache#backingMap in order to -084 * determine if a given element is in the cache. The bucket cache can use on-heap or -085 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to -086 * store/read the block data. -087 * -088 * <p>Eviction is via a similar algorithm as used in -089 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} +081import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +082import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +083 +084/** +085 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses +086 * BucketCache#ramCache and BucketCache#backingMap in order to +087 * determine if a given element is in the cache. The bucket cache can use on-heap or +088 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to +089 * store/read the block data. 090 * -091 * <p>BucketCache can be used as mainly a block cache (see -092 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with -093 * LruBlockCache to decrease CMS GC and heap fragmentation. -094 * -095 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store -096 * blocks) to enlarge cache space via -097 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} -098 */ -099@InterfaceAudience.Private -100public class BucketCache implements BlockCache, HeapSize { -101 private static final Log LOG = LogFactory.getLog(BucketCache.class); -102 -103 /** Priority buckets */ -104 private static final float DEFAULT_SINGLE_FACTOR = 0.25f; -105 private static final float DEFAULT_MULTI_FACTOR = 0.50f; -106 private static final float DEFAULT_MEMORY_FACTOR = 0.25f; -107 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; -108 -109 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; -110 private static final float DEFAULT_MIN_FACTOR = 0.85f; -111 -112 // Number of blocks to clear for each of the bucket size that is full -113 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; -114 -115 /** Statistics thread */ -116 private static final int statThreadPeriod = 5 * 60; -117 -118 final static int DEFAULT_WRITER_THREADS = 3; -119 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; +091 * <p>Eviction is via a similar algorithm as used in +092 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} +093 * +094 * <p>BucketCache can be used as mainly a block cache (see +095 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with +096 * LruBlockCache to decrease CMS GC and heap fragmentation. +097 * +098 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store +099 * blocks) to enlarge cache space via +100 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} +101 */ +102@InterfaceAudience.Private +103public class BucketCache implements BlockCache, HeapSize { +104 private static final Log LOG = LogFactory.getLog(BucketCache.class); +105 +106 /** Priority buckets config */ +107 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; +108 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; +109 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; +110 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; +111 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; +112 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; +113 +114 /** Priority buckets */ +115 @VisibleForTesting +116 static final float DEFAULT_SINGLE_FACTOR = 0.25f; +117 static final float DEFAULT_MULTI_FACTOR = 0.50f; +118 static final float DEFAULT_MEMORY_FACTOR = 0.25f; +119 static final float DEFAULT_MIN_FACTOR = 0.85f; 120 -121 // Store/read block data -122 final IOEngine ioEngine; +121 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; +122 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 123 -124 // Store the block in this map before writing it to cache -125 @VisibleForTesting -126 final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache; -127 // In this map, store the block's meta data like offset, length -128 @VisibleForTesting -129 ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; -130 -131 /** -132 * Flag if the cache is enabled or not... We shut it off if there are IO -133 * errors for some time, so that Bucket IO exceptions/errors don't bring down -134 * the HBase server. -135 */ -136 private volatile boolean cacheEnabled; -137 -138 /** -139 * A list of writer queues. We have a queue per {@link WriterThread} we have running. -140 * In other words, the work adding blocks to the BucketCache is divided up amongst the -141 * running WriterThreads. Its done by taking hash of the cache key modulo queue count. -142 * WriterThread when it runs takes whatever has been recently added and 'drains' the entries -143 * to the BucketCache. It then updates the ramCache and backingMap accordingly. -144 */ -145 @VisibleForTesting -146 final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); -147 @VisibleForTesting -148 final WriterThread[] writerThreads; +124 // Number of blocks to clear for each of the bucket size that is full +125 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; +126 +127 /** Statistics thread */ +128 private static final int statThreadPeriod = 5 * 60; +129 +130 final static int DEFAULT_WRITER_THREADS = 3; +131 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; +132 +133 // Store/read block data +134 final IOEngine ioEngine; +135 +136 // Store the block in this map before writing it to cache +137 @VisibleForTesting +138 final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache; +139 // In this map, store the block's meta data like offset, length +140 @VisibleForTesting +141 ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; +142 +143 /** +144 * Flag if the cache is enabled or not... We shut it off if there are IO +145 * errors for some time, so that Bucket IO exceptions/errors don't bring down +146 * the HBase server. +147 */ +148 private volatile boolean cacheEnabled; 149 -150 /** Volatile boolean to track if free space is in process or not */ -151 private volatile boolean freeInProgress = false; -152 private final Lock freeSpaceLock = new ReentrantLock(); -153 -154 private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>(); -155 -156 private final AtomicLong realCacheSize = new AtomicLong(0); -157 private final AtomicLong heapSize = new AtomicLong(0); -158 /** Current number of cached elements */ -159 private final AtomicLong blockNumber = new AtomicLong(0); -160 -161 /** Cache access count (sequential ID) */ -162 private final AtomicLong accessCount = new AtomicLong(0); -163 -164 private static final int DEFAULT_CACHE_WAIT_TIME = 50; -165 // Used in test now. If the flag is false and the cache speed is very fast, -166 // bucket cache will skip some blocks when caching. If the flag is true, we -167 // will wait blocks flushed to IOEngine for some time when caching -168 boolean wait_when_cache = false; -169 -170 private final BucketCacheStats cacheStats = new BucketCacheStats(); -171 -172 private final String persistencePath; -173 private final long cacheCapacity; -174 /** Approximate block size */ -175 private final long blockSize; -176 -177 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ -178 private final int ioErrorsTolerationDuration; -179 // 1 min -180 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; +150 /** +151 * A list of writer queues. We have a queue per {@link WriterThread} we have running. +152 * In other words, the work adding blocks to the BucketCache is divided up amongst the +153 * running WriterThreads. Its done by taking hash of the cache key modulo queue count. +154 * WriterThread when it runs takes whatever has been recently added and 'drains' the entries +155 * to the BucketCache. It then updates the ramCache and backingMap accordingly. +156 */ +157 @VisibleForTesting +158 final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); +159 @VisibleForTesting +160 final WriterThread[] writerThreads; +161 +162 /** Volatile boolean to track if free space is in process or not */ +163 private volatile boolean freeInProgress = false; +164 private final Lock freeSpaceLock = new ReentrantLock(); +165 +166 private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>(); +167 +168 private final AtomicLong realCacheSize = new AtomicLong(0); +169 private final AtomicLong heapSize = new AtomicLong(0); +170 /** Current number of cached elements */ +171 private final AtomicLong blockNumber = new AtomicLong(0); +172 +173 /** Cache access count (sequential ID) */ +174 private final AtomicLong accessCount = new AtomicLong(0); +175 +176 private static final int DEFAULT_CACHE_WAIT_TIME = 50; +177 // Used in test now. If the flag is false and the cache speed is very fast, +178 // bucket cache will skip some blocks when caching. If the flag is true, we +179 // will wait blocks flushed to IOEngine for some time when caching +180 boolean wait_when_cache = false; 181 -182 // Start time of first IO error when reading or writing IO Engine, it will be -183 // reset after a successful read/write. -184 private volatile long ioErrorStartTime = -1; -185 -186 /** -187 * A ReentrantReadWriteLock to lock on a particular block identified by offset. -188 * The purpose of this is to avoid freeing the block which is being read. -189 * <p> -190 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. -191 */ -192 @VisibleForTesting -193 final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT); -194 -195 private final NavigableSet<BlockCacheKey> blocksByHFile = -196 new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() { -197 @Override -198 public int compare(BlockCacheKey a, BlockCacheKey b) { -199 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); -200 if (nameComparison != 0) { -201 return nameComparison; -202 } -203 -204 if (a.getOffset() == b.getOffset()) { -205 return 0; -206 } else if (a.getOffset() < b.getOffset()) { -207 return -1; -208 } -209 return 1; -210 } -211 }); -212 -213 /** Statistics thread schedule pool (for heavy debugging, could remove) */ -214 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, -215 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); -216 -217 // Allocate or free space for the block -218 private BucketAllocator bucketAllocator; -219 -220 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, -221 int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, -222 IOException { -223 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, -224 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION); -225 } -226 -227 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, -228 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) -229 throws FileNotFoundException, IOException { -230 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); -231 this.writerThreads = new WriterThread[writerThreadNum]; -232 long blockNumCapacity = capacity / blockSize; -233 if (blockNumCapacity >= Integer.MAX_VALUE) { -234 // Enough for about 32TB of cache! -235 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); -236 } +182 private final BucketCacheStats cacheStats = new BucketCacheStats(); +183 +184 private final String persistencePath; +185 private final long cacheCapacity; +186 /** Approximate block size */ +187 private final long blockSize; +188 +189 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ +190 private final int ioErrorsTolerationDuration; +191 // 1 min +192 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; +193 +194 // Start time of first IO error when reading or writing IO Engine, it will be +195 // reset after a successful read/write. +196 private volatile long ioErrorStartTime = -1; +197 +198 /** +199 * A ReentrantReadWriteLock to lock on a particular block identified by offset. +200 * The purpose of this is to avoid freeing the block which is being read. +201 * <p> +202 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. +203 */ +204 @VisibleForTesting +205 final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT); +206 +207 private final NavigableSet<BlockCacheKey> blocksByHFile = +208 new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() { +209 @Override +210 public int compare(BlockCacheKey a, BlockCacheKey b) { +211 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); +212 if (nameComparison != 0) { +213 return nameComparison; +214 } +215 +216 if (a.getOffset() == b.getOffset()) { +217 return 0; +218 } else if (a.getOffset() < b.getOffset()) { +219 return -1; +220 } +221 return 1; +222 } +223 }); +224 +225 /** Statistics thread schedule pool (for heavy debugging, could remove) */ +226 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, +227 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); +228 +229 // Allocate or free space for the block +230 private BucketAllocator bucketAllocator; +231 +232 /** Acceptable size of cache (no evictions if size < acceptable) */ +233 private float acceptableFactor; +234 +235 /** Minimum threshold of cache (when evicting, evict until size < min) */ +236 private float minFactor; 237 -238 this.cacheCapacity = capacity; -239 this.persistencePath = persistencePath; -240 this.blockSize = blockSize; -241 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; -242 -243 bucketAllocator = new BucketAllocator(capacity, bucketSizes); -244 for (int i = 0; i < writerThreads.length; ++i) { -245 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); -246 } -247 -248 assert writerQueues.size() == writerThreads.length; -249 this.ramCache = new ConcurrentHashMap<>(); -250 -251 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); -252 -253 if (ioEngine.isPersistent() && persistencePath != null) { -254 try { -255 retrieveFromFile(bucketSizes); -256 } catch (IOException ioex) { -257 LOG.error("Can't restore from file because of", ioex); -258 } catch (ClassNotFoundException cnfe) { -259 LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); -260 throw new RuntimeException(cnfe); -261 } -262 } -263 final String threadName = Thread.currentThread().getName(); -264 this.cacheEnabled = true; -265 for (int i = 0; i < writerThreads.length; ++i) { -266 writerThreads[i] = new WriterThread(writerQueues.get(i)); -267 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); -268 writerThreads[i].setDaemon(true); -269 } -270 startWriterThreads(); -271 -272 // Run the statistics thread periodically to print the cache statistics log -273 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log -274 // every five minutes. -275 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), -276 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); -277 LOG.info("Started bucket cache; ioengine=" + ioEngineName + -278 ", capacity=" + StringUtils.byteDesc(capacity) + -279 ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + -280 writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + -281 persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); -282 } -283 -284 /** -285 * Called by the constructor to start the writer threads. Used by tests that need to override -286 * starting the threads. -287 */ -288 @VisibleForTesting -289 protected void startWriterThreads() { -290 for (WriterThread thread : writerThreads) { -291 thread.start(); -292 } -293 } +238 /** Free this floating point factor of extra blocks when evicting. For example free the number of blocks requested * (1 + extraFreeFactor) */ +239 private float extraFreeFactor; +240 +241 /** Single access bucket size */ +242 private float singleFactor; +243 +244 /** Multiple access bucket size */ +245 private float multiFactor; +246 +247 /** In-memory bucket size */ +248 private float memoryFactor; +249 +250 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, +251 int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, +252 IOException { +253 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, +254 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); +255 } +256 +257 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, +258 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, +259 Configuration conf) +260 throws FileNotFoundException, IOException { +261 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); +262 this.writerThreads = new WriterThread[writerThreadNum]; +263 long blockNumCapacity = capacity / blockSize; +264 if (blockNumCapacity >= Integer.MAX_VALUE) { +265 // Enough for about 32TB of cache! +266 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); +267 } +268 +269 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); +270 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); +271 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); +272 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); +273 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); +274 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); +275 +276 sanityCheckConfigs(); +277 +278 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + +279 ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + +280 ", memoryFactor: " + memoryFactor); +281 +282 this.cacheCapacity = capacity; +283 this.persistencePath = persistencePath; +284 this.blockSize = blockSize; +285 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; +286 +287 bucketAllocator = new BucketAllocator(capacity, bucketSizes); +288 for (int i = 0; i < writerThreads.length; ++i) { +289 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); +290 } +291 +292 assert writerQueues.size() == writerThreads.length; +293 this.ramCache = new ConcurrentHashMap<>(); 294 -295 @VisibleForTesting -296 boolean isCacheEnabled() { -297 return this.cacheEnabled; -298 } -299 -300 public long getMaxSize() { -301 return this.cacheCapacity; -302 } -303 -304 public String getIoEngine() { -305 return ioEngine.toString(); -306 } -307 -308 /** -309 * Get the IOEngine from the IO engine name -310 * @param ioEngineName -311 * @param capacity -312 * @param persistencePath -313 * @return the IOEngine -314 * @throws IOException -315 */ -316 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) -317 throws IOException { -318 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { -319 // In order to make the usage simple, we only need the prefix 'files:' in -320 // document whether one or multiple file(s), but also support 'file:' for -321 // the compatibility -322 String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1) -323 .split(FileIOEngine.FILE_DELIMITER); -324 return new FileIOEngine(capacity, persistencePath != null, filePaths); -325 } else if (ioEngineName.startsWith("offheap")) { -326 return new ByteBufferIOEngine(capacity, true); -327 } else if (ioEngineName.startsWith("heap")) { -328 return new ByteBufferIOEngine(capacity, false); -329 } else if (ioEngineName.startsWith("mmap:")) { -330 return new FileMmapEngine(ioEngineName.substring(5), capacity); -331 } else { -332 throw new IllegalArgumentException( -333 "Don't understand io engine name for cache - prefix with file:, heap or offheap"); -334 } -335 } -336 -337 /** -338 * Cache the block with the specified name and buffer. -339 * @param cacheKey block's cache key -340 * @param buf block buffer -341 */ -342 @Override -343 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { -344 cacheBlock(cacheKey, buf, false, false); -345 } -346 -347 /** -348 * Cache the block with the specified name and buffer. -349 * @param cacheKey block's cache key -350 * @param cachedItem block buffer -351 * @param inMemory if block is in-memory -352 * @param cacheDataInL1 -353 */ -354 @Override -355 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, -356 final boolean cacheDataInL1) { -357 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); +295 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); +296 +297 if (ioEngine.isPersistent() && persistencePath != null) { +298 try { +299 retrieveFromFile(bucketSizes); +300 } catch (IOException ioex) { +301 LOG.error("Can't restore from file because of", ioex); +302 } catch (ClassNotFoundException cnfe) { +303 LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); +304 throw new RuntimeException(cnfe); +305 } +306 } +307 final String threadName = Thread.currentThread().getName(); +308 this.cacheEnabled = true; +309 for (int i = 0; i < writerThreads.length; ++i) { +310 writerThreads[i] = new WriterThread(writerQueues.get(i)); +311 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); +312 writerThreads[i].setDaemon(true); +313 } +314 startWriterThreads(); +315 +316 // Run the statistics thread periodically to print the cache statistics log +317 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log +318 // every five minutes. +319 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), +320 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); +321 LOG.info("Started bucket cache; ioengine=" + ioEngineName + +322 ", capacity=" + StringUtils.byteDesc(capacity) + +323 ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + +324 writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + +325 persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); +326 } +327 +328 private void sanityCheckConfigs() { +329 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); +330 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); +331 Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); +332 Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); +333 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); +334 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); +335 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); +336 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " + +337 MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); +338 } +339 +340 /** +341 * Called by the constructor to start the writer threads. Used by tests that need to override +342 * starting the threads. +343 */ +344 @VisibleForTesting +345 protected void startWriterThreads() { +346 for (WriterThread thread : writerThreads) { +347 thread.start(); +348 } +349 } +350 +351 @VisibleForTesting +352 boolean isCacheEnabled() { +353 return this.cacheEnabled; +354 } +355 +356 public long getMaxSize() { +357 return this.cacheCapacity; 358 } 359 -360 /** -361 * Cache the block to ramCache -362 * @param cacheKey block's cache key -363 * @param cachedItem block buffer -364 * @param inMemory if block is in-memory -365 * @param wait if true, blocking wait when queue is full -366 */ -367 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, -368 boolean wait) { -369 if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem); -370 if (!cacheEnabled) { -371 return; -372 } -373 -374 if (backingMap.containsKey(cacheKey)) { -375 return; -376 } -377 -378 /* -379 * Stuff the entry into the RAM cache so it can get drained to the persistent store -380 */ -381 RAMQueueEntry re = -382 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); -383 if (ramCache.putIfAbsent(cacheKey, re) != null) { -384 return; -385 } -386 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); -387 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); -388 boolean successfulAddition = false; -389 if (wait) { -390 try { -391 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); -392 } catch (InterruptedException e) { -393 Thread.currentThread().interrupt(); -394 } -395 } else { -396 successfulAddition = bq.offer(re); -397 } -398 if (!successfulAddition) { -399 ramCache.remove(cacheKey); -400 cacheStats.failInsert(); -401 } else { -402 this.blockNumber.incrementAndGet(); -403 this.heapSize.addAndGet(cachedItem.heapSize()); -404 blocksByHFile.add(cacheKey); -405 } -406 } -407 -408 /** -409 * Get the buffer of the block with the specified key. -410 * @param key block's cache key -411 * @param caching true if the caller caches blocks on cache misses -412 * @param repeat Whether this is a repeat lookup for the same block -413 * @param updateCacheMetrics Whether we should update cache metrics or not -414 * @return buffer of specified cache key, or null if not in cache -415 */ -416 @Override -417 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, -418 boolean updateCacheMetrics) { -419 if (!cacheEnabled) { -420 return null; -421 } -422 RAMQueueEntry re = ramCache.get(key); -423 if (re != null) { -424 if (updateCacheMetrics) { -425 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); -426 } -427 re.access(accessCount.incrementAndGet()); -428 return re.getData(); -429 } -430 BucketEntry bucketEntry = backingMap.get(key); -431 if (bucketEntry != null) { -432 long start = System.nanoTime(); -433 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); -434 try { -435 lock.readLock().lock(); -436 // We can not read here even if backingMap does contain the given key because its offset -437 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check -438 // existence here. -439 if (bucketEntry.equals(backingMap.get(key))) { -440 // TODO : change this area - should be removed after server cells and -441 // 12295 are available -442 int len = bucketEntry.getLength(); -443 if (LOG.isTraceEnabled()) { -444 LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len); -445 } -446 Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, -447 bucketEntry.deserializerReference(this.deserialiserMap)); -448 long timeTaken = System.nanoTime() - start; -449 if (updateCacheMetrics) { -450 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); -451 cacheStats.ioHit(timeTaken); -452 } -453 if (cachedBlock.getMemoryType() == MemoryType.SHARED) { -454 bucketEntry.refCount.incrementAndGet(); -455 } -456 bucketEntry.access(accessCount.incrementAndGet()); -457 if (this.ioErrorStartTime > 0) { -458 ioErrorStartTime = -1; -459 } -460 return cachedBlock; -461 } -462 } catch (IOException ioex) { -463 LOG.error("Failed reading block " + key + " from bucket cache", ioex); -464 checkIOErrorIsTolerated(); -465 } finally { -466 lock.readLock().unlock(); -467 } -468 } -469 if (!repeat && updateCacheMetrics) { -470 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); -471 } -472 return null; -473 } -474 -475 @VisibleForTesting -476 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { -477 bucketAllocator.freeBlock(bucketEntry.offset()); -478 realCacheSize.addAndGet(-1 * bucketEntry.getLength()); -479 blocksByHFile.remove(cacheKey); -480 if (decrementBlockNumber) { -481 this.blockNumber.decrementAndGet(); -482 } -483 } -484 -485 @Override -486 public boolean evictBlock(BlockCacheKey cacheKey) { -487 return evictBlock(cacheKey, true); -488 } -489 -490 // does not check for the ref count. Just tries to evict it if found in the -491 // bucket map -492 private boolean forceEvict(BlockCacheKey cacheKey) { -493 if (!cacheEnabled) { -494 return false; -495 } -496 RAMQueueEntry removedBlock = checkRamCache(cacheKey); -497 BucketEntry bucketEntry = backingMap.get(cacheKey); -498 if (bucketEntry == null) { -499 if (removedBlock != null) { -500 cacheStats.evicted(0, cacheKey.isPrimary()); -501 return true; -502 } else { -503 return false; -504 } -505 } -506 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); -507 try { -508 lock.writeLock().lock(); -509 if (backingMap.remove(cacheKey, bucketEntry)) { -510 blockEvicted(cacheKey, bucketEntry, removedBlock == null); -511 } else { -512 return false; -513 } -514 } finally { -515 lock.writeLock().unlock(); -516 } -517 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); -518 return true; -519 } -520 -521 private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { -522 RAMQueueEntry removedBlock = ramCache.remove(cacheKey); -523 if (removedBlock != null) { -524 this.blockNumber.decrementAndGet(); -525 this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); -526 } -527 return removedBlock; -528 } -529 -530 public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { -531 if (!cacheEnabled) { -532 return false; -533 } -534 RAMQueueEntry removedBlock = checkRamCache(cacheKey); -535 BucketEntry bucketEntry = backingMap.get(cacheKey); -536 if (bucketEntry == null) { -537 if (removedBlock != null) { -538 cacheStats.evicted(0, cacheKey.isPrimary()); -539 return true; -540 } else { -541 return false; -542 } -543 } -544 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); -545 try { -546 lock.writeLock().lock(); -547 int refCount = bucketEntry.refCount.get(); -548 if(refCount == 0) { -549 if (backingMap.remove(cacheKey, bucketEntry)) { -550 blockEvicted(cacheKey, bucketEntry, removedBlock == null); -551 } else { -552 return false; -553 } -554 } else { -555 if(!deletedBlock) { -556 if (LOG.isDebugEnabled()) { -557 LOG.debug("This block " + cacheKey + " is still referred by " + refCount -558 + " readers. Can not be freed now"); -559 } -560 return false; -561 } else { -562 if (LOG.isDebugEnabled()) { -563 LOG.debug("This block " + cacheKey + " is still referred by " + refCount -564 + " readers. Can not be freed now. Hence will mark this" -565 + " for evicting at a later point"); -566 } -567 bucketEntry.markedForEvict = true; -568 } +360 public String getIoEngine() { +361 return ioEngine.toString(); +362 } +363 +364 /** +365 * Get the IOEngine from the IO engine name +366 * @param ioEngineName +367 * @param capacity +368 * @param persistencePath +369 * @return the IOEngine +370 * @throws IOException +371 */ +372 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) +373 throws IOException { +374 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { +375 // In order to make the usage simple, we only need the prefix 'files:' in +376 // document whether one or multiple file(s), but also support 'file:' for +377 // the compatibility +378 String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1) +379 .split(FileIOEngine.FILE_DELIMITER); +380 return new FileIOEngine(capacity, persistencePath != null, filePaths); +381 } else if (ioEngineName.startsWith("offheap")) { +382 return new ByteBufferIOEngine(capacity, true); +383 } else if (ioEngineName.startsWith("heap")) { +384 return new ByteBufferIOEngine(capacity, false); +385 } else if (ioEngineName.startsWith("mmap:")) { +386 return new FileMmapEngine(ioEngineName.substring(5), capacity); +387 } else { +388 throw new IllegalArgumentException( +389 "Don't understand io engine name for cache - prefix with file:, heap or offheap"); +390 } +391 } +392 +393 /** +394 * Cache the block with the specified name and buffer. +395 * @param cacheKey block's cache key +396 * @param buf block buffer +397 */ +398 @Override +399 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { +400 cacheBlock(cacheKey, buf, false, false); +401 } +402 +403 /** +404 * Cache the block with the specified name and buffer. +405 * @param cacheKey block's cache key +406 * @param cachedItem block buffer +407 * @param inMemory if block is in-memory +408 * @param cacheDataInL1 +409 */ +410 @Override +411 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, +412 final boolean cacheDataInL1) { +