accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject [2/4] accumulo git commit: New test, changes associated with it
Date Fri, 19 May 2017 17:17:26 GMT
New test, changes associated with it


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d2017f96
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d2017f96
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d2017f96

Branch: refs/heads/IGNITE
Commit: d2017f96775a6fb845051f010c4be34c9501e932
Parents: c4a18a6
Author: Dave Marion <dlmarion@apache.org>
Authored: Fri May 19 13:08:28 2017 -0400
Committer: Dave Marion <dlmarion@apache.org>
Committed: Fri May 19 13:08:28 2017 -0400

----------------------------------------------------------------------
 .../cache/tiered/TieredBlockCache.java          | 31 +++++++++-
 .../tiered/TieredBlockCacheConfiguration.java   | 10 +--
 .../cache/tiered/TieredBlockCacheManager.java   | 39 +++++++++++-
 .../blockfile/cache/TestTieredBlockCache.java   | 64 +++++++++++++++++---
 4 files changed, 128 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
index 6031666..f8ef434 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
@@ -2,12 +2,16 @@ package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CachePeekMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,22 +43,41 @@ public class TieredBlockCache implements BlockCache {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class);
 	private final IgniteCache<String, Block> cache;
+	private final CacheMetrics metrics;
 	private final TieredBlockCacheConfiguration conf;
 	private final AtomicLong hitCount = new AtomicLong(0);
 	private final AtomicLong requestCount = new AtomicLong(0);
-
+	private final ScheduledFuture<?> future;
 	
 	public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) {
 		this.conf = conf;
 		this.cache = ignite.getOrCreateCache(conf.getConfiguration());
+		metrics = cache.localMxBean();
 		LOG.info("Created {} cache with configuration {}", 
 				conf.getConfiguration().getName(), conf.getConfiguration());
+		this.future = TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() {
+			@Override
+			public void run() {
+				LOG.info(cache.localMetrics().toString());
+				LOG.info(cache.getName() + " entries, on-heap: " + getOnHeapEntryCount() + ", off-heap: " + getOffHeapEntryCount());
+			}
+		}, TieredBlockCacheManager.STAT_INTERVAL, TieredBlockCacheManager.STAT_INTERVAL, TimeUnit.SECONDS);
 	}
 	
 	public void stop() {
+		this.future.cancel(false);
 		this.cache.close();
+		this.cache.destroy();
 	}
 	
+	public long getOnHeapEntryCount() {
+		return this.cache.sizeLong(CachePeekMode.ONHEAP);
+	}
+
+	public long getOffHeapEntryCount() {
+		return this.cache.sizeLong(CachePeekMode.OFFHEAP);
+	}
+
 	@Override
 	public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
 		return cacheBlock(blockName, buf);
@@ -62,7 +85,7 @@ public class TieredBlockCache implements BlockCache {
 	
 	@Override
 	public CacheEntry cacheBlock(String blockName, byte[] buf) {
-		return this.cache.getAndPutIfAbsent(blockName, new Block(buf));
+		return this.cache.getAndPut(blockName, new Block(buf));
 	}
 
 	@Override
@@ -79,6 +102,10 @@ public class TieredBlockCache implements BlockCache {
 	public long getMaxSize() {
 		return this.conf.getMaxSize();
 	}
+	
+	public CacheMetrics getCacheMetrics() {
+		return this.metrics;
+	}
 
 	@Override
 	public Stats getStats() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
index 5f2cde5..6b813e4 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
@@ -22,8 +22,12 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration {
 	  configuration.setName(type.name());
 	  configuration.setCacheMode(CacheMode.LOCAL);
 	  configuration.setOnheapCacheEnabled(true);
-	  configuration.setEvictionPolicy(new LruEvictionPolicy<String, Block>((int) this.getMaxSize()));
+	  LruEvictionPolicy<String, Block> ePolicy = new LruEvictionPolicy<>();
+	  ePolicy.setMaxSize((int) (0.75 * this.getMaxSize()));
+	  ePolicy.setMaxMemorySize(this.getMaxSize());
+	  configuration.setEvictionPolicy(ePolicy);
 	  configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(Duration.ONE_HOUR));
+	  configuration.setStatisticsEnabled(true);
 	}
 
 	public CacheConfiguration<String, Block> getConfiguration() {
@@ -34,7 +38,5 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration {
 	public String toString() {
 	  return this.configuration.toString();
 	}
-	
-	
-	
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
index 5ea8a80..d3f1170 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
@@ -1,11 +1,16 @@
 package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.DataPageEvictionMode;
@@ -19,23 +24,32 @@ public class TieredBlockCacheManager extends BlockCacheManager {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCacheManager.class);
 	
-	public static final String PROPERTY_PREFIX = "tiered";
+	static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, new NamingThreadFactory("TieredBlockCacheStats"));
+	static final int STAT_INTERVAL = 60;
 	
+	public static final String PROPERTY_PREFIX = "tiered";
 	private static final String TIERED_PROPERTY_BASE = BlockCacheManager.CACHE_PROPERTY_BASE + PROPERTY_PREFIX + ".";
+	
 	public static final String OFF_HEAP_MAX_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.max.size";
 	public static final String OFF_HEAP_BLOCK_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.block.size";
 	
+	private static final long OFF_HEAP_MIN_SIZE = 10 * 1024 * 1024;
 	private static final long OFF_HEAP_MAX_SIZE_DEFAULT = 512 * 1024 * 1024;
 	private static final int OFF_HEAP_BLOCK_SIZE_DEFAULT = 16 * 1024;
+	private static final String OFF_HEAP_CONFIG_NAME = "OFF_HEAP_MEMORY";
 
 	private Ignite IGNITE;
 	
 	@Override
 	public void start(AccumuloConfiguration conf) {
 		
-		final long offHeapMaxSize = Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
+		long offHeapMaxSize = Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
 		final int offHeapBlockSize = Optional.ofNullable(conf.get(OFF_HEAP_BLOCK_SIZE_PROPERTY)).map(Integer::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_BLOCK_SIZE_DEFAULT);
 
+		if (offHeapMaxSize < OFF_HEAP_MIN_SIZE) {
+			LOG.warn("Off heap max size setting too low, overriding to minimum of 10MB");
+			offHeapMaxSize = OFF_HEAP_MIN_SIZE;
+		}
 		// Ignite configuration.
 		IgniteConfiguration cfg = new IgniteConfiguration();
 		cfg.setDaemon(true);
@@ -45,18 +59,38 @@ public class TieredBlockCacheManager extends BlockCacheManager {
 		memCfg.setPageSize(offHeapBlockSize);
 		
 		MemoryPolicyConfiguration plCfg = new MemoryPolicyConfiguration();
+		plCfg.setName(OFF_HEAP_CONFIG_NAME);
 		plCfg.setInitialSize(offHeapMaxSize);
 		plCfg.setMaxSize(offHeapMaxSize);
 		plCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
 		plCfg.setEvictionThreshold(0.9);
+		plCfg.setEmptyPagesPoolSize((int)(offHeapMaxSize / offHeapBlockSize / 10) - 1);
+		plCfg.setMetricsEnabled(true);
 
 		memCfg.setMemoryPolicies(plCfg); //apply custom memory policy
+		memCfg.setDefaultMemoryPolicyName(OFF_HEAP_CONFIG_NAME);
 		
 		cfg.setMemoryConfiguration(memCfg); // apply off-heap memory configuration
 		LOG.info("Starting Ignite with configuration {}", cfg.toString());
 		IGNITE = Ignition.start(cfg);
 
 		super.start(conf);
+		
+		SCHEDULER.scheduleAtFixedRate(new Runnable() {
+			@Override
+			public void run() {
+				IGNITE.memoryMetrics().forEach(m -> {
+					ToStringBuilder builder = new ToStringBuilder(m);
+					builder.append("memory region name", m.getName());
+					builder.append(" page allocation rate", m.getAllocationRate());
+					builder.append(" page eviction rate", m.getEvictionRate());
+					builder.append(" total allocated pages", m.getTotalAllocatedPages());
+					builder.append(" page free space %", m.getPagesFillFactor());
+					builder.append(" large entry fragmentation %", m.getLargeEntriesPagesPercentage());
+					LOG.info(builder.toString());
+				});
+			}
+		}, STAT_INTERVAL, STAT_INTERVAL, TimeUnit.SECONDS);
 	}
 
 	@Override
@@ -67,6 +101,7 @@ public class TieredBlockCacheManager extends BlockCacheManager {
 			cache.stop();
 		  }
 		}
+		SCHEDULER.shutdownNow();
 		IGNITE.close();
 		super.stop();
 	}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
index 3149438..4642224 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
@@ -1,8 +1,5 @@
 package org.apache.accumulo.core.file.blockfile.cache;
 
-import java.nio.charset.StandardCharsets;
-import java.util.Random;
-
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -13,7 +10,7 @@ import org.junit.Test;
 
 public class TestTieredBlockCache {
 	
-	private static final long BLOCKSIZE = 1024;
+	private static final int BLOCKSIZE = 1024;
 	private static final long MAXSIZE = 1024*100;
 	
 	private static class Holder {
@@ -59,15 +56,66 @@ public class TestTieredBlockCache {
 	      Assert.assertTrue(ce != null);
 	      Assert.assertEquals(ce.getBuffer().length, h.getBuf().length);
 	    }
-
 	    manager.stop();
 	}
 	
-    private Holder[] generateRandomBlocks(int numBlocks, long maxSize) {
+	@Test
+	public void testOffHeapBlockMigration() throws Exception {
+	    DefaultConfiguration dc = new DefaultConfiguration();
+	    ConfigurationCopy cc = new ConfigurationCopy(dc);
+	    cc.set(Property.TSERV_CACHE_FACTORY_IMPL, TieredBlockCacheManager.class.getName());
+	    cc.set("general.custom.cache.block.tiered.off-heap.max.size", Long.toString(10*1024*1024));
+	    cc.set("general.custom.cache.block.tiered.off-heap.block.sizee", Long.toString(BLOCKSIZE));
+	    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+	    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
+	    cc.set(Property.TSERV_DATACACHE_SIZE, "2048");
+	    cc.set(Property.TSERV_INDEXCACHE_SIZE, "2048");
+	    cc.set(Property.TSERV_SUMMARYCACHE_SIZE, "2048");
+	    manager.start(cc);
+	    TieredBlockCache cache = (TieredBlockCache) manager.getBlockCache(CacheType.DATA);
+	    
+	    // With this configuration we have an on-heap cache with a max size of 2K with 1K blocks
+	    // and an off heap cache with a max size of 10MB with 1K blocks
+
+	    for (Holder h : generateRandomBlocks(1, 1024)) {
+	    	cache.cacheBlock(h.getName(), h.getBuf());
+	    }
+	    Assert.assertEquals(1, cache.getCacheMetrics().getCachePuts());
+	    Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
+	    
+	    for (Holder h : generateRandomBlocks(1023, 1024)) {
+	    	cache.cacheBlock(h.getName(), h.getBuf());
+	    }
+	    Assert.assertEquals(1, cache.getOnHeapEntryCount());
+	    Assert.assertEquals(1023, cache.getOffHeapEntryCount());
+	    
+	    Assert.assertEquals(1, cache.getCacheMetrics().getSize());
+	    Assert.assertEquals(1023, cache.getCacheMetrics().getOffHeapEntriesCount());
+	    Assert.assertEquals(1024, cache.getCacheMetrics().getCachePuts());
+	    Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
+	    
+	    manager.stop();
+
+	}
+
+	/**
+	 * 
+	 * @param numBlocks
+	 *           number of blocks to create
+	 * @param blockSize
+	 *           number of bytes in each block
+	 * @return
+	 */
+    private Holder[] generateRandomBlocks(int numBlocks, int blockSize) {
+      byte[] b = new byte[blockSize];
+      for (int x = 0; x < blockSize; x++) {
+    	b[x] = '0';
+      }
       Holder[] blocks = new Holder[numBlocks];
-      Random r = new Random();
       for (int i = 0; i < numBlocks; i++) {
-        blocks[i] = new Holder("block" + i, Integer.toString(r.nextInt((int) maxSize) + 1).getBytes(StandardCharsets.UTF_8));
+    	byte[] buf = new byte[blockSize];
+    	System.arraycopy(b, 0, buf, 0, blockSize);
+        blocks[i] = new Holder("block" + i, buf);
       }
       return blocks;
     }


Mime
View raw message