accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: IGNITE : use BinaryObject and lazy serialization
Date Tue, 23 May 2017 17:30:29 GMT
Repository: accumulo
Updated Branches:
  refs/heads/IGNITE eb6cb1424 -> d1222954a


IGNITE : use BinaryObject and lazy serialization


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d1222954
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d1222954
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d1222954

Branch: refs/heads/IGNITE
Commit: d1222954aa6749ce8e24b248b30df3e113c99a2d
Parents: eb6cb14
Author: Dave Marion <dlmarion@apache.org>
Authored: Tue May 23 13:30:09 2017 -0400
Committer: Dave Marion <dlmarion@apache.org>
Committed: Tue May 23 13:30:09 2017 -0400

----------------------------------------------------------------------
 .../cache/tiered/TieredBlockCache.java          | 122 ++++++++++++++++---
 .../tiered/TieredBlockCacheConfiguration.java   |   8 +-
 .../cache/tiered/TieredBlockCacheManager.java   |  10 ++
 3 files changed, 121 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1222954/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
index 89e984d..74706ca 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
@@ -23,10 +23,17 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CachePeekMode;
 import org.slf4j.Logger;
@@ -34,32 +41,110 @@ import org.slf4j.LoggerFactory;
 
 public class TieredBlockCache implements BlockCache {
 
-  static final class Block implements CacheEntry {
-    private byte[] buffer;
-    private volatile Object index;
+  /**
+   * CacheEntry implementation that performs lazy deserialization based on an Ignite BinaryObject
+   *
+   */
+  static final class LazyBlock implements CacheEntry {
+
+    private static final String KEY_FIELD = "key";
+    private static final String BUFFER_FIELD = "buffer";
+    private static final String INDEX_FIELD = "index";
+
+    private final IgniteCache<String,BinaryObject> cache;
+
+    private BinaryObject binary;
 
-    public Block(byte[] buffer) {
-      this.buffer = requireNonNull(buffer);
+    private transient byte[] buffer;
+    private transient Object idx;
+
+    private LazyBlock(BinaryObject binary, IgniteCache<String,BinaryObject> cache)
{
+      this.binary = binary;
+      this.cache = cache;
     }
 
     @Override
     public byte[] getBuffer() {
+      if (null == buffer) {
+        buffer = this.binary.field(BUFFER_FIELD);
+      }
       return buffer;
     }
 
     @Override
     public Object getIndex() {
-      return index;
+      if (null == idx) {
+        idx = this.binary.field(INDEX_FIELD);
+      }
+      return idx;
+    }
+
+    private void resetBinary(BinaryObject binary) {
+      this.binary = binary;
+      this.buffer = null;
+      this.idx = null;
     }
 
     @Override
-    public void setIndex(Object index) {
-      this.index = index;
+    public void setIndex(final Object idx) {
+      String key = this.binary.field(KEY_FIELD);
+      this.cache.invoke(key, new CacheEntryProcessor<String,BinaryObject,Void>() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Void process(MutableEntry<String,BinaryObject> entry, Object... arguments)
throws EntryProcessorException {
+          BinaryObjectBuilder builder = entry.getValue().toBuilder();
+          builder.setField(INDEX_FIELD, idx);
+          BinaryObject update = builder.build();
+          entry.setValue(update);
+          resetBinary(binary);
+          return null;
+        }
+      });
+    }
+
+    /**
+     * Convert a BinaryObject to a LazyBlock. Use this method instead of BinaryObject.deserialize
as it sets the reference to the IgniteCache instance to use
+     * when setIndex is invoked
+     *
+     * @param binary
+     *          {@link BinaryObject} representing this LazyBlock
+     * @param cache
+     *          reference to the {@link IgniteCache} whence the {@link BinaryObject} is stored
+     * @return instance of {@link LazyBlock}
+     */
+    static LazyBlock fromIgniteBinaryObject(BinaryObject binary, IgniteCache<String,BinaryObject>
cache) {
+      requireNonNull(binary);
+      requireNonNull(cache);
+      return new LazyBlock(binary, cache);
     }
+
+    /**
+     * Convert a byte array representing a block to a binary object to put into the ignite
cache
+     *
+     * @param binary
+     *          binary interface for {@link Ignite}
+     * @param key
+     *          they key that this block will be stored under in the cache
+     * @param block
+     *          the byte array representing the block
+     * @return {@link BinaryObject} instance
+     */
+    static BinaryObject toIgniteBinaryObject(IgniteBinary binary, String key, byte[] block)
{
+      requireNonNull(key);
+      requireNonNull(block);
+      requireNonNull(binary);
+      BinaryObjectBuilder builder = binary.builder(LazyBlock.class.getName());
+      builder.setField(KEY_FIELD, key);
+      builder.setField(BUFFER_FIELD, block);
+      return builder.build();
+    }
+
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class);
-  private final IgniteCache<String,Block> cache;
+  private final IgniteCache<String,BinaryObject> cache;
+  private final IgniteBinary binary;
   private final CacheMetrics metrics;
   private final TieredBlockCacheConfiguration conf;
   private final AtomicLong hitCount = new AtomicLong(0);
@@ -68,7 +153,8 @@ public class TieredBlockCache implements BlockCache {
 
   public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) {
     this.conf = conf;
-    this.cache = ignite.getOrCreateCache(conf.getConfiguration());
+    this.cache = ignite.getOrCreateCache(conf.getConfiguration()).withKeepBinary();
+    this.binary = ignite.binary();
     metrics = cache.localMxBean();
     LOG.info("Created {} cache with configuration {}", conf.getConfiguration().getName(),
conf.getConfiguration());
     this.future = TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() {
@@ -86,7 +172,7 @@ public class TieredBlockCache implements BlockCache {
     this.cache.destroy();
   }
 
-  public IgniteCache<String,Block> getInternalCache() {
+  public IgniteCache<String,BinaryObject> getInternalCache() {
     return this.cache;
   }
 
@@ -105,17 +191,23 @@ public class TieredBlockCache implements BlockCache {
 
   @Override
   public CacheEntry cacheBlock(String blockName, byte[] buf) {
-    return this.cache.getAndPut(blockName, new Block(buf));
+    BinaryObject bo = this.cache.getAndPut(blockName, LazyBlock.toIgniteBinaryObject(binary,
blockName, buf));
+    if (null != bo) {
+      return LazyBlock.fromIgniteBinaryObject(bo, this.cache);
+    } else {
+      return null;
+    }
   }
 
   @Override
   public CacheEntry getBlock(String blockName) {
     this.requestCount.incrementAndGet();
-    Block b = this.cache.get(blockName);
-    if (null != b) {
+    BinaryObject bo = this.cache.get(blockName);
+    if (null != bo) {
       this.hitCount.incrementAndGet();
+      return LazyBlock.fromIgniteBinaryObject(bo, this.cache);
     }
-    return b;
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1222954/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
index 65cbaf2..382b2db 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
@@ -26,7 +26,7 @@ import javax.cache.expiry.Duration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.CacheType;
-import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache.Block;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -39,7 +39,7 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration
{
   private static final String DEFAULT_CACHE_EXPIRATION_TIME_UNITS = "HOURS";
   private static final long DEFAULT_CACHE_EXPIRATION_TIME = 1;
 
-  private final CacheConfiguration<String,Block> configuration;
+  private final CacheConfiguration<String,BinaryObject> configuration;
 
   public TieredBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) {
     super(conf, type, TieredBlockCacheManager.PROPERTY_PREFIX);
@@ -51,7 +51,7 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration
{
     configuration.setName(type.name());
     configuration.setCacheMode(CacheMode.LOCAL);
     configuration.setOnheapCacheEnabled(true);
-    LruEvictionPolicy<String,Block> ePolicy = new LruEvictionPolicy<>();
+    LruEvictionPolicy<String,BinaryObject> ePolicy = new LruEvictionPolicy<>();
     ePolicy.setMaxSize((int) (0.75 * this.getMaxSize()));
     ePolicy.setMaxMemorySize(this.getMaxSize());
     configuration.setEvictionPolicy(ePolicy);
@@ -60,7 +60,7 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration
{
     configuration.setCopyOnRead(false);
   }
 
-  public CacheConfiguration<String,Block> getConfiguration() {
+  public CacheConfiguration<String,BinaryObject> getConfiguration() {
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1222954/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
index 126aa04..437c0e8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,10 +29,12 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache.LazyBlock;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
@@ -39,6 +42,9 @@ import org.apache.ignite.configuration.MemoryPolicyConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * BlockCacheManager implementation that creates a {@link BlockCache} that uses on-heap and
off-heap memory areas using Apache Ignite.
+ */
 public class TieredBlockCacheManager extends BlockCacheManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCacheManager.class);
@@ -75,6 +81,10 @@ public class TieredBlockCacheManager extends BlockCacheManager {
     IgniteConfiguration cfg = new IgniteConfiguration();
     cfg.setDaemon(true);
 
+    BinaryConfiguration binaryCfg = new BinaryConfiguration();
+    binaryCfg.setClassNames(Collections.singleton(LazyBlock.class.getName()));
+    cfg.setBinaryConfiguration(binaryCfg);
+
     // Global Off-Heap Page memory configuration.
     MemoryConfiguration memCfg = new MemoryConfiguration();
     memCfg.setPageSize(offHeapBlockSize);


Mime
View raw message