accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: Branch off ACCUMULO-4463 to start working on Apache Ignite block cache
Date Fri, 12 May 2017 20:09:52 GMT
Repository: accumulo
Updated Branches:
  refs/heads/IGNITE [created] 2e992c889


Branch off ACCUMULO-4463 to start working on Apache Ignite block cache


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2e992c88
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2e992c88
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2e992c88

Branch: refs/heads/IGNITE
Commit: 2e992c8896d71921513102d294179f4782b51dc2
Parents: 61dd036
Author: Dave Marion <dlmarion@apache.org>
Authored: Fri May 12 16:09:22 2017 -0400
Committer: Dave Marion <dlmarion@apache.org>
Committed: Fri May 12 16:09:22 2017 -0400

----------------------------------------------------------------------
 core/pom.xml                                    |  4 +
 .../cache/tiered/TieredBlockCache.java          | 97 ++++++++++++++++++++
 .../tiered/TieredBlockCacheConfiguration.java   | 40 ++++++++
 .../cache/tiered/TieredBlockCacheManager.java   | 79 ++++++++++++++++
 .../blockfile/cache/TestTieredBlockCache.java   | 75 +++++++++++++++
 pom.xml                                         |  6 ++
 6 files changed, 301 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index aec7f66..f524c71 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -96,6 +96,10 @@
       <artifactId>htrace-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
new file mode 100644
index 0000000..6031666
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
@@ -0,0 +1,97 @@
+package org.apache.accumulo.core.file.blockfile.cache.tiered;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TieredBlockCache implements BlockCache {
+	
+	public static final class Block implements CacheEntry {
+	  private volatile byte[] buffer;
+	  private volatile Object index;
+
+	  public Block(byte[] buffer) {
+	    this.buffer = requireNonNull(buffer);
+	  }
+
+	  @Override
+	  public byte[] getBuffer() {
+	    return buffer;
+	  }
+
+	  @Override
+	  public Object getIndex() {
+	    return index;
+	  }
+
+	    @Override
+	  public void setIndex(Object index) {
+	    this.index = index;
+      }
+	}
+
+	private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class);
+	private final IgniteCache<String, Block> cache;
+	private final TieredBlockCacheConfiguration conf;
+	private final AtomicLong hitCount = new AtomicLong(0);
+	private final AtomicLong requestCount = new AtomicLong(0);
+
+	
+	public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) {
+		this.conf = conf;
+		this.cache = ignite.getOrCreateCache(conf.getConfiguration());
+		LOG.info("Created {} cache with configuration {}", 
+				conf.getConfiguration().getName(), conf.getConfiguration());
+	}
+	
+	public void stop() {
+		this.cache.close();
+	}
+	
+	@Override
+	public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
+		return cacheBlock(blockName, buf);
+	}
+	
+	@Override
+	public CacheEntry cacheBlock(String blockName, byte[] buf) {
+		return this.cache.getAndPutIfAbsent(blockName, new Block(buf));
+	}
+
+	@Override
+	public CacheEntry getBlock(String blockName) {
+		this.requestCount.incrementAndGet();
+		Block b = this.cache.get(blockName);
+		if (null != b) {
+			this.hitCount.incrementAndGet();
+		}
+		return b;
+	}
+
+	@Override
+	public long getMaxSize() {
+		return this.conf.getMaxSize();
+	}
+
+	@Override
+	public Stats getStats() {
+		return new Stats() {
+			@Override
+			public long hitCount() {
+				return hitCount.get();
+			}
+			@Override
+			public long requestCount() {
+				return requestCount.get();
+			}
+		};
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
new file mode 100644
index 0000000..5f2cde5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
@@ -0,0 +1,40 @@
+package org.apache.accumulo.core.file.blockfile.cache.tiered;
+
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache.Block;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+public class TieredBlockCacheConfiguration extends BlockCacheConfiguration {
+	
+	private final CacheConfiguration<String, Block> configuration;
+
+	public TieredBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) {
+	  super(conf, type, TieredBlockCacheManager.PROPERTY_PREFIX);
+
+	  configuration = new CacheConfiguration<>();
+	  configuration.setName(type.name());
+	  configuration.setCacheMode(CacheMode.LOCAL);
+	  configuration.setOnheapCacheEnabled(true);
+	  configuration.setEvictionPolicy(new LruEvictionPolicy<String, Block>((int) this.getMaxSize()));
+	  configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(Duration.ONE_HOUR));
+	}
+
+	public CacheConfiguration<String, Block> getConfiguration() {
+	  return configuration;
+	}
+
+	@Override
+	public String toString() {
+	  return this.configuration.toString();
+	}
+	
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
new file mode 100644
index 0000000..5ea8a80
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
@@ -0,0 +1,79 @@
+package org.apache.accumulo.core.file.blockfile.cache.tiered;
+
+import java.util.Optional;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TieredBlockCacheManager extends BlockCacheManager {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCacheManager.class);
+	
+	public static final String PROPERTY_PREFIX = "tiered";
+	
+	private static final String TIERED_PROPERTY_BASE = BlockCacheManager.CACHE_PROPERTY_BASE + PROPERTY_PREFIX + ".";
+	public static final String OFF_HEAP_MAX_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.max.size";
+	public static final String OFF_HEAP_BLOCK_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.block.size";
+	
+	private static final long OFF_HEAP_MAX_SIZE_DEFAULT = 512 * 1024 * 1024;
+	private static final int OFF_HEAP_BLOCK_SIZE_DEFAULT = 16 * 1024;
+
+	private Ignite IGNITE;
+	
+	@Override
+	public void start(AccumuloConfiguration conf) {
+		
+		final long offHeapMaxSize = Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
+		final int offHeapBlockSize = Optional.ofNullable(conf.get(OFF_HEAP_BLOCK_SIZE_PROPERTY)).map(Integer::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_BLOCK_SIZE_DEFAULT);
+
+		// Ignite configuration.
+		IgniteConfiguration cfg = new IgniteConfiguration();
+		cfg.setDaemon(true);
+		
+		// Global Off-Heap Page memory configuration.
+		MemoryConfiguration memCfg = new MemoryConfiguration();
+		memCfg.setPageSize(offHeapBlockSize);
+		
+		MemoryPolicyConfiguration plCfg = new MemoryPolicyConfiguration();
+		plCfg.setInitialSize(offHeapMaxSize);
+		plCfg.setMaxSize(offHeapMaxSize);
+		plCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
+		plCfg.setEvictionThreshold(0.9);
+
+		memCfg.setMemoryPolicies(plCfg); //apply custom memory policy
+		
+		cfg.setMemoryConfiguration(memCfg); // apply off-heap memory configuration
+		LOG.info("Starting Ignite with configuration {}", cfg.toString());
+		IGNITE = Ignition.start(cfg);
+
+		super.start(conf);
+	}
+
+	@Override
+	public void stop() {
+		for (CacheType type : CacheType.values()) {
+		  TieredBlockCache cache = (TieredBlockCache) this.getBlockCache(type);
+		  if (null != cache) {
+			cache.stop();
+		  }
+		}
+		IGNITE.close();
+		super.stop();
+	}
+
+	@Override
+	protected BlockCache createCache(AccumuloConfiguration conf, CacheType type) {
+		return new TieredBlockCache(new TieredBlockCacheConfiguration(conf, type), IGNITE);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
new file mode 100644
index 0000000..3149438
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
@@ -0,0 +1,75 @@
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCacheManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTieredBlockCache {
+	
+	private static final long BLOCKSIZE = 1024;
+	private static final long MAXSIZE = 1024*100;
+	
+	private static class Holder {
+		private final String name;
+		private final byte[] buf;
+		public Holder(String name, byte[] buf) {
+			super();
+			this.name = name;
+			this.buf = buf;
+		}
+		public String getName() {
+			return name;
+		}
+		public byte[] getBuf() {
+			return buf;
+		}
+	}
+	
+	@Test
+	public void testCacheCreation() throws Exception {
+	    DefaultConfiguration dc = new DefaultConfiguration();
+	    ConfigurationCopy cc = new ConfigurationCopy(dc);
+	    cc.set(Property.TSERV_CACHE_FACTORY_IMPL, TieredBlockCacheManager.class.getName());
+	    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+	    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
+	    cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(MAXSIZE));
+	    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(MAXSIZE));
+	    cc.set(Property.TSERV_SUMMARYCACHE_SIZE, Long.toString(MAXSIZE));
+	    manager.start(cc);
+	    TieredBlockCache cache = (TieredBlockCache) manager.getBlockCache(CacheType.DATA);
+	    Holder[] blocks = generateRandomBlocks(100, BLOCKSIZE);
+	    // Confirm empty
+	    for (Holder h : blocks) {
+	      Assert.assertNull(cache.getBlock(h.getName()));
+	    }
+	    // Add blocks
+	    for (Holder h : blocks) {
+	      cache.cacheBlock(h.getName(), h.getBuf());
+	    }
+	    // Check if all blocks are properly cached and retrieved
+	    for (Holder h : blocks) {
+	      CacheEntry ce = cache.getBlock(h.getName());
+	      Assert.assertTrue(ce != null);
+	      Assert.assertEquals(ce.getBuffer().length, h.getBuf().length);
+	    }
+
+	    manager.stop();
+	}
+	
+    private Holder[] generateRandomBlocks(int numBlocks, long maxSize) {
+      Holder[] blocks = new Holder[numBlocks];
+      Random r = new Random();
+      for (int i = 0; i < numBlocks; i++) {
+        blocks[i] = new Holder("block" + i, Integer.toString(r.nextInt((int) maxSize) + 1).getBytes(StandardCharsets.UTF_8));
+      }
+      return blocks;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 78a5712..4a37b51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
     <hadoop.version>2.6.4</hadoop.version>
     <htrace.version>3.1.0-incubating</htrace.version>
     <httpclient.version>4.3.1</httpclient.version>
+    <ignite.version>2.0.0</ignite.version>
     <it.failIfNoSpecifiedTests>false</it.failIfNoSpecifiedTests>
     <!-- jetty 9.2 is the last version to support jdk less than 1.8 -->
     <jetty.version>9.2.17.v20160517</jetty.version>
@@ -432,6 +433,11 @@
         <version>${httpclient.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-core</artifactId>
+        <version>${ignite.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.maven</groupId>
         <artifactId>maven-artifact</artifactId>
         <version>${maven.min-version}</version>


Mime
View raw message