accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject [2/2] accumulo git commit: ACCUMULO-4463: changes from review
Date Mon, 08 May 2017 19:24:08 GMT
ACCUMULO-4463: changes from review


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/40c1cb0b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/40c1cb0b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/40c1cb0b

Branch: refs/heads/ACCUMULO-4463
Commit: 40c1cb0b6b3b466aad5ba835344bbc7edccae7dc
Parents: 03a8f2a
Author: Dave Marion <dlmarion@apache.org>
Authored: Mon May 8 15:23:35 2017 -0400
Committer: Dave Marion <dlmarion@apache.org>
Committed: Mon May 8 15:23:35 2017 -0400

----------------------------------------------------------------------
 .../core/client/rfile/RFileScanner.java         |  33 +-
 .../org/apache/accumulo/core/conf/Property.java |   4 +-
 .../core/file/blockfile/cache/BlockCache.java   |  11 +-
 .../cache/BlockCacheConfiguration.java          |  82 ++
 .../file/blockfile/cache/BlockCacheFactory.java |   7 +-
 .../core/file/blockfile/cache/CachedBlock.java  |   4 +-
 .../file/blockfile/cache/LruBlockCache.java     | 746 -------------------
 .../file/blockfile/cache/TinyLfuBlockCache.java | 144 ----
 .../file/blockfile/cache/lru/LruBlockCache.java | 637 ++++++++++++++++
 .../cache/lru/LruBlockCacheConfiguration.java   | 122 +++
 .../cache/lru/LruBlockCacheFactory.java         |  31 +
 .../cache/tinylfu/TinyLfuBlockCache.java        | 149 ++++
 .../tinylfu/TinyLfuBlockCacheConfiguration.java |  29 +
 .../cache/tinylfu/TinyLfuBlockCacheFactory.java |  31 +
 .../accumulo/core/summary/SummaryReader.java    |   2 +-
 .../file/blockfile/cache/TestLruBlockCache.java | 181 ++---
 .../accumulo/core/file/rfile/RFileTest.java     |  20 +-
 .../tserver/TabletServerResourceManager.java    |  26 +-
 18 files changed, 1219 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 57322a8..b4a6d14 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -44,7 +44,8 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
@@ -56,6 +57,8 @@ import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.commons.configuration.ConfigurationMap;
+import org.apache.commons.configuration.MapConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.io.Text;
 
@@ -129,7 +132,7 @@ class RFileScanner extends ScannerOptions implements Scanner {
     }
 
     @Override
-    public void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception {}
+    public void start() {}
 
     @Override
     public void stop() {}
@@ -142,23 +145,25 @@ class RFileScanner extends ScannerOptions implements Scanner {
 
     this.opts = opts;
     if (opts.indexCacheSize > 0) {
-      this.indexCache = new LruBlockCache();
-      try {
-        this.indexCache.start((AccumuloConfiguration) null, opts.indexCacheSize, CACHE_BLOCK_SIZE);
-      } catch (Exception e) {
-        throw new RuntimeException("Error starting cache", e);
-      }
+      MapConfiguration config = new MapConfiguration(new HashMap<String,String>());
+      config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.indexCacheSize));
+      config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE));
+      @SuppressWarnings("unchecked")
+      ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config));
+      this.indexCache = new LruBlockCache(new LruBlockCacheConfiguration(copy));
+      this.indexCache.start();
     } else {
       this.indexCache = new NoopCache();
     }
 
     if (opts.dataCacheSize > 0) {
-      this.dataCache = new LruBlockCache();
-      try {
-        this.dataCache.start((AccumuloConfiguration) null, opts.dataCacheSize, CACHE_BLOCK_SIZE);
-      } catch (Exception e) {
-        throw new RuntimeException("Error starting cache", e);
-      }
+      MapConfiguration config = new MapConfiguration(new HashMap<String,String>());
+      config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.dataCacheSize));
+      config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE));
+      @SuppressWarnings("unchecked")
+      ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config));
+      this.dataCache = new LruBlockCache(new LruBlockCacheConfiguration(copy));
+      this.dataCache.start();
     } else {
       this.dataCache = new NoopCache();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 0bbaf10..d0f7ce2 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -245,8 +245,8 @@ public enum Property {
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"),
   TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before closing a session."),
   TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches"),
-  TSERV_CACHE_IMPL("tserver.cache.class", "org.apache.accumulo.core.file.blockfile.cache.LruBlockCache.class", PropertyType.STRING,
-      "Specifies the class name of the block cache implementation."),
+  TSERV_CACHE_IMPL("tserver.cache.factory.class", "org.apache.accumulo.core.file.blockfile.cache.lru.LRUBlockCacheFactory.class", PropertyType.STRING,
+      "Specifies the class name of the block cache factory implementation."),
   TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for file data blocks."),
   TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "25%", PropertyType.MEMORY, "Specifies the size of the cache for file indices."),
   TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for summary data on each tablet server."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
index c5a17e4..f035f5d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -17,8 +17,6 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache;
 
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-
 /**
  * Block cache interface.
  */
@@ -26,15 +24,8 @@ public interface BlockCache {
 
   /**
    * Start the block cache
-   *
-   * @param conf
-   *          Accumulo configuration object
-   * @param maxSize
-   *          maximum size of the on-heap cache
-   * @param blockSize
-   *          size of the default RFile blocks
    */
-  void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception;
+  void start();
 
   /**
    * Stop the block cache and release resources

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java
new file mode 100644
index 0000000..e3ccbf5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+
+public class BlockCacheConfiguration {
+
+  public static final String MAX_SIZE_PROPERTY = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block." + "max.size";
+  public static final String BLOCK_SIZE_PROPERTY = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block." + "block.size";
+
+  private static final Long DEFAULT = Long.valueOf(-1);
+
+  /** Maximum allowable size of cache (block put if size > max, evict) */
+  private final long maxSize;
+
+  /** Approximate block size */
+  private final long blockSize;
+
+  public BlockCacheConfiguration(AccumuloConfiguration conf) {
+    Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX);
+    this.maxSize = getOrDefault(props, MAX_SIZE_PROPERTY, DEFAULT);
+    this.blockSize = getOrDefault(props, BLOCK_SIZE_PROPERTY, DEFAULT);
+
+    if (DEFAULT.equals(this.maxSize)) {
+      throw new IllegalArgumentException("Block cache max size must be specified.");
+    }
+    if (DEFAULT.equals(this.blockSize)) {
+      throw new IllegalArgumentException("Block cache block size must be specified.");
+    }
+  }
+
+  public long getMaxSize() {
+    return maxSize;
+  }
+
+  public long getBlockSize() {
+    return blockSize;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected <T> T getOrDefault(Map<String,String> props, String propertyName, T defaultValue) {
+    String o = props.get(propertyName);
+    if (null == o && defaultValue == null) {
+      throw new RuntimeException("Property " + propertyName + " not specified and no default supplied.");
+    } else if (null == o) {
+      return defaultValue;
+    } else {
+      if (defaultValue.getClass().equals(Integer.class)) {
+        return (T) Integer.valueOf(o);
+      } else if (defaultValue.getClass().equals(Long.class)) {
+        return (T) Long.valueOf(o);
+      } else if (defaultValue.getClass().equals(Float.class)) {
+        return (T) Float.valueOf(o);
+      } else if (defaultValue.getClass().equals(Boolean.class)) {
+        return (T) Boolean.valueOf(o);
+      } else {
+        throw new RuntimeException("Unknown parameter type");
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
index 70cbc7d..57ca5f1 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
@@ -21,12 +21,13 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 
-public class BlockCacheFactory {
+public abstract class BlockCacheFactory {
 
-  public static BlockCache getBlockCache(AccumuloConfiguration conf) throws Exception {
+  public static BlockCacheFactory getBlockCacheFactory(AccumuloConfiguration conf) throws Exception {
     String impl = conf.get(Property.TSERV_CACHE_IMPL);
-    Class<? extends BlockCache> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCache.class);
+    Class<? extends BlockCacheFactory> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCacheFactory.class);
     return clazz.newInstance();
   }
 
+  public abstract BlockCache getBlockCache(AccumuloConfiguration conf);
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
index c67b4c7..b04b77a 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.core.file.blockfile.cache;
 
 import java.util.Objects;
 
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache;
+
 /**
  * Represents an entry in the {@link LruBlockCache}.
  *
@@ -31,7 +33,7 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
   public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG)
       + ClassSize.STRING + ClassSize.BYTE_BUFFER);
 
-  static enum BlockPriority {
+  public static enum BlockPriority {
     /**
      * Accessed a single time (used for scan-resistance)
      */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
deleted file mode 100644
index 921b5a5..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
+++ /dev/null
@@ -1,746 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile.cache;
-
-import java.lang.ref.WeakReference;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a
- * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations.
- *
- * <p>
- * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary, otherwise a
- * block becomes a single access priority. Once a blocked is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the
- * cache, adding a least-frequently-used element to the eviction algorithm.
- *
- * <p>
- * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if
- * any priority is not using its entire chunk the others are able to grow beyond their chunk size.
- *
- * <p>
- * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is
- * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map.
- *
- * <p>
- * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and
- * control the eviction thread.
- *
- * <p>
- * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size
- * specified.
- *
- * <p>
- * Eviction happens in a separate thread and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and
- * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then
- * uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
- */
-public class LruBlockCache implements BlockCache, HeapSize {
-
-  private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class);
-
-  /** Default Configuration Parameters */
-
-  /** Backing Concurrent Map Configuration */
-  static final float DEFAULT_LOAD_FACTOR = 0.75f;
-  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
-
-  /** Eviction thresholds */
-  static final float DEFAULT_MIN_FACTOR = 0.75f;
-  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
-
-  /** Priority buckets */
-  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
-  static final float DEFAULT_MULTI_FACTOR = 0.50f;
-  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
-
-  /** Statistics thread */
-  static final int statThreadPeriod = 60;
-
-  /** Concurrent map (the cache) */
-  private ConcurrentHashMap<String,CachedBlock> map;
-
-  /** Eviction lock (locked when eviction in process) */
-  private final ReentrantLock evictionLock = new ReentrantLock(true);
-
-  /** Volatile boolean to track if we are in an eviction process or not */
-  private volatile boolean evictionInProgress = false;
-
-  /** Eviction thread */
-  private EvictionThread evictionThread;
-
-  /** Statistics thread schedule pool (for heavy debugging, could remove) */
-  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats"));
-
-  /** Current size of cache */
-  private AtomicLong size;
-
-  /** Current number of cached elements */
-  private AtomicLong elements;
-
-  /** Cache access count (sequential ID) */
-  private AtomicLong count;
-
-  /** Cache statistics */
-  private CacheStats stats;
-
-  /** Maximum allowable size of cache (block put if size > max, evict) */
-  private long maxSize;
-
-  /** Approximate block size */
-  private long blockSize;
-
-  /** Acceptable size of cache (no evictions if size < acceptable) */
-  private float acceptableFactor = DEFAULT_ACCEPTABLE_FACTOR;
-
-  /** Minimum threshold of cache (when evicting, evict until size < min) */
-  private float minFactor = DEFAULT_MIN_FACTOR;
-
-  /** Single access bucket size */
-  private float singleFactor = DEFAULT_SINGLE_FACTOR;
-
-  /** Multiple access bucket size */
-  private float multiFactor = DEFAULT_MULTI_FACTOR;
-
-  /** In-memory bucket size */
-  private float memoryFactor = DEFAULT_MEMORY_FACTOR;
-
-  /** LruBlockCache cache = new LruBlockCache **/
-  private float mapLoadFactor = DEFAULT_LOAD_FACTOR;
-
-  /** LruBlockCache cache = new LruBlockCache **/
-  private int mapConcurrencyLevel = DEFAULT_CONCURRENCY_LEVEL;
-
-  /** Overhead of the structure itself */
-  private long overhead;
-
-  private boolean useEvictionThread = true;
-
-  /**
-   * Default constructor. Specify maximum size and expected average block size (approximation is fine).
-   *
-   * <p>
-   * All other factors will be calculated based on defaults specified in this class.
-   *
-   * @param conf
-   *          accumulo configuration
-   * @param maxSize
-   *          maximum size of cache, in bytes
-   * @param blockSize
-   *          approximate size of each block, in bytes
-   */
-  public void start(AccumuloConfiguration conf, long maxSize, long blockSize) {
-    int mapInitialSize = (int) Math.ceil(1.2 * maxSize / blockSize);
-
-    if (singleFactor + multiFactor + memoryFactor != 1) {
-      throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0");
-    }
-    if (minFactor >= acceptableFactor) {
-      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
-    }
-    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
-      throw new IllegalArgumentException("all factors must be < 1");
-    }
-    this.maxSize = maxSize;
-    this.blockSize = blockSize;
-    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
-    this.stats = new CacheStats();
-    this.count = new AtomicLong(0);
-    this.elements = new AtomicLong(0);
-    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
-    this.size = new AtomicLong(this.overhead);
-
-    if (useEvictionThread) {
-      this.evictionThread = new EvictionThread(this);
-      this.evictionThread.start();
-      while (!this.evictionThread.running()) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException ex) {
-          throw new RuntimeException(ex);
-        }
-      }
-    } else {
-      this.evictionThread = null;
-    }
-    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
-  }
-
-  public void stop() {}
-
-  public float getMinFactor() {
-    return minFactor;
-  }
-
-  public float getSingleFactor() {
-    return singleFactor;
-  }
-
-  public float getMultiFactor() {
-    return multiFactor;
-  }
-
-  public float getMemoryFactor() {
-    return memoryFactor;
-  }
-
-  public float getMapLoadFactor() {
-    return mapLoadFactor;
-  }
-
-  public int getMapConcurrencyLevel() {
-    return mapConcurrencyLevel;
-  }
-
-  public long getOverhead() {
-    return overhead;
-  }
-
-  public boolean isUseEvictionThread() {
-    return useEvictionThread;
-  }
-
-  public void setMinFactor(float minFactor) {
-    this.minFactor = minFactor;
-  }
-
-  public void setSingleFactor(float singleFactor) {
-    this.singleFactor = singleFactor;
-  }
-
-  public void setMultiFactor(float multiFactor) {
-    this.multiFactor = multiFactor;
-  }
-
-  public void setMemoryFactor(float memoryFactor) {
-    this.memoryFactor = memoryFactor;
-  }
-
-  public void setMapLoadFactor(float mapLoadFactor) {
-    this.mapLoadFactor = mapLoadFactor;
-  }
-
-  public void setMapConcurrencyLevel(int mapConcurrencyLevel) {
-    this.mapConcurrencyLevel = mapConcurrencyLevel;
-  }
-
-  public void setOverhead(long overhead) {
-    this.overhead = overhead;
-  }
-
-  public void setUseEvictionThread(boolean useEvictionThread) {
-    this.useEvictionThread = useEvictionThread;
-  }
-
-  public float getAcceptableFactor() {
-    return acceptableFactor;
-  }
-
-  public void setAcceptableFactor(float acceptableFactor) {
-    this.acceptableFactor = acceptableFactor;
-  }
-
-  public void setMaxSize(long maxSize) {
-    this.maxSize = maxSize;
-    if (this.size.get() > acceptableSize() && !evictionInProgress) {
-      runEviction();
-    }
-  }
-
-  // BlockCache implementation
-
-  /**
-   * Cache the block with the specified name and buffer.
-   * <p>
-   * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a
-   * race condition and will update the buffer but not modify the size of the cache.
-   *
-   * @param blockName
-   *          block name
-   * @param buf
-   *          block buffer
-   * @param inMemory
-   *          if block is in-memory
-   */
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
-    CachedBlock cb = map.get(blockName);
-    if (cb != null) {
-      stats.duplicateReads();
-      cb.access(count.incrementAndGet());
-    } else {
-      cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
-      CachedBlock currCb = map.putIfAbsent(blockName, cb);
-      if (currCb != null) {
-        stats.duplicateReads();
-        cb = currCb;
-        cb.access(count.incrementAndGet());
-      } else {
-        // Actually added block to cache
-        long newSize = size.addAndGet(cb.heapSize());
-        elements.incrementAndGet();
-        if (newSize > acceptableSize() && !evictionInProgress) {
-          runEviction();
-        }
-      }
-    }
-
-    return cb;
-  }
-
-  /**
-   * Cache the block with the specified name and buffer.
-   * <p>
-   * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a
-   * race condition and will update the buffer but not modify the size of the cache.
-   *
-   * @param blockName
-   *          block name
-   * @param buf
-   *          block buffer
-   */
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte buf[]) {
-    return cacheBlock(blockName, buf, false);
-  }
-
-  /**
-   * Get the buffer of the block with the specified name.
-   *
-   * @param blockName
-   *          block name
-   * @return buffer of specified block name, or null if not in cache
-   */
-  @Override
-  public CachedBlock getBlock(String blockName) {
-    CachedBlock cb = map.get(blockName);
-    if (cb == null) {
-      stats.miss();
-      return null;
-    }
-    stats.hit();
-    cb.access(count.incrementAndGet());
-    return cb;
-  }
-
-  protected long evictBlock(CachedBlock block) {
-    map.remove(block.getName());
-    size.addAndGet(-1 * block.heapSize());
-    elements.decrementAndGet();
-    stats.evicted();
-    return block.heapSize();
-  }
-
-  /**
-   * Multi-threaded call to run the eviction process.
-   */
-  private void runEviction() {
-    if (evictionThread == null) {
-      evict();
-    } else {
-      evictionThread.evict();
-    }
-  }
-
-  /**
-   * Eviction method.
-   */
-  void evict() {
-
-    // Ensure only one eviction at a time
-    if (!evictionLock.tryLock())
-      return;
-
-    try {
-      evictionInProgress = true;
-
-      long bytesToFree = size.get() - minSize();
-
-      log.trace("Block cache LRU eviction started.  Attempting to free {} bytes", bytesToFree);
-
-      if (bytesToFree <= 0)
-        return;
-
-      // Instantiate priority buckets
-      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
-      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
-      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
-
-      // Scan entire map putting into appropriate buckets
-      for (CachedBlock cachedBlock : map.values()) {
-        switch (cachedBlock.getPriority()) {
-          case SINGLE: {
-            bucketSingle.add(cachedBlock);
-            break;
-          }
-          case MULTI: {
-            bucketMulti.add(cachedBlock);
-            break;
-          }
-          case MEMORY: {
-            bucketMemory.add(cachedBlock);
-            break;
-          }
-        }
-      }
-
-      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
-
-      bucketQueue.add(bucketSingle);
-      bucketQueue.add(bucketMulti);
-      bucketQueue.add(bucketMemory);
-
-      int remainingBuckets = 3;
-      long bytesFreed = 0;
-
-      BlockBucket bucket;
-      while ((bucket = bucketQueue.poll()) != null) {
-        long overflow = bucket.overflow();
-        if (overflow > 0) {
-          long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
-          bytesFreed += bucket.free(bucketBytesToFree);
-        }
-        remainingBuckets--;
-      }
-
-      float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024));
-      float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024));
-      float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024));
-
-      log.trace("Block cache LRU eviction completed. Freed {} bytes. Priority Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed,
-          singleMB, bucketSingle.totalSize(), multiMB, bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize());
-
-    } finally {
-      stats.evict();
-      evictionInProgress = false;
-      evictionLock.unlock();
-    }
-  }
-
-  /**
-   * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm
-   * takes the appropriate number of elements out of each according to configuration parameters and their relatives sizes.
-   */
-  private class BlockBucket implements Comparable<BlockBucket> {
-
-    private CachedBlockQueue queue;
-    private long totalSize = 0;
-    private long bucketSize;
-
-    public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
-      this.bucketSize = bucketSize;
-      queue = new CachedBlockQueue(bytesToFree, blockSize);
-      totalSize = 0;
-    }
-
-    public void add(CachedBlock block) {
-      totalSize += block.heapSize();
-      queue.add(block);
-    }
-
-    public long free(long toFree) {
-      CachedBlock[] blocks = queue.get();
-      long freedBytes = 0;
-      for (int i = 0; i < blocks.length; i++) {
-        freedBytes += evictBlock(blocks[i]);
-        if (freedBytes >= toFree) {
-          return freedBytes;
-        }
-      }
-      return freedBytes;
-    }
-
-    public long overflow() {
-      return totalSize - bucketSize;
-    }
-
-    public long totalSize() {
-      return totalSize;
-    }
-
-    @Override
-    public int compareTo(BlockBucket that) {
-      if (this.overflow() == that.overflow())
-        return 0;
-      return this.overflow() > that.overflow() ? 1 : -1;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(overflow());
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      if (that instanceof BlockBucket)
-        return compareTo((BlockBucket) that) == 0;
-      return false;
-    }
-  }
-
-  @Override
-  public long getMaxSize() {
-    return this.maxSize;
-  }
-
-  /**
-   * Get the current size of this cache.
-   *
-   * @return current size in bytes
-   */
-  public long getCurrentSize() {
-    return this.size.get();
-  }
-
-  /**
-   * Get the current size of this cache.
-   *
-   * @return current size in bytes
-   */
-  public long getFreeSize() {
-    return getMaxSize() - getCurrentSize();
-  }
-
-  /**
-   * Get the size of this cache (number of cached blocks)
-   *
-   * @return number of cached blocks
-   */
-  public long size() {
-    return this.elements.get();
-  }
-
-  /**
-   * Get the number of eviction runs that have occurred
-   */
-  public long getEvictionCount() {
-    return this.stats.getEvictionCount();
-  }
-
-  /**
-   * Get the number of blocks that have been evicted during the lifetime of this cache.
-   */
-  public long getEvictedCount() {
-    return this.stats.getEvictedCount();
-  }
-
-  /**
-   * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level.
-   *
-   * <p>
-   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
-   */
-  private static class EvictionThread extends Thread {
-    private WeakReference<LruBlockCache> cache;
-    private boolean running = false;
-
-    public EvictionThread(LruBlockCache cache) {
-      super("LruBlockCache.EvictionThread");
-      setDaemon(true);
-      this.cache = new WeakReference<>(cache);
-    }
-
-    public synchronized boolean running() {
-      return running;
-    }
-
-    @Override
-    public void run() {
-      while (true) {
-        synchronized (this) {
-          running = true;
-          try {
-            this.wait();
-          } catch (InterruptedException e) {}
-        }
-        LruBlockCache cache = this.cache.get();
-        if (cache == null)
-          break;
-        cache.evict();
-      }
-    }
-
-    public void evict() {
-      synchronized (this) {
-        this.notify();
-      }
-    }
-  }
-
-  /*
-   * Statistics thread. Periodically prints the cache statistics to the log.
-   */
-  private static class StatisticsThread extends Thread {
-    LruBlockCache lru;
-
-    public StatisticsThread(LruBlockCache lru) {
-      super("LruBlockCache.StatisticsThread");
-      setDaemon(true);
-      this.lru = lru;
-    }
-
-    @Override
-    public void run() {
-      lru.logStats();
-    }
-  }
-
-  public void logStats() {
-    // Log size
-    long totalSize = heapSize();
-    long freeSize = maxSize - totalSize;
-    float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
-    float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
-    float maxMB = ((float) maxSize) / ((float) (1024 * 1024));
-    log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB ({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={},"
-        + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, maxSize, size(),
-        stats.requestCount(), stats.hitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100,
-        stats.getMissRatio() * 100, stats.evictedPerEviction(), stats.getDuplicateReads());
-  }
-
-  /**
-   * Get counter statistics for this cache.
-   *
-   * <p>
-   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
-   */
-  public CacheStats getStats() {
-    return this.stats;
-  }
-
-  public static class CacheStats implements BlockCache.Stats {
-    private final AtomicLong accessCount = new AtomicLong(0);
-    private final AtomicLong hitCount = new AtomicLong(0);
-    private final AtomicLong missCount = new AtomicLong(0);
-    private final AtomicLong evictionCount = new AtomicLong(0);
-    private final AtomicLong evictedCount = new AtomicLong(0);
-    private final AtomicLong duplicateReads = new AtomicLong(0);
-
-    public void miss() {
-      missCount.incrementAndGet();
-      accessCount.incrementAndGet();
-    }
-
-    public void hit() {
-      hitCount.incrementAndGet();
-      accessCount.incrementAndGet();
-    }
-
-    public void evict() {
-      evictionCount.incrementAndGet();
-    }
-
-    public void duplicateReads() {
-      duplicateReads.incrementAndGet();
-    }
-
-    public void evicted() {
-      evictedCount.incrementAndGet();
-    }
-
-    @Override
-    public long requestCount() {
-      return accessCount.get();
-    }
-
-    public long getMissCount() {
-      return missCount.get();
-    }
-
-    @Override
-    public long hitCount() {
-      return hitCount.get();
-    }
-
-    public long getEvictionCount() {
-      return evictionCount.get();
-    }
-
-    public long getDuplicateReads() {
-      return duplicateReads.get();
-    }
-
-    public long getEvictedCount() {
-      return evictedCount.get();
-    }
-
-    public double getHitRatio() {
-      return ((float) hitCount() / (float) requestCount());
-    }
-
-    public double getMissRatio() {
-      return ((float) getMissCount() / (float) requestCount());
-    }
-
-    public double evictedPerEviction() {
-      return (float) getEvictedCount() / (float) getEvictionCount();
-    }
-  }
-
-  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
-      + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT);
-
-  // HeapSize implementation
-  @Override
-  public long heapSize() {
-    return getCurrentSize();
-  }
-
-  public static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
-    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
-        + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
-  }
-
-  // Simple calculators of sizes given factors and maxSize
-
-  private long acceptableSize() {
-    return (long) Math.floor(this.maxSize * this.acceptableFactor);
-  }
-
-  private long minSize() {
-    return (long) Math.floor(this.maxSize * this.minFactor);
-  }
-
-  private long singleSize() {
-    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
-  }
-
-  private long multiSize() {
-    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
-  }
-
-  private long memorySize() {
-    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
-  }
-
-  public void shutdown() {
-    this.scheduleThreadPool.shutdown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
deleted file mode 100644
index ef2f664..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile.cache;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.Policy;
-import com.github.benmanes.caffeine.cache.stats.CacheStats;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation delegates to a Caffeine cache to provide concurrent O(1)
- * read and write operations.
- * <ul>
- * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
- * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
- * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
- * </ul>
- */
-public final class TinyLfuBlockCache implements BlockCache {
-  private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class);
-  private static final int STATS_PERIOD_SEC = 60;
-
-  private Cache<String,Block> cache;
-  private Policy.Eviction<String,Block> policy;
-  private ScheduledExecutorService statsExecutor;
-
-  public void start(AccumuloConfiguration conf, long maxSize, long blockSize) {
-    cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * maxSize / blockSize)).weigher((String blockName, Block block) -> {
-      int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
-      return keyWeight + block.weight();
-    }).maximumWeight(maxSize).recordStats().build();
-    policy = cache.policy().eviction().get();
-
-    statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
-        .build());
-    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS);
-  }
-
-  public void stop() {}
-
-  @Override
-  public long getMaxSize() {
-    return policy.getMaximum();
-  }
-
-  @Override
-  public CacheEntry getBlock(String blockName) {
-    return cache.getIfPresent(blockName);
-  }
-
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte[] buffer) {
-    return cache.asMap().compute(blockName, (key, block) -> {
-      if (block == null) {
-        return new Block(buffer);
-      }
-      block.buffer = buffer;
-      return block;
-    });
-  }
-
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) {
-    return cacheBlock(blockName, buffer);
-  }
-
-  @Override
-  public BlockCache.Stats getStats() {
-    CacheStats stats = cache.stats();
-    return new BlockCache.Stats() {
-      @Override
-      public long hitCount() {
-        return stats.hitCount();
-      }
-
-      @Override
-      public long requestCount() {
-        return stats.requestCount();
-      }
-    };
-  }
-
-  private void logStats() {
-    double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
-    double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) (1024 * 1024));
-    double freeMB = maxMB - sizeMB;
-    log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB, cache.estimatedSize());
-    log.debug(cache.stats().toString());
-  }
-
-  private static final class Block implements CacheEntry {
-    private volatile byte[] buffer;
-    private volatile Object index;
-
-    Block(byte[] buffer) {
-      this.buffer = requireNonNull(buffer);
-    }
-
-    @Override
-    public byte[] getBuffer() {
-      return buffer;
-    }
-
-    @Override
-    public Object getIndex() {
-      return index;
-    }
-
-    @Override
-    public void setIndex(Object index) {
-      this.index = index;
-    }
-
-    int weight() {
-      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
new file mode 100644
index 0000000..349be7f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import java.lang.ref.WeakReference;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CachedBlock;
+import org.apache.accumulo.core.file.blockfile.cache.CachedBlockQueue;
+import org.apache.accumulo.core.file.blockfile.cache.ClassSize;
+import org.apache.accumulo.core.file.blockfile.cache.HeapSize;
+import org.apache.accumulo.core.file.blockfile.cache.SizeConstants;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a
+ * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations.
+ *
+ * <p>
+ * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary, otherwise a
+ * block becomes a single access priority. Once a blocked is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the
+ * cache, adding a least-frequently-used element to the eviction algorithm.
+ *
+ * <p>
+ * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if
+ * any priority is not using its entire chunk the others are able to grow beyond their chunk size.
+ *
+ * <p>
+ * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is
+ * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map.
+ *
+ * <p>
+ * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and
+ * control the eviction thread.
+ *
+ * <p>
+ * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size
+ * specified.
+ *
+ * <p>
+ * Eviction happens in a separate thread and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and
+ * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then
+ * uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
+ */
+public class LruBlockCache implements BlockCache, HeapSize {
+
+  private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class);
+
+  /** Statistics thread */
+  static final int statThreadPeriod = 60;
+
+  /** Concurrent map (the cache) */
+  private final ConcurrentHashMap<String,CachedBlock> map;
+
+  /** Eviction lock (locked when eviction in process) */
+  private final ReentrantLock evictionLock = new ReentrantLock(true);
+
+  /** Volatile boolean to track if we are in an eviction process or not */
+  private volatile boolean evictionInProgress = false;
+
+  /** Eviction thread */
+  private final EvictionThread evictionThread;
+
+  /** Statistics thread schedule pool (for heavy debugging, could remove) */
+  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats"));
+
+  /** Current size of cache */
+  private AtomicLong size;
+
+  /** Current number of cached elements */
+  private AtomicLong elements;
+
+  /** Cache access count (sequential ID) */
+  private AtomicLong count;
+
+  /** Cache statistics */
+  private CacheStats stats;
+
+  /** Overhead of the structure itself */
+  private final long overhead;
+
+  private final LruBlockCacheConfiguration conf;
+
+  /**
+   * Default constructor. Specify maximum size and expected average block size (approximation is fine).
+   *
+   * <p>
+   * All other factors will be calculated based on defaults specified in this class.
+   *
+   * @param conf
+   *          block cache configuration
+   * @param maxSize
+   *          maximum size of cache, in bytes
+   * @param blockSize
+   *          approximate size of each block, in bytes
+   */
+  public LruBlockCache(final LruBlockCacheConfiguration conf) {
+    this.conf = conf;
+
+    int mapInitialSize = (int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize());
+
+    if (conf.getSingleFactor() + conf.getMultiFactor() + conf.getMemoryFactor() != 1) {
+      throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0");
+    }
+    if (conf.getMinFactor() >= conf.getAcceptableFactor()) {
+      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
+    }
+    if (conf.getMinFactor() >= 1.0f || conf.getAcceptableFactor() >= 1.0f) {
+      throw new IllegalArgumentException("all factors must be < 1");
+    }
+    map = new ConcurrentHashMap<>(mapInitialSize, conf.getMapLoadFactor(), conf.getMapConcurrencyLevel());
+    this.stats = new CacheStats();
+    this.count = new AtomicLong(0);
+    this.elements = new AtomicLong(0);
+    this.overhead = calculateOverhead(conf.getMaxSize(), conf.getBlockSize(), conf.getMapConcurrencyLevel());
+    this.size = new AtomicLong(this.overhead);
+
+    if (conf.isUseEvictionThread()) {
+      this.evictionThread = new EvictionThread(this);
+      this.evictionThread.start();
+      while (!this.evictionThread.running()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    } else {
+      this.evictionThread = null;
+    }
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+  }
+
+  public void start() {}
+
+  public void stop() {}
+
+  public long getOverhead() {
+    return overhead;
+  }
+
+  // BlockCache implementation
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a
+   * race condition and will update the buffer but not modify the size of the cache.
+   *
+   * @param blockName
+   *          block name
+   * @param buf
+   *          block buffer
+   * @param inMemory
+   *          if block is in-memory
+   */
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
+    CachedBlock cb = map.get(blockName);
+    if (cb != null) {
+      stats.duplicateReads();
+      cb.access(count.incrementAndGet());
+    } else {
+      cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
+      CachedBlock currCb = map.putIfAbsent(blockName, cb);
+      if (currCb != null) {
+        stats.duplicateReads();
+        cb = currCb;
+        cb.access(count.incrementAndGet());
+      } else {
+        // Actually added block to cache
+        long newSize = size.addAndGet(cb.heapSize());
+        elements.incrementAndGet();
+        if (newSize > acceptableSize() && !evictionInProgress) {
+          runEviction();
+        }
+      }
+    }
+
+    return cb;
+  }
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a
+   * race condition and will update the buffer but not modify the size of the cache.
+   *
+   * @param blockName
+   *          block name
+   * @param buf
+   *          block buffer
+   */
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte buf[]) {
+    return cacheBlock(blockName, buf, false);
+  }
+
+  /**
+   * Get the buffer of the block with the specified name.
+   *
+   * @param blockName
+   *          block name
+   * @return buffer of specified block name, or null if not in cache
+   */
+  @Override
+  public CachedBlock getBlock(String blockName) {
+    CachedBlock cb = map.get(blockName);
+    if (cb == null) {
+      stats.miss();
+      return null;
+    }
+    stats.hit();
+    cb.access(count.incrementAndGet());
+    return cb;
+  }
+
+  protected long evictBlock(CachedBlock block) {
+    map.remove(block.getName());
+    size.addAndGet(-1 * block.heapSize());
+    elements.decrementAndGet();
+    stats.evicted();
+    return block.heapSize();
+  }
+
+  /**
+   * Multi-threaded call to run the eviction process.
+   */
+  private void runEviction() {
+    if (evictionThread == null) {
+      evict();
+    } else {
+      evictionThread.evict();
+    }
+  }
+
+  /**
+   * Eviction method.
+   */
+  void evict() {
+
+    // Ensure only one eviction at a time
+    if (!evictionLock.tryLock())
+      return;
+
+    try {
+      evictionInProgress = true;
+
+      long bytesToFree = size.get() - minSize();
+
+      log.trace("Block cache LRU eviction started.  Attempting to free {} bytes", bytesToFree);
+
+      if (bytesToFree <= 0)
+        return;
+
+      // Instantiate priority buckets
+      BlockBucket bucketSingle = new BlockBucket(bytesToFree, conf.getBlockSize(), singleSize());
+      BlockBucket bucketMulti = new BlockBucket(bytesToFree, conf.getBlockSize(), multiSize());
+      BlockBucket bucketMemory = new BlockBucket(bytesToFree, conf.getBlockSize(), memorySize());
+
+      // Scan entire map putting into appropriate buckets
+      for (CachedBlock cachedBlock : map.values()) {
+        switch (cachedBlock.getPriority()) {
+          case SINGLE: {
+            bucketSingle.add(cachedBlock);
+            break;
+          }
+          case MULTI: {
+            bucketMulti.add(cachedBlock);
+            break;
+          }
+          case MEMORY: {
+            bucketMemory.add(cachedBlock);
+            break;
+          }
+        }
+      }
+
+      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
+
+      bucketQueue.add(bucketSingle);
+      bucketQueue.add(bucketMulti);
+      bucketQueue.add(bucketMemory);
+
+      int remainingBuckets = 3;
+      long bytesFreed = 0;
+
+      BlockBucket bucket;
+      while ((bucket = bucketQueue.poll()) != null) {
+        long overflow = bucket.overflow();
+        if (overflow > 0) {
+          long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
+          bytesFreed += bucket.free(bucketBytesToFree);
+        }
+        remainingBuckets--;
+      }
+
+      float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024));
+      float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024));
+      float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024));
+
+      log.trace("Block cache LRU eviction completed. Freed {} bytes. Priority Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed,
+          singleMB, bucketSingle.totalSize(), multiMB, bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize());
+
+    } finally {
+      stats.evict();
+      evictionInProgress = false;
+      evictionLock.unlock();
+    }
+  }
+
+  /**
+   * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm
+   * takes the appropriate number of elements out of each according to configuration parameters and their relatives sizes.
+   */
+  private class BlockBucket implements Comparable<BlockBucket> {
+
+    private CachedBlockQueue queue;
+    private long totalSize = 0;
+    private long bucketSize;
+
+    public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
+      this.bucketSize = bucketSize;
+      queue = new CachedBlockQueue(bytesToFree, blockSize);
+      totalSize = 0;
+    }
+
+    public void add(CachedBlock block) {
+      totalSize += block.heapSize();
+      queue.add(block);
+    }
+
+    public long free(long toFree) {
+      CachedBlock[] blocks = queue.get();
+      long freedBytes = 0;
+      for (int i = 0; i < blocks.length; i++) {
+        freedBytes += evictBlock(blocks[i]);
+        if (freedBytes >= toFree) {
+          return freedBytes;
+        }
+      }
+      return freedBytes;
+    }
+
+    public long overflow() {
+      return totalSize - bucketSize;
+    }
+
+    public long totalSize() {
+      return totalSize;
+    }
+
+    @Override
+    public int compareTo(BlockBucket that) {
+      if (this.overflow() == that.overflow())
+        return 0;
+      return this.overflow() > that.overflow() ? 1 : -1;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(overflow());
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that instanceof BlockBucket)
+        return compareTo((BlockBucket) that) == 0;
+      return false;
+    }
+  }
+
+  @Override
+  public long getMaxSize() {
+    return this.conf.getMaxSize();
+  }
+
+  /**
+   * Get the current size of this cache.
+   *
+   * @return current size in bytes
+   */
+  public long getCurrentSize() {
+    return this.size.get();
+  }
+
+  /**
+   * Get the current size of this cache.
+   *
+   * @return current size in bytes
+   */
+  public long getFreeSize() {
+    return getMaxSize() - getCurrentSize();
+  }
+
+  /**
+   * Get the size of this cache (number of cached blocks)
+   *
+   * @return number of cached blocks
+   */
+  public long size() {
+    return this.elements.get();
+  }
+
+  /**
+   * Get the number of eviction runs that have occurred
+   */
+  public long getEvictionCount() {
+    return this.stats.getEvictionCount();
+  }
+
+  /**
+   * Get the number of blocks that have been evicted during the lifetime of this cache.
+   */
+  public long getEvictedCount() {
+    return this.stats.getEvictedCount();
+  }
+
+  /**
+   * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level.
+   *
+   * <p>
+   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
+   */
+  private static class EvictionThread extends Thread {
+    private WeakReference<LruBlockCache> cache;
+    private boolean running = false;
+
+    public EvictionThread(LruBlockCache cache) {
+      super("LruBlockCache.EvictionThread");
+      setDaemon(true);
+      this.cache = new WeakReference<>(cache);
+    }
+
+    public synchronized boolean running() {
+      return running;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        synchronized (this) {
+          running = true;
+          try {
+            this.wait();
+          } catch (InterruptedException e) {}
+        }
+        LruBlockCache cache = this.cache.get();
+        if (cache == null)
+          break;
+        cache.evict();
+      }
+    }
+
+    public void evict() {
+      synchronized (this) {
+        this.notify();
+      }
+    }
+  }
+
+  /*
+   * Statistics thread. Periodically prints the cache statistics to the log.
+   */
+  private static class StatisticsThread extends Thread {
+    LruBlockCache lru;
+
+    public StatisticsThread(LruBlockCache lru) {
+      super("LruBlockCache.StatisticsThread");
+      setDaemon(true);
+      this.lru = lru;
+    }
+
+    @Override
+    public void run() {
+      lru.logStats();
+    }
+  }
+
+  public void logStats() {
+    // Log size
+    long totalSize = heapSize();
+    long freeSize = this.conf.getMaxSize() - totalSize;
+    float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
+    float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
+    float maxMB = ((float) this.conf.getMaxSize()) / ((float) (1024 * 1024));
+    log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB ({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={},"
+        + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, this.conf.getMaxSize(),
+        size(), stats.requestCount(), stats.hitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100,
+        stats.getMissRatio() * 100, stats.evictedPerEviction(), stats.getDuplicateReads());
+  }
+
+  /**
+   * Get counter statistics for this cache.
+   *
+   * <p>
+   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
+   */
+  public CacheStats getStats() {
+    return this.stats;
+  }
+
+  public static class CacheStats implements BlockCache.Stats {
+    private final AtomicLong accessCount = new AtomicLong(0);
+    private final AtomicLong hitCount = new AtomicLong(0);
+    private final AtomicLong missCount = new AtomicLong(0);
+    private final AtomicLong evictionCount = new AtomicLong(0);
+    private final AtomicLong evictedCount = new AtomicLong(0);
+    private final AtomicLong duplicateReads = new AtomicLong(0);
+
+    public void miss() {
+      missCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+
+    public void hit() {
+      hitCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+
+    public void evict() {
+      evictionCount.incrementAndGet();
+    }
+
+    public void duplicateReads() {
+      duplicateReads.incrementAndGet();
+    }
+
+    public void evicted() {
+      evictedCount.incrementAndGet();
+    }
+
+    @Override
+    public long requestCount() {
+      return accessCount.get();
+    }
+
+    public long getMissCount() {
+      return missCount.get();
+    }
+
+    @Override
+    public long hitCount() {
+      return hitCount.get();
+    }
+
+    public long getEvictionCount() {
+      return evictionCount.get();
+    }
+
+    public long getDuplicateReads() {
+      return duplicateReads.get();
+    }
+
+    public long getEvictedCount() {
+      return evictedCount.get();
+    }
+
+    public double getHitRatio() {
+      return ((float) hitCount() / (float) requestCount());
+    }
+
+    public double getMissRatio() {
+      return ((float) getMissCount() / (float) requestCount());
+    }
+
+    public double evictedPerEviction() {
+      return (float) getEvictedCount() / (float) getEvictionCount();
+    }
+  }
+
+  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
+      + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT);
+
+  // HeapSize implementation
+  @Override
+  public long heapSize() {
+    return getCurrentSize();
+  }
+
+  public static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
+    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+        + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+  }
+
+  // Simple calculators of sizes given factors and maxSize
+
+  private long acceptableSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getAcceptableFactor());
+  }
+
+  private long minSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMinFactor());
+  }
+
+  private long singleSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getSingleFactor() * this.conf.getMinFactor());
+  }
+
+  private long multiSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMultiFactor() * this.conf.getMinFactor());
+  }
+
+  private long memorySize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMemoryFactor() * this.conf.getMinFactor());
+  }
+
+  public void shutdown() {
+    this.scheduleThreadPool.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
new file mode 100644
index 0000000..b7fb472
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+
+public final class LruBlockCacheConfiguration extends BlockCacheConfiguration {
+
+  /** Default Configuration Parameters */
+
+  /** Backing Concurrent Map Configuration */
+  public static final Float DEFAULT_LOAD_FACTOR = 0.75f;
+  public static final Integer DEFAULT_CONCURRENCY_LEVEL = 16;
+
+  /** Eviction thresholds */
+  public static final Float DEFAULT_MIN_FACTOR = 0.75f;
+  public static final Float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
+
+  /** Priority buckets */
+  public static final Float DEFAULT_SINGLE_FACTOR = 0.25f;
+  public static final Float DEFAULT_MULTI_FACTOR = 0.50f;
+  public static final Float DEFAULT_MEMORY_FACTOR = 0.25f;
+
+  // property names
+  private static final String PREFIX = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block.lru.";
+  public static final String ACCEPTABLE_FACTOR_PROPERTY = PREFIX + "acceptable.factor";
+  public static final String MIN_FACTOR_PROPERTY = PREFIX + "min.factor";
+  public static final String SINGLE_FACTOR_PROPERTY = PREFIX + "single.factor";
+  public static final String MULTI_FACTOR_PROPERTY = PREFIX + "multi.factor";
+  public static final String MEMORY_FACTOR_PROPERTY = PREFIX + "memory.factor";
+  public static final String MAP_LOAD_PROPERTY = PREFIX + "map.load";
+  public static final String MAP_CONCURRENCY_PROPERTY = PREFIX + "map.concurrency";
+  public static final String EVICTION_THREAD_PROPERTY = PREFIX + "eviction.thread";
+
+  /** Acceptable size of cache (no evictions if size < acceptable) */
+  private final float acceptableFactor;
+
+  /** Minimum threshold of cache (when evicting, evict until size < min) */
+  private final float minFactor;
+
+  /** Single access bucket size */
+  private final float singleFactor;
+
+  /** Multiple access bucket size */
+  private final float multiFactor;
+
+  /** In-memory bucket size */
+  private final float memoryFactor;
+
+  /** LruBlockCache cache = new LruBlockCache **/
+  private final float mapLoadFactor;
+
+  /** LruBlockCache cache = new LruBlockCache **/
+  private final int mapConcurrencyLevel;
+
+  private final boolean useEvictionThread;
+
+  public LruBlockCacheConfiguration(AccumuloConfiguration conf) {
+    super(conf);
+    Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX);
+    this.acceptableFactor = getOrDefault(props, ACCEPTABLE_FACTOR_PROPERTY, DEFAULT_ACCEPTABLE_FACTOR);
+    this.minFactor = getOrDefault(props, MIN_FACTOR_PROPERTY, DEFAULT_MIN_FACTOR);
+    this.singleFactor = getOrDefault(props, SINGLE_FACTOR_PROPERTY, DEFAULT_SINGLE_FACTOR);
+    this.multiFactor = getOrDefault(props, MULTI_FACTOR_PROPERTY, DEFAULT_MULTI_FACTOR);
+    this.memoryFactor = getOrDefault(props, MEMORY_FACTOR_PROPERTY, DEFAULT_MEMORY_FACTOR);
+    this.mapLoadFactor = getOrDefault(props, MAP_LOAD_PROPERTY, DEFAULT_LOAD_FACTOR);
+    this.mapConcurrencyLevel = getOrDefault(props, MAP_CONCURRENCY_PROPERTY, DEFAULT_CONCURRENCY_LEVEL);
+    this.useEvictionThread = getOrDefault(props, EVICTION_THREAD_PROPERTY, Boolean.TRUE);
+  }
+
+  public float getAcceptableFactor() {
+    return acceptableFactor;
+  }
+
+  public float getMinFactor() {
+    return minFactor;
+  }
+
+  public float getSingleFactor() {
+    return singleFactor;
+  }
+
+  public float getMultiFactor() {
+    return multiFactor;
+  }
+
+  public float getMemoryFactor() {
+    return memoryFactor;
+  }
+
+  public float getMapLoadFactor() {
+    return mapLoadFactor;
+  }
+
+  public int getMapConcurrencyLevel() {
+    return mapConcurrencyLevel;
+  }
+
+  public boolean isUseEvictionThread() {
+    return useEvictionThread;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java
new file mode 100644
index 0000000..b923b8e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory;
+
+public class LruBlockCacheFactory extends BlockCacheFactory {
+
+  @Override
+  public BlockCache getBlockCache(AccumuloConfiguration conf) {
+    return new LruBlockCache(new LruBlockCacheConfiguration(conf));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
new file mode 100644
index 0000000..fccd55c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.ClassSize;
+import org.apache.accumulo.core.file.blockfile.cache.SizeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Policy;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation delegates to a Caffeine cache to provide concurrent O(1)
+ * read and write operations.
+ * <ul>
+ * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
+ * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
+ * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
+ * </ul>
+ */
+public final class TinyLfuBlockCache implements BlockCache {
+  private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class);
+  private static final int STATS_PERIOD_SEC = 60;
+
+  private Cache<String,Block> cache;
+  private Policy.Eviction<String,Block> policy;
+  private ScheduledExecutorService statsExecutor;
+
+  public TinyLfuBlockCache(TinyLfuBlockCacheConfiguration conf) {
+    cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize())).weigher((String blockName, Block block) -> {
+      int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
+      return keyWeight + block.weight();
+    }).maximumWeight(conf.getMaxSize()).recordStats().build();
+    policy = cache.policy().eviction().get();
+    statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
+        .build());
+    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS);
+
+  }
+
+  public void start() {}
+
+  public void stop() {}
+
+  @Override
+  public long getMaxSize() {
+    return policy.getMaximum();
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName) {
+    return cache.getIfPresent(blockName);
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer) {
+    return cache.asMap().compute(blockName, (key, block) -> {
+      if (block == null) {
+        return new Block(buffer);
+      }
+      block.buffer = buffer;
+      return block;
+    });
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) {
+    return cacheBlock(blockName, buffer);
+  }
+
+  @Override
+  public BlockCache.Stats getStats() {
+    CacheStats stats = cache.stats();
+    return new BlockCache.Stats() {
+      @Override
+      public long hitCount() {
+        return stats.hitCount();
+      }
+
+      @Override
+      public long requestCount() {
+        return stats.requestCount();
+      }
+    };
+  }
+
+  private void logStats() {
+    double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
+    double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) (1024 * 1024));
+    double freeMB = maxMB - sizeMB;
+    log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB, cache.estimatedSize());
+    log.debug(cache.stats().toString());
+  }
+
+  private static final class Block implements CacheEntry {
+    private volatile byte[] buffer;
+    private volatile Object index;
+
+    Block(byte[] buffer) {
+      this.buffer = requireNonNull(buffer);
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+
+    @Override
+    public void setIndex(Object index) {
+      this.index = index;
+    }
+
+    int weight() {
+      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
new file mode 100644
index 0000000..3d1efa5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+
+public final class TinyLfuBlockCacheConfiguration extends BlockCacheConfiguration {
+
+  public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf) {
+    super(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java
new file mode 100644
index 0000000..33db576
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory;
+
+public class TinyLfuBlockCacheFactory extends BlockCacheFactory {
+
+  @Override
+  public BlockCache getBlockCache(AccumuloConfiguration conf) {
+    return new TinyLfuBlockCache(new TinyLfuBlockCacheConfiguration(conf));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 73cabf2..ab98816 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -91,7 +91,7 @@ public class SummaryReader {
     }
 
     @Override
-    public void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception {}
+    public void start() {}
 
     @Override
     public void stop() {}


Mime
View raw message