accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-4463: Make block cache implementation pluggable
Date Mon, 22 May 2017 17:10:59 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 3f1b0f33c -> 452732c8a


http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
new file mode 100644
index 0000000..a67f164
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+
+public final class TinyLfuBlockCacheConfiguration extends BlockCacheConfiguration {
+
+  public static final String PROPERTY_PREFIX = "tinylfu";
+
+  public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) {
+    super(conf, type, PROPERTY_PREFIX);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java
new file mode 100644
index 0000000..a68c4e6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TinyLfuBlockCacheManager extends BlockCacheManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCacheManager.class);
+
+  @Override
+  protected TinyLfuBlockCache createCache(AccumuloConfiguration conf, CacheType type) {
+    TinyLfuBlockCacheConfiguration cc = new TinyLfuBlockCacheConfiguration(conf, type);
+    LOG.info("Creating {} cache with configuration {}", type, cc);
+    return new TinyLfuBlockCache(cc);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 9b2b5d9..daa8f22 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -89,6 +89,11 @@ public class SummaryReader {
     public Stats getStats() {
       return summaryCache.getStats();
     }
+
+    @Override
+    public long getMaxHeapSize() {
+      return summaryCache.getMaxHeapSize();
+    }
   }
 
   private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration>
summarySelector) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java
new file mode 100644
index 0000000..e17fb76
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BlockCacheFactoryTest {
+
+  @Test
+  public void testCreateLruBlockCacheFactory() throws Exception {
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    BlockCacheManager.getInstance(cc);
+  }
+
+  @Test
+  public void testCreateTinyLfuBlockCacheFactory() throws Exception {
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName());
+    BlockCacheManager.getInstance(cc);
+  }
+
+  @Test
+  public void testStartWithDefault() throws Exception {
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    BlockCacheManager manager = BlockCacheManager.getInstance(dc);
+    manager.start(dc);
+    Assert.assertNotNull(manager.getBlockCache(CacheType.INDEX));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java
new file mode 100644
index 0000000..72ea49c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BlockConfigurationHelperTest {
+
+  @Test
+  public void testGetPropertyPrefix() throws Exception {
+    Assert.assertEquals("general.custom.cache.block.lru.data.", BlockCacheConfiguration.getPrefix(CacheType.DATA,
"lru"));
+    Assert.assertEquals("general.custom.cache.block.lru.index.", BlockCacheConfiguration.getPrefix(CacheType.INDEX,
"lru"));
+    Assert.assertEquals("general.custom.cache.block.lru.summary.", BlockCacheConfiguration.getPrefix(CacheType.SUMMARY,
"lru"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
index a5ab14a..966b6eb 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
@@ -22,6 +22,13 @@ import java.util.Random;
 
 import junit.framework.TestCase;
 
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager;
+
 /**
  * Tests the concurrent LruBlockCache.
  * <p>
@@ -31,12 +38,50 @@ import junit.framework.TestCase;
  */
 public class TestLruBlockCache extends TestCase {
 
+  public void testConfiguration() {
+    ConfigurationCopy cc = new ConfigurationCopy();
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(1019));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(1000023));
+
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.93f).acceptableFactor(0.97f).singleFactor(0.20f)
+        .multiFactor(0.30f).memoryFactor(0.50f).mapConcurrencyLevel(5).buildMap().forEach(cc::set);
+
+    String defaultPrefix = BlockCacheConfiguration.getDefaultPrefix(LruBlockCacheConfiguration.PROPERTY_PREFIX);
+
+    // this should be overridden by cache type specific setting
+    cc.set(defaultPrefix + LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, "0.6");
+
+    // this is not set for the cache type, so should fall back to default
+    cc.set(defaultPrefix + LruBlockCacheConfiguration.MAP_LOAD_PROPERTY, "0.53");
+
+    LruBlockCacheConfiguration lbcc = new LruBlockCacheConfiguration(cc, CacheType.INDEX);
+
+    assertEquals(false, lbcc.isUseEvictionThread());
+    assertEquals(0.93f, lbcc.getMinFactor());
+    assertEquals(0.97f, lbcc.getAcceptableFactor());
+    assertEquals(0.20f, lbcc.getSingleFactor());
+    assertEquals(0.30f, lbcc.getMultiFactor());
+    assertEquals(0.50f, lbcc.getMemoryFactor());
+    assertEquals(0.53f, lbcc.getMapLoadFactor());
+    assertEquals(5, lbcc.getMapConcurrencyLevel());
+    assertEquals(1019, lbcc.getBlockSize());
+    assertEquals(1000023, lbcc.getMaxSize());
+  }
+
   public void testBackgroundEvictionThread() throws Exception {
 
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 9); // room for 9, will evict
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize);
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
+    manager.start(cc);
+    LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
     Block[] blocks = generateFixedBlocks(10, blockSize, "block");
 
@@ -53,6 +98,8 @@ public class TestLruBlockCache extends TestCase {
     }
     // A single eviction run should have occurred
     assertEquals(cache.getEvictionCount(), 1);
+
+    manager.stop();
   }
 
   public void testCacheSimple() throws Exception {
@@ -60,7 +107,14 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 1000000;
     long blockSize = calculateBlockSizeDefault(maxSize, 101);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize);
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
+    manager.start(cc);
+    LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
     Block[] blocks = generateRandomBlocks(100, blockSize);
 
@@ -102,6 +156,7 @@ public class TestLruBlockCache extends TestCase {
     // Thread t = new LruBlockCache.StatisticsThread(cache);
     // t.start();
     // t.join();
+    manager.stop();
   }
 
   public void testCacheEvictionSimple() throws Exception {
@@ -109,7 +164,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false);
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).buildMap().forEach(cc::set);
+    manager.start(cc);
+
+    LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
     Block[] blocks = generateFixedBlocks(10, blockSize, "block");
 
@@ -125,13 +189,13 @@ public class TestLruBlockCache extends TestCase {
     assertEquals(1, cache.getEvictionCount());
 
     // Our expected size overruns acceptable limit
-    assertTrue(expectedCacheSize > (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+    assertTrue(expectedCacheSize > (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR));
 
     // But the cache did not grow beyond max
     assertTrue(cache.heapSize() < maxSize);
 
     // And is still below the acceptable limit
-    assertTrue(cache.heapSize() < (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+    assertTrue(cache.heapSize() < (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR));
 
     // All blocks except block 0 and 1 should be in the cache
     assertTrue(cache.getBlock(blocks[0].blockName) == null);
@@ -139,6 +203,7 @@ public class TestLruBlockCache extends TestCase {
     for (int i = 2; i < blocks.length; i++) {
       assertTrue(Arrays.equals(cache.getBlock(blocks[i].blockName).getBuffer(), blocks[i].buf));
     }
+    manager.stop();
   }
 
   public void testCacheEvictionTwoPriorities() throws Exception {
@@ -146,12 +211,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2
* maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-        0.99f, // acceptable
-        0.25f, // single
-        0.50f, // multi
-        0.25f);// memory
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.25f)
+        .multiFactor(0.50f).memoryFactor(0.25f).buildMap().forEach(cc::set);
+    manager.start(cc);
+    LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
     Block[] singleBlocks = generateFixedBlocks(5, 10000, "single");
     Block[] multiBlocks = generateFixedBlocks(5, 10000, "multi");
@@ -178,13 +247,13 @@ public class TestLruBlockCache extends TestCase {
     assertEquals(cache.getEvictedCount(), 2);
 
     // Our expected size overruns acceptable limit
-    assertTrue(expectedCacheSize > (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+    assertTrue(expectedCacheSize > (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR));
 
     // But the cache did not grow beyond max
     assertTrue(cache.heapSize() <= maxSize);
 
     // And is now below the acceptable limit
-    assertTrue(cache.heapSize() <= (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+    assertTrue(cache.heapSize() <= (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR));
 
     // We expect fairness across the two priorities.
     // This test makes multi go barely over its limit, in-memory
@@ -198,6 +267,7 @@ public class TestLruBlockCache extends TestCase {
       assertTrue(Arrays.equals(cache.getBlock(singleBlocks[i].blockName).getBuffer(), singleBlocks[i].buf));
       assertTrue(Arrays.equals(cache.getBlock(multiBlocks[i].blockName).getBuffer(), multiBlocks[i].buf));
     }
+    manager.stop();
   }
 
   public void testCacheEvictionThreePriorities() throws Exception {
@@ -205,12 +275,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSize(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2
* maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-        0.99f, // acceptable
-        0.33f, // single
-        0.33f, // multi
-        0.34f);// memory
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.33f)
+        .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set);
+    manager.start(cc);
+    LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
     Block[] singleBlocks = generateFixedBlocks(5, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
@@ -315,6 +389,7 @@ public class TestLruBlockCache extends TestCase {
     assertEquals(null, cache.getBlock(memoryBlocks[2].blockName));
     assertEquals(null, cache.getBlock(memoryBlocks[3].blockName));
 
+    manager.stop();
   }
 
   // test scan resistance
@@ -323,12 +398,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSize(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2
* maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.66f, // min
-        0.99f, // acceptable
-        0.33f, // single
-        0.33f, // multi
-        0.34f);// memory
+    DefaultConfiguration dc = DefaultConfiguration.getInstance();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.66f).acceptableFactor(0.99f).singleFactor(0.33f)
+        .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set);
+    manager.start(cc);
+    LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
     Block[] singleBlocks = generateFixedBlocks(20, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
@@ -374,64 +453,7 @@ public class TestLruBlockCache extends TestCase {
     // Should now have 7 total blocks
     assertEquals(7, cache.size());
 
-  }
-
-  // test setMaxSize
-  public void testResizeBlockCache() throws Exception {
-
-    long maxSize = 300000;
-    long blockSize = calculateBlockSize(maxSize, 31);
-
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2
* maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-        0.99f, // acceptable
-        0.33f, // single
-        0.33f, // multi
-        0.34f);// memory
-
-    Block[] singleBlocks = generateFixedBlocks(10, blockSize, "single");
-    Block[] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
-    Block[] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
-
-    // Add all blocks from all priorities
-    for (int i = 0; i < 10; i++) {
-
-      // Just add single blocks
-      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
-
-      // Add and get multi blocks
-      cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf);
-      cache.getBlock(multiBlocks[i].blockName);
-
-      // Add memory blocks as such
-      cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true);
-    }
-
-    // Do not expect any evictions yet
-    assertEquals(0, cache.getEvictionCount());
-
-    // Resize to half capacity plus an extra block (otherwise we evict an extra)
-    cache.setMaxSize((long) (maxSize * 0.5f));
-
-    // Should have run a single eviction
-    assertEquals(1, cache.getEvictionCount());
-
-    // And we expect 1/2 of the blocks to be evicted
-    assertEquals(15, cache.getEvictedCount());
-
-    // And the oldest 5 blocks from each category should be gone
-    for (int i = 0; i < 5; i++) {
-      assertEquals(null, cache.getBlock(singleBlocks[i].blockName));
-      assertEquals(null, cache.getBlock(multiBlocks[i].blockName));
-      assertEquals(null, cache.getBlock(memoryBlocks[i].blockName));
-    }
-
-    // And the newest 5 blocks should still be accessible
-    for (int i = 5; i < 10; i++) {
-      assertTrue(Arrays.equals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName).getBuffer()));
-      assertTrue(Arrays.equals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName).getBuffer()));
-      assertTrue(Arrays.equals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName).getBuffer()));
-    }
+    manager.stop();
   }
 
   private Block[] generateFixedBlocks(int numBlocks, int size, String pfx) {
@@ -459,7 +481,7 @@ public class TestLruBlockCache extends TestCase {
     long roughBlockSize = maxSize / numBlocks;
     int numEntries = (int) Math.ceil((1.2) * maxSize / roughBlockSize);
     long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
+ (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY)
-        + (LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+        + (LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
     long negateBlockSize = totalOverhead / numEntries;
     negateBlockSize += CachedBlock.PER_BLOCK_OVERHEAD;
     return ClassSize.align((long) Math.floor((roughBlockSize - negateBlockSize) * 0.99f));
@@ -469,10 +491,10 @@ public class TestLruBlockCache extends TestCase {
     long roughBlockSize = maxSize / numBlocks;
     int numEntries = (int) Math.ceil((1.2) * maxSize / roughBlockSize);
     long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
+ (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY)
-        + (LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+        + (LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
     long negateBlockSize = totalOverhead / numEntries;
     negateBlockSize += CachedBlock.PER_BLOCK_OVERHEAD;
-    return ClassSize.align((long) Math.floor((roughBlockSize - negateBlockSize) * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+    return ClassSize.align((long) Math.floor((roughBlockSize - negateBlockSize) * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR));
   }
 
   private static class Block implements HeapSize {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 11ded64..ef3fb62 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -58,7 +58,10 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.streams.PositionedOutputs;
@@ -206,6 +209,7 @@ public class RFileTest {
     protected AccumuloConfiguration accumuloConfiguration;
     public Reader reader;
     public SortedKeyValueIterator<Key,Value> iter;
+    private BlockCacheManager manager;
 
     public TestRFile(AccumuloConfiguration accumuloConfiguration) {
       this.accumuloConfiguration = accumuloConfiguration;
@@ -265,8 +269,20 @@ public class RFileTest {
       in = new FSDataInputStream(bais);
       fileLength = data.length;
 
-      LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
-      LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
+      DefaultConfiguration dc = DefaultConfiguration.getInstance();
+      ConfigurationCopy cc = new ConfigurationCopy(dc);
+      cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
+      try {
+        manager = BlockCacheManager.getInstance(cc);
+      } catch (Exception e) {
+        throw new RuntimeException("Error creating BlockCacheManager", e);
+      }
+      cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000));
+      cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000));
+      cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000));
+      manager.start(cc);
+      LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
+      LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA);
 
       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf,
dataCache, indexCache, DefaultConfiguration.getInstance());
       reader = new RFile.Reader(_cbr);
@@ -279,6 +295,9 @@ public class RFileTest {
     public void closeReader() throws IOException {
       reader.close();
       in.close();
+      if (null != manager) {
+        manager.stop();
+      }
     }
 
     public void seek(Key nk) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index d10ff7b..91b748c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -40,8 +40,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
-import org.apache.accumulo.core.file.blockfile.cache.TinyLfuBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.NamingThreadFactory;
@@ -99,6 +99,7 @@ public class TabletServerResourceManager {
 
   private final MemoryManagementFramework memMgmt;
 
+  private final BlockCacheManager cacheManager;
   private final BlockCache _dCache;
   private final BlockCache _iCache;
   private final BlockCache _sCache;
@@ -169,25 +170,24 @@ public class TabletServerResourceManager {
     long maxMemory = acuConf.getAsBytes(Property.TSERV_MAXMEM);
     boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) &&
NativeMap.isLoaded();
 
-    long blockSize = acuConf.getAsBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
-    long dCacheSize = acuConf.getAsBytes(Property.TSERV_DATACACHE_SIZE);
-    long iCacheSize = acuConf.getAsBytes(Property.TSERV_INDEXCACHE_SIZE);
-    long sCacheSize = acuConf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE);
     long totalQueueSize = acuConf.getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
 
-    String policy = acuConf.get(Property.TSERV_CACHE_POLICY);
-    if (policy.equalsIgnoreCase("LRU")) {
-      _iCache = new LruBlockCache(iCacheSize, blockSize);
-      _dCache = new LruBlockCache(dCacheSize, blockSize);
-      _sCache = new LruBlockCache(sCacheSize, blockSize);
-    } else if (policy.equalsIgnoreCase("TinyLFU")) {
-      _iCache = new TinyLfuBlockCache(iCacheSize, blockSize);
-      _dCache = new TinyLfuBlockCache(dCacheSize, blockSize);
-      _sCache = new TinyLfuBlockCache(sCacheSize, blockSize);
-    } else {
-      throw new IllegalArgumentException("Unknown Block cache policy " + policy);
+    try {
+      cacheManager = BlockCacheManager.getInstance(acuConf);
+    } catch (Exception e) {
+      throw new RuntimeException("Error creating BlockCacheManager", e);
     }
 
+    cacheManager.start(acuConf);
+
+    _iCache = cacheManager.getBlockCache(CacheType.INDEX);
+    _dCache = cacheManager.getBlockCache(CacheType.DATA);
+    _sCache = cacheManager.getBlockCache(CacheType.SUMMARY);
+
+    long dCacheSize = _dCache.getMaxHeapSize();
+    long iCacheSize = _iCache.getMaxHeapSize();
+    long sCacheSize = _sCache.getMaxHeapSize();
+
     Runtime runtime = Runtime.getRuntime();
     if (usingNativeMap) {
       // Still check block cache sizes when using native maps.
@@ -543,6 +543,14 @@ public class TabletServerResourceManager {
       executorService.shutdown();
     }
 
+    if (null != this.cacheManager) {
+      try {
+        this.cacheManager.stop();
+      } catch (Exception ex) {
+        log.error("Error stopping BlockCacheManager", ex);
+      }
+    }
+
     for (Entry<String,ExecutorService> entry : threadPools.entrySet()) {
       while (true) {
         try {


Mime
View raw message