accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject [11/13] accumulo git commit: IGNITE
Date Tue, 23 May 2017 12:22:57 GMT
IGNITE


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a72c4f23
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a72c4f23
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a72c4f23

Branch: refs/heads/IGNITE
Commit: a72c4f23aadfae243d2b162ef768d8bbef1cc6d4
Parents: 0084d0b
Author: Dave Marion <dlmarion@apache.org>
Authored: Mon May 22 15:41:02 2017 -0400
Committer: Dave Marion <dlmarion@apache.org>
Committed: Mon May 22 15:41:02 2017 -0400

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +
 .../cache/tiered/TieredBlockCache.java          | 241 +++++++++---------
 .../tiered/TieredBlockCacheConfiguration.java   |  88 ++++---
 .../cache/tiered/TieredBlockCacheManager.java   | 200 ++++++++-------
 .../blockfile/cache/TestTieredBlockCache.java   | 248 ++++++++++---------
 pom.xml                                         |   5 +
 6 files changed, 435 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a72c4f23/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index f524c71..d837d79 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -68,6 +68,10 @@
       <artifactId>commons-logging</artifactId>
     </dependency>
     <dependency>
+      <groupId>javax.cache</groupId>
+      <artifactId>cache-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a72c4f23/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
index d2fe190..674115f 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import static java.util.Objects.requireNonNull;
@@ -16,118 +33,118 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TieredBlockCache implements BlockCache {
-	
-	public static final class Block implements CacheEntry {
-	  private volatile byte[] buffer;
-	  private volatile Object index;
-
-	  public Block(byte[] buffer) {
-	    this.buffer = requireNonNull(buffer);
-	  }
-
-	  @Override
-	  public byte[] getBuffer() {
-	    return buffer;
-	  }
-
-	  @Override
-	  public Object getIndex() {
-	    return index;
-	  }
-
-	    @Override
-	  public void setIndex(Object index) {
-	    this.index = index;
+
+  public static final class Block implements CacheEntry {
+    private volatile byte[] buffer;
+    private volatile Object index;
+
+    public Block(byte[] buffer) {
+      this.buffer = requireNonNull(buffer);
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+
+    @Override
+    public void setIndex(Object index) {
+      this.index = index;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class);
+  private final IgniteCache<String,Block> cache;
+  private final CacheMetrics metrics;
+  private final TieredBlockCacheConfiguration conf;
+  private final AtomicLong hitCount = new AtomicLong(0);
+  private final AtomicLong requestCount = new AtomicLong(0);
+  private final ScheduledFuture<?> future;
+
+  public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) {
+    this.conf = conf;
+    this.cache = ignite.getOrCreateCache(conf.getConfiguration());
+    metrics = cache.localMxBean();
+    LOG.info("Created {} cache with configuration {}", conf.getConfiguration().getName(), conf.getConfiguration());
+    this.future = TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        LOG.info(cache.localMetrics().toString());
+        LOG.info(cache.getName() + " entries, on-heap: " + getOnHeapEntryCount() + ", off-heap: " + getOffHeapEntryCount());
+      }
+    }, TieredBlockCacheManager.STAT_INTERVAL, TieredBlockCacheManager.STAT_INTERVAL, TimeUnit.SECONDS);
+  }
+
+  public void stop() {
+    this.future.cancel(false);
+    this.cache.close();
+    this.cache.destroy();
+  }
+
+  public IgniteCache<String,Block> getInternalCache() {
+    return this.cache;
+  }
+
+  public long getOnHeapEntryCount() {
+    return this.cache.localSizeLong(CachePeekMode.ONHEAP);
+  }
+
+  public long getOffHeapEntryCount() {
+    return this.cache.localSizeLong(CachePeekMode.OFFHEAP);
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
+    return cacheBlock(blockName, buf);
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buf) {
+    return this.cache.getAndPut(blockName, new Block(buf));
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName) {
+    this.requestCount.incrementAndGet();
+    Block b = this.cache.get(blockName);
+    if (null != b) {
+      this.hitCount.incrementAndGet();
+    }
+    return b;
+  }
+
+  @Override
+  public long getMaxSize() {
+    return this.conf.getMaxSize();
+  }
+
+  @Override
+  public long getMaxHeapSize() {
+    return this.conf.getMaxSize();
+  }
+
+  public CacheMetrics getCacheMetrics() {
+    return this.metrics;
+  }
+
+  @Override
+  public Stats getStats() {
+    return new Stats() {
+      @Override
+      public long hitCount() {
+        return hitCount.get();
+      }
+
+      @Override
+      public long requestCount() {
+        return requestCount.get();
       }
-	}
-
-	private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class);
-	private final IgniteCache<String, Block> cache;
-	private final CacheMetrics metrics;
-	private final TieredBlockCacheConfiguration conf;
-	private final AtomicLong hitCount = new AtomicLong(0);
-	private final AtomicLong requestCount = new AtomicLong(0);
-	private final ScheduledFuture<?> future;
-	
-	public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) {
-		this.conf = conf;
-		this.cache = ignite.getOrCreateCache(conf.getConfiguration());
-		metrics = cache.localMxBean();
-		LOG.info("Created {} cache with configuration {}", 
-				conf.getConfiguration().getName(), conf.getConfiguration());
-		this.future = TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() {
-			@Override
-			public void run() {
-				LOG.info(cache.localMetrics().toString());
-				LOG.info(cache.getName() + " entries, on-heap: " + getOnHeapEntryCount() + ", off-heap: " + getOffHeapEntryCount());
-			}
-		}, TieredBlockCacheManager.STAT_INTERVAL, TieredBlockCacheManager.STAT_INTERVAL, TimeUnit.SECONDS);
-	}
-	
-	public void stop() {
-		this.future.cancel(false);
-		this.cache.close();
-		this.cache.destroy();
-	}
-	
-	public IgniteCache<String,Block> getInternalCache() {
-		return this.cache;
-	}
-	
-	public long getOnHeapEntryCount() {
-		return this.cache.localSizeLong(CachePeekMode.ONHEAP);
-	}
-
-	public long getOffHeapEntryCount() {
-		return this.cache.localSizeLong(CachePeekMode.OFFHEAP);
-	}
-
-	@Override
-	public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
-		return cacheBlock(blockName, buf);
-	}
-	
-	@Override
-	public CacheEntry cacheBlock(String blockName, byte[] buf) {
-		return this.cache.getAndPut(blockName, new Block(buf));
-	}
-
-	@Override
-	public CacheEntry getBlock(String blockName) {
-		this.requestCount.incrementAndGet();
-		Block b = this.cache.get(blockName);
-		if (null != b) {
-			this.hitCount.incrementAndGet();
-		}
-		return b;
-	}
-
-	@Override
-	public long getMaxSize() {
-		return this.conf.getMaxSize();
-	}
-
-	@Override
-	public long getMaxHeapSize() {
-		return this.conf.getMaxSize();
-	}
-
-	public CacheMetrics getCacheMetrics() {
-		return this.metrics;
-	}
-
-	@Override
-	public Stats getStats() {
-		return new Stats() {
-			@Override
-			public long hitCount() {
-				return hitCount.get();
-			}
-			@Override
-			public long requestCount() {
-				return requestCount.get();
-			}
-		};
-	}
+    };
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a72c4f23/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
index f90d04c..65cbaf2 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import java.util.Optional;
@@ -15,40 +32,41 @@ import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
 import org.apache.ignite.configuration.CacheConfiguration;
 
 public class TieredBlockCacheConfiguration extends BlockCacheConfiguration {
-	
-	public static final String CACHE_EXPIRATION_TIME = TieredBlockCacheManager.TIERED_PROPERTY_BASE + "expiration.time";
-	public static final String CACHE_EXPIRATION_TIME_UNITS = TieredBlockCacheManager.TIERED_PROPERTY_BASE + "expiration.time_units";
-	
-	private static final String DEFAULT_CACHE_EXPIRATION_TIME_UNITS = "HOURS";
-	private static final long DEFAULT_CACHE_EXPIRATION_TIME = 1;
-	
-	private final CacheConfiguration<String, Block> configuration;
-
-	public TieredBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) {
-	  super(conf, type, TieredBlockCacheManager.PROPERTY_PREFIX);
-
-	  String unit = Optional.ofNullable(conf.get(CACHE_EXPIRATION_TIME_UNITS)).orElse(DEFAULT_CACHE_EXPIRATION_TIME_UNITS);
-	  long time = Optional.ofNullable(conf.get(CACHE_EXPIRATION_TIME)).map(Long::valueOf).filter(f -> f > 0).orElse(DEFAULT_CACHE_EXPIRATION_TIME);
-	  
-	  configuration = new CacheConfiguration<>();
-	  configuration.setName(type.name());
-	  configuration.setCacheMode(CacheMode.LOCAL);
-	  configuration.setOnheapCacheEnabled(true);
-	  LruEvictionPolicy<String, Block> ePolicy = new LruEvictionPolicy<>();
-	  ePolicy.setMaxSize((int) (0.75 * this.getMaxSize()));
-	  ePolicy.setMaxMemorySize(this.getMaxSize());
-	  configuration.setEvictionPolicy(ePolicy);
-	  configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.valueOf(unit), time)));
-	  configuration.setStatisticsEnabled(true);
-	}
-
-	public CacheConfiguration<String, Block> getConfiguration() {
-	  return configuration;
-	}
-
-	@Override
-	public String toString() {
-	  return this.configuration.toString();
-	}
+
+  public static final String CACHE_EXPIRATION_TIME = TieredBlockCacheManager.TIERED_PROPERTY_BASE + "expiration.time";
+  public static final String CACHE_EXPIRATION_TIME_UNITS = TieredBlockCacheManager.TIERED_PROPERTY_BASE + "expiration.time_units";
+
+  private static final String DEFAULT_CACHE_EXPIRATION_TIME_UNITS = "HOURS";
+  private static final long DEFAULT_CACHE_EXPIRATION_TIME = 1;
+
+  private final CacheConfiguration<String,Block> configuration;
+
+  public TieredBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) {
+    super(conf, type, TieredBlockCacheManager.PROPERTY_PREFIX);
+
+    String unit = Optional.ofNullable(conf.get(CACHE_EXPIRATION_TIME_UNITS)).orElse(DEFAULT_CACHE_EXPIRATION_TIME_UNITS);
+    long time = Optional.ofNullable(conf.get(CACHE_EXPIRATION_TIME)).map(Long::valueOf).filter(f -> f > 0).orElse(DEFAULT_CACHE_EXPIRATION_TIME);
+
+    configuration = new CacheConfiguration<>();
+    configuration.setName(type.name());
+    configuration.setCacheMode(CacheMode.LOCAL);
+    configuration.setOnheapCacheEnabled(true);
+    LruEvictionPolicy<String,Block> ePolicy = new LruEvictionPolicy<>();
+    ePolicy.setMaxSize((int) (0.75 * this.getMaxSize()));
+    ePolicy.setMaxMemorySize(this.getMaxSize());
+    configuration.setEvictionPolicy(ePolicy);
+    configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.valueOf(unit), time)));
+    configuration.setStatisticsEnabled(true);
+    configuration.setCopyOnRead(false);
+  }
+
+  public CacheConfiguration<String,Block> getConfiguration() {
+    return configuration;
+  }
+
+  @Override
+  public String toString() {
+    return this.configuration.toString();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a72c4f23/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
index 7910327..126aa04 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
@@ -1,8 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -22,94 +40,98 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TieredBlockCacheManager extends BlockCacheManager {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCacheManager.class);
-	
-	static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, new NamingThreadFactory("TieredBlockCacheStats"));
-	static final int STAT_INTERVAL = 60;
-	
-	public static final String PROPERTY_PREFIX = "tiered";
-	public static final String TIERED_PROPERTY_BASE = BlockCacheConfiguration.CACHE_PROPERTY_BASE + PROPERTY_PREFIX + ".";
-	
-	public static final String OFF_HEAP_MAX_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.max.size";
-	public static final String OFF_HEAP_BLOCK_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.block.size";
-	
-	private static final long OFF_HEAP_MIN_SIZE = 10 * 1024 * 1024;
-	private static final long OFF_HEAP_MAX_SIZE_DEFAULT = 512 * 1024 * 1024;
-	private static final int OFF_HEAP_BLOCK_SIZE_DEFAULT = 16 * 1024;
-	private static final String OFF_HEAP_CONFIG_NAME = "OFF_HEAP_MEMORY";
-
-	private Ignite IGNITE;
-	
-	@Override
-	public void start(AccumuloConfiguration conf) {
-		
-		long offHeapMaxSize = Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
-		final int offHeapBlockSize = Optional.ofNullable(conf.get(OFF_HEAP_BLOCK_SIZE_PROPERTY)).map(Integer::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_BLOCK_SIZE_DEFAULT);
-
-		if (offHeapMaxSize < OFF_HEAP_MIN_SIZE) {
-			LOG.warn("Off heap max size setting too low, overriding to minimum of 10MB");
-			offHeapMaxSize = OFF_HEAP_MIN_SIZE;
-		}
-		// Ignite configuration.
-		IgniteConfiguration cfg = new IgniteConfiguration();
-		cfg.setDaemon(true);
-		
-		// Global Off-Heap Page memory configuration.
-		MemoryConfiguration memCfg = new MemoryConfiguration();
-		memCfg.setPageSize(offHeapBlockSize);
-		
-		MemoryPolicyConfiguration plCfg = new MemoryPolicyConfiguration();
-		plCfg.setName(OFF_HEAP_CONFIG_NAME);
-		plCfg.setInitialSize(offHeapMaxSize);
-		plCfg.setMaxSize(offHeapMaxSize);
-		plCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
-		plCfg.setEvictionThreshold(0.9);
-		plCfg.setEmptyPagesPoolSize((int)(offHeapMaxSize / offHeapBlockSize / 10) - 1);
-		plCfg.setMetricsEnabled(true);
-
-		memCfg.setMemoryPolicies(plCfg); //apply custom memory policy
-		memCfg.setDefaultMemoryPolicyName(OFF_HEAP_CONFIG_NAME);
-		
-		cfg.setMemoryConfiguration(memCfg); // apply off-heap memory configuration
-		LOG.info("Starting Ignite with configuration {}", cfg.toString());
-		IGNITE = Ignition.start(cfg);
-
-		super.start(conf);
-		
-		SCHEDULER.scheduleAtFixedRate(new Runnable() {
-			@Override
-			public void run() {
-				IGNITE.memoryMetrics().forEach(m -> {
-					ToStringBuilder builder = new ToStringBuilder(m);
-					builder.append("memory region name", m.getName());
-					builder.append(" page allocation rate", m.getAllocationRate());
-					builder.append(" page eviction rate", m.getEvictionRate());
-					builder.append(" total allocated pages", m.getTotalAllocatedPages());
-					builder.append(" page free space %", m.getPagesFillFactor());
-					builder.append(" large entry fragmentation %", m.getLargeEntriesPagesPercentage());
-					LOG.info(builder.toString());
-				});
-			}
-		}, STAT_INTERVAL, STAT_INTERVAL, TimeUnit.SECONDS);
-	}
-
-	@Override
-	public void stop() {
-		for (CacheType type : CacheType.values()) {
-		  TieredBlockCache cache = (TieredBlockCache) this.getBlockCache(type);
-		  if (null != cache) {
-			cache.stop();
-		  }
-		}
-		SCHEDULER.shutdownNow();
-		IGNITE.close();
-		super.stop();
-	}
-
-	@Override
-	protected BlockCache createCache(AccumuloConfiguration conf, CacheType type) {
-		return new TieredBlockCache(new TieredBlockCacheConfiguration(conf, type), IGNITE);
-	}
+
+  private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCacheManager.class);
+
+  static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, new NamingThreadFactory("TieredBlockCacheStats"));
+  static final int STAT_INTERVAL = 60;
+
+  public static final String PROPERTY_PREFIX = "tiered";
+  public static final String TIERED_PROPERTY_BASE = BlockCacheConfiguration.CACHE_PROPERTY_BASE + PROPERTY_PREFIX + ".";
+
+  public static final String OFF_HEAP_MAX_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.max.size";
+  public static final String OFF_HEAP_BLOCK_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.block.size";
+
+  private static final long OFF_HEAP_MIN_SIZE = 10 * 1024 * 1024;
+  private static final long OFF_HEAP_MAX_SIZE_DEFAULT = 512 * 1024 * 1024;
+  private static final int OFF_HEAP_BLOCK_SIZE_DEFAULT = 16 * 1024;
+  private static final String OFF_HEAP_CONFIG_NAME = "OFF_HEAP_MEMORY";
+
+  private Ignite IGNITE;
+  private ScheduledFuture<?> future;
+
+  @Override
+  public void start(AccumuloConfiguration conf) {
+
+    long offHeapMaxSize = Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
+    final int offHeapBlockSize = Optional.ofNullable(conf.get(OFF_HEAP_BLOCK_SIZE_PROPERTY)).map(Integer::valueOf).filter(f -> f > 0)
+        .orElse(OFF_HEAP_BLOCK_SIZE_DEFAULT);
+
+    if (offHeapMaxSize < OFF_HEAP_MIN_SIZE) {
+      LOG.warn("Off heap max size setting too low, overriding to minimum of 10MB");
+      offHeapMaxSize = OFF_HEAP_MIN_SIZE;
+    }
+    // Ignite configuration.
+    IgniteConfiguration cfg = new IgniteConfiguration();
+    cfg.setDaemon(true);
+
+    // Global Off-Heap Page memory configuration.
+    MemoryConfiguration memCfg = new MemoryConfiguration();
+    memCfg.setPageSize(offHeapBlockSize);
+
+    MemoryPolicyConfiguration plCfg = new MemoryPolicyConfiguration();
+    plCfg.setName(OFF_HEAP_CONFIG_NAME);
+    plCfg.setInitialSize(offHeapMaxSize);
+    plCfg.setMaxSize(offHeapMaxSize);
+    plCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
+    plCfg.setEvictionThreshold(0.9);
+    plCfg.setEmptyPagesPoolSize((int) (offHeapMaxSize / offHeapBlockSize / 10) - 1);
+    plCfg.setMetricsEnabled(true);
+
+    memCfg.setMemoryPolicies(plCfg); // apply custom memory policy
+    memCfg.setDefaultMemoryPolicyName(OFF_HEAP_CONFIG_NAME);
+
+    cfg.setMemoryConfiguration(memCfg); // apply off-heap memory configuration
+    LOG.info("Starting Ignite with configuration {}", cfg.toString());
+    IGNITE = Ignition.start(cfg);
+
+    super.start(conf);
+
+    future = SCHEDULER.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        IGNITE.memoryMetrics().forEach(m -> {
+          ToStringBuilder builder = new ToStringBuilder(m);
+          builder.append("memory region name", m.getName());
+          builder.append(" page allocation rate", m.getAllocationRate());
+          builder.append(" page eviction rate", m.getEvictionRate());
+          builder.append(" total allocated pages", m.getTotalAllocatedPages());
+          builder.append(" page free space %", m.getPagesFillFactor());
+          builder.append(" large entry fragmentation %", m.getLargeEntriesPagesPercentage());
+          LOG.info(builder.toString());
+        });
+      }
+    }, STAT_INTERVAL, STAT_INTERVAL, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void stop() {
+    for (CacheType type : CacheType.values()) {
+      TieredBlockCache cache = (TieredBlockCache) this.getBlockCache(type);
+      if (null != cache) {
+        cache.stop();
+      }
+    }
+    if (null != future) {
+      future.cancel(false);
+    }
+    IGNITE.close();
+    super.stop();
+  }
+
+  @Override
+  protected BlockCache createCache(AccumuloConfiguration conf, CacheType type) {
+    return new TieredBlockCache(new TieredBlockCacheConfiguration(conf, type), IGNITE);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a72c4f23/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
index b7a1ac8..12b65c0 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
@@ -1,131 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.accumulo.core.file.blockfile.cache;
 
-
-import javax.cache.Cache.Entry;
-
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache;
-import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache.Block;
 import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCacheManager;
-import org.apache.ignite.cache.CachePeekMode;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestTieredBlockCache {
-	
-	private static final int BLOCKSIZE = 1024;
-	private static final long MAXSIZE = 1024*100;
-	
-	private static class Holder {
-		private final String name;
-		private final byte[] buf;
-		public Holder(String name, byte[] buf) {
-			super();
-			this.name = name;
-			this.buf = buf;
-		}
-		public String getName() {
-			return name;
-		}
-		public byte[] getBuf() {
-			return buf;
-		}
-	}
-	
-	@Test
-	public void testCacheCreation() throws Exception {
-	    DefaultConfiguration dc = new DefaultConfiguration();
-	    ConfigurationCopy cc = new ConfigurationCopy(dc);
-	    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TieredBlockCacheManager.class.getName());
-	    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
-	    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
-	    cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(MAXSIZE));
-	    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(MAXSIZE));
-	    cc.set(Property.TSERV_SUMMARYCACHE_SIZE, Long.toString(MAXSIZE));
-	    manager.start(cc);
-	    TieredBlockCache cache = (TieredBlockCache) manager.getBlockCache(CacheType.DATA);
-	    Holder[] blocks = generateRandomBlocks(100, BLOCKSIZE);
-	    // Confirm empty
-	    for (Holder h : blocks) {
-	      Assert.assertNull(cache.getBlock(h.getName()));
-	    }
-	    // Add blocks
-	    for (Holder h : blocks) {
-	      cache.cacheBlock(h.getName(), h.getBuf());
-	    }
-	    // Check if all blocks are properly cached and retrieved
-	    for (Holder h : blocks) {
-	      CacheEntry ce = cache.getBlock(h.getName());
-	      Assert.assertTrue(ce != null);
-	      Assert.assertEquals(ce.getBuffer().length, h.getBuf().length);
-	    }
-	    manager.stop();
-	}
-	
-	@Test
-	public void testOffHeapBlockMigration() throws Exception {
-	    DefaultConfiguration dc = new DefaultConfiguration();
-	    ConfigurationCopy cc = new ConfigurationCopy(dc);
-	    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TieredBlockCacheManager.class.getName());
-	    cc.set("general.custom.cache.block.tiered.off-heap.max.size", Long.toString(10*1024*1024));
-	    cc.set("general.custom.cache.block.tiered.off-heap.block.size", Long.toString(BLOCKSIZE));
-	    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
-	    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
-	    cc.set(Property.TSERV_DATACACHE_SIZE, "2048");
-	    cc.set(Property.TSERV_INDEXCACHE_SIZE, "2048");
-	    cc.set(Property.TSERV_SUMMARYCACHE_SIZE, "2048");
-	    manager.start(cc);
-	    TieredBlockCache cache = (TieredBlockCache) manager.getBlockCache(CacheType.DATA);
-	    
-	    // With this configuration we have an on-heap cache with a max size of 2K with 1K blocks
-	    // and an off heap cache with a max size of 10MB with 1K blocks
-
-	    for (Holder h : generateRandomBlocks(1, 1024)) {
-	    	cache.cacheBlock(h.getName(), h.getBuf());
-	    }
-	    Assert.assertEquals(1, cache.getCacheMetrics().getCachePuts());
-	    Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
-	    
-	    for (Holder h : generateRandomBlocks(1023, 1024)) {
-	    	cache.cacheBlock(h.getName(), h.getBuf());
-	    }
-	    Assert.assertEquals(1, cache.getOnHeapEntryCount());
-	    Assert.assertEquals(1023, cache.getOffHeapEntryCount());
-	    
-	    Assert.assertEquals(1, cache.getCacheMetrics().getSize());
-	    Assert.assertEquals(1023, cache.getCacheMetrics().getOffHeapEntriesCount());
-	    Assert.assertEquals(1024, cache.getCacheMetrics().getCachePuts());
-	    Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
-	    
-	    for (Entry<String,Block> entry : cache.getInternalCache().localEntries(CachePeekMode.ONHEAP, CachePeekMode.PRIMARY)) {
-	    	System.out.println("on heap: " + entry.getKey());
-	    }
-	    manager.stop();
-
-	}
-
-	/**
-	 * 
-	 * @param numBlocks
-	 *           number of blocks to create
-	 * @param blockSize
-	 *           number of bytes in each block
-	 * @return
-	 */
-    private Holder[] generateRandomBlocks(int numBlocks, int blockSize) {
-      byte[] b = new byte[blockSize];
-      for (int x = 0; x < blockSize; x++) {
-    	b[x] = '0';
+
+  private static final int BLOCKSIZE = 1024;
+  private static final long MAXSIZE = 1024 * 100;
+
+  private static class Holder {
+    private final String name;
+    private final byte[] buf;
+
+    public Holder(String name, byte[] buf) {
+      super();
+      this.name = name;
+      this.buf = buf;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public byte[] getBuf() {
+      return buf;
+    }
+  }
+
+  @Test
+  public void testCacheCreation() throws Exception {
+    DefaultConfiguration dc = new DefaultConfiguration();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TieredBlockCacheManager.class.getName());
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
+    cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(MAXSIZE));
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(MAXSIZE));
+    cc.set(Property.TSERV_SUMMARYCACHE_SIZE, Long.toString(MAXSIZE));
+    manager.start(cc);
+    try {
+      TieredBlockCache cache = (TieredBlockCache) manager.getBlockCache(CacheType.DATA);
+      Holder[] blocks = generateRandomBlocks(100, BLOCKSIZE);
+      // Confirm empty
+      for (Holder h : blocks) {
+        Assert.assertNull(cache.getBlock(h.getName()));
+      }
+      // Add blocks
+      for (Holder h : blocks) {
+        cache.cacheBlock(h.getName(), h.getBuf());
       }
-      Holder[] blocks = new Holder[numBlocks];
-      for (int i = 0; i < numBlocks; i++) {
-    	byte[] buf = new byte[blockSize];
-    	System.arraycopy(b, 0, buf, 0, blockSize);
-        blocks[i] = new Holder("block" + i, buf);
+      // Check if all blocks are properly cached and retrieved
+      for (Holder h : blocks) {
+        CacheEntry ce = cache.getBlock(h.getName());
+        Assert.assertTrue(ce != null);
+        Assert.assertEquals(ce.getBuffer().length, h.getBuf().length);
       }
-      return blocks;
+    } finally {
+      manager.stop();
+    }
+  }
+
+  @Test
+  public void testOffHeapBlockMigration() throws Exception {
+    DefaultConfiguration dc = new DefaultConfiguration();
+    ConfigurationCopy cc = new ConfigurationCopy(dc);
+    cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TieredBlockCacheManager.class.getName());
+    cc.set("general.custom.cache.block.tiered.off-heap.max.size", Long.toString(10 * 1024 * 1024));
+    cc.set("general.custom.cache.block.tiered.off-heap.block.size", Long.toString(BLOCKSIZE));
+    BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+    cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
+    cc.set(Property.TSERV_DATACACHE_SIZE, "2048");
+    cc.set(Property.TSERV_INDEXCACHE_SIZE, "2048");
+    cc.set(Property.TSERV_SUMMARYCACHE_SIZE, "2048");
+    manager.start(cc);
+    try {
+      TieredBlockCache cache = (TieredBlockCache) manager.getBlockCache(CacheType.DATA);
+
+      // With this configuration we have an on-heap cache with a max size of 2K with 1K blocks
+      // and an off heap cache with a max size of 10MB with 1K blocks
+
+      for (Holder h : generateRandomBlocks(1, 1024)) {
+        cache.cacheBlock(h.getName(), h.getBuf());
+      }
+      Assert.assertEquals(1, cache.getCacheMetrics().getCachePuts());
+      Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
+
+      for (Holder h : generateRandomBlocks(1023, 1024)) {
+        cache.cacheBlock(h.getName(), h.getBuf());
+      }
+      Assert.assertEquals(1, cache.getOnHeapEntryCount());
+      Assert.assertEquals(1023, cache.getOffHeapEntryCount());
+
+      Assert.assertEquals(1, cache.getCacheMetrics().getSize());
+      Assert.assertEquals(1023, cache.getCacheMetrics().getOffHeapEntriesCount());
+      Assert.assertEquals(1024, cache.getCacheMetrics().getCachePuts());
+      Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
+
+    } finally {
+      manager.stop();
+    }
+
+  }
+
+  /**
+   *
+   * @param numBlocks
+   *          number of blocks to create
+   * @param blockSize
+   *          number of bytes in each block
+   * @return array of Holder objects
+   */
+  private Holder[] generateRandomBlocks(int numBlocks, int blockSize) {
+    byte[] b = new byte[blockSize];
+    for (int x = 0; x < blockSize; x++) {
+      b[x] = '0';
+    }
+    Holder[] blocks = new Holder[numBlocks];
+    for (int i = 0; i < numBlocks; i++) {
+      byte[] buf = new byte[blockSize];
+      System.arraycopy(b, 0, buf, 0, blockSize);
+      blocks[i] = new Holder("block" + i, buf);
     }
+    return blocks;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a72c4f23/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4a37b51..86f9591 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,6 +227,11 @@
         <version>1.1.1</version>
       </dependency>
       <dependency>
+        <groupId>javax.cache</groupId>
+        <artifactId>cache-api</artifactId>
+        <version>1.0.0</version>
+      </dependency>
+      <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>javax.servlet-api</artifactId>
         <version>3.1.0</version>


Mime
View raw message