accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: IGNITE: fixes for using BinaryObject
Date Tue, 23 May 2017 19:45:52 GMT
Repository: accumulo
Updated Branches:
  refs/heads/IGNITE d1222954a -> d29b00837


IGNITE: fixes for using BinaryObject


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d29b0083
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d29b0083
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d29b0083

Branch: refs/heads/IGNITE
Commit: d29b00837d7f9e03afb9bcb146f29667eb2735be
Parents: d122295
Author: Dave Marion <dlmarion@apache.org>
Authored: Tue May 23 15:45:37 2017 -0400
Committer: Dave Marion <dlmarion@apache.org>
Committed: Tue May 23 15:45:37 2017 -0400

----------------------------------------------------------------------
 .../cache/tiered/TieredBlockCache.java          | 96 +++++++++++++-------
 .../tiered/TieredBlockCacheConfiguration.java   |  1 +
 .../cache/tiered/TieredBlockCacheManager.java   |  9 +-
 3 files changed, 71 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d29b0083/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
index 74706ca..6086137 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import static java.util.Objects.requireNonNull;
 
+import java.lang.ref.SoftReference;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,6 +29,7 @@ import javax.cache.processor.MutableEntry;
 
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.rfile.BlockIndex;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCache;
@@ -36,6 +38,7 @@ import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,76 +56,103 @@ public class TieredBlockCache implements BlockCache {
 
     private final IgniteCache<String,BinaryObject> cache;
 
-    private BinaryObject binary;
+    private BinaryObject binaryObject;
 
     private transient byte[] buffer;
-    private transient Object idx;
+    private transient BlockIndex idx;
 
-    private LazyBlock(BinaryObject binary, IgniteCache<String,BinaryObject> cache)
{
-      this.binary = binary;
+    private LazyBlock(BinaryObject object, IgniteCache<String,BinaryObject> cache)
{
+      this.binaryObject = object;
       this.cache = cache;
     }
 
     @Override
     public byte[] getBuffer() {
       if (null == buffer) {
-        buffer = this.binary.field(BUFFER_FIELD);
+        buffer = this.binaryObject.field(BUFFER_FIELD);
       }
       return buffer;
     }
 
     @Override
     public Object getIndex() {
-      if (null == idx) {
-        idx = this.binary.field(INDEX_FIELD);
+      if (null == idx && this.binaryObject.hasField(INDEX_FIELD)) {
+        Object o = this.binaryObject.field(INDEX_FIELD);
+        if (o instanceof BinaryObjectImpl) {
+          BinaryObjectImpl b = (BinaryObjectImpl) o;
+          idx = b.deserialize();
+        } else if (o instanceof BlockIndex) {
+          idx = (BlockIndex) o;
+        } else {
+          throw new RuntimeException("Unknown object type: " + o.getClass().getName());
+        }
       }
-      return idx;
+      return new SoftReference<BlockIndex>(idx);
     }
 
     private void resetBinary(BinaryObject binary) {
-      this.binary = binary;
+      this.binaryObject = binary;
       this.buffer = null;
       this.idx = null;
     }
 
     @Override
     public void setIndex(final Object idx) {
-      String key = this.binary.field(KEY_FIELD);
-      this.cache.invoke(key, new CacheEntryProcessor<String,BinaryObject,Void>() {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Void process(MutableEntry<String,BinaryObject> entry, Object... arguments)
throws EntryProcessorException {
-          BinaryObjectBuilder builder = entry.getValue().toBuilder();
-          builder.setField(INDEX_FIELD, idx);
-          BinaryObject update = builder.build();
-          entry.setValue(update);
-          resetBinary(binary);
-          return null;
+      if (idx instanceof SoftReference<?>) {
+        SoftReference<?> ref = (SoftReference<?>) idx;
+        Object o = ref.get();
+        if (o == null) {
+          throw new IllegalStateException("Index is not set");
+        }
+        if (o instanceof BlockIndex) {
+          final BlockIndex bi = (BlockIndex) o;
+          final String key = this.binaryObject.field(KEY_FIELD);
+          this.cache.invoke(key, new CacheEntryProcessor<String,BinaryObject,Void>()
{
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Void process(MutableEntry<String,BinaryObject> entry, Object...
arguments) throws EntryProcessorException {
+              // final BinaryObjectBuilder builder = igniteBinary.builder(LazyBlock.class.getName());
+              // final String k = entry.getValue().field(KEY_FIELD);
+              // final byte[] b = entry.getValue().field(BUFFER_FIELD);
+              // builder.setField(KEY_FIELD, k);
+              // builder.setField(BUFFER_FIELD, b);
+              final BinaryObjectBuilder builder = entry.getValue().toBuilder();
+              builder.setField(INDEX_FIELD, bi, BlockIndex.class);
+              final BinaryObject update = builder.build();
+              entry.setValue(update);
+              resetBinary(update);
+              return null;
+            }
+          });
+        } else {
+          throw new UnsupportedOperationException("Object is not a BlockIndex, is a: " +
o.getClass().getName());
         }
-      });
+      } else {
+        throw new UnsupportedOperationException("Unhandled object type");
+      }
     }
 
     /**
      * Convert a BinaryObject to a LazyBlock. Use this method instead of BinaryObject.deserialize
as it sets the reference to the IgniteCache instance to use
      * when setIndex is invoked
      *
-     * @param binary
+     * @param object
      *          {@link BinaryObject} representing this LazyBlock
      * @param cache
      *          reference to the {@link IgniteCache} in which the {@link BinaryObject} is stored
      * @return instance of {@link LazyBlock}
      */
-    static LazyBlock fromIgniteBinaryObject(BinaryObject binary, IgniteCache<String,BinaryObject>
cache) {
-      requireNonNull(binary);
+    static LazyBlock fromIgniteBinaryObject(BinaryObject object, IgniteCache<String,BinaryObject>
cache) {
+      requireNonNull(object);
       requireNonNull(cache);
-      return new LazyBlock(binary, cache);
+      return new LazyBlock(object, cache);
     }
 
     /**
      * Convert a byte array representing a block to a binary object to put into the ignite
cache
      *
-     * @param binary
+     * @param igniteBinary
      *          binary interface for {@link Ignite}
      * @param key
      *          the key that this block will be stored under in the cache
@@ -130,11 +160,11 @@ public class TieredBlockCache implements BlockCache {
      *          the byte array representing the block
      * @return {@link BinaryObject} instance
      */
-    static BinaryObject toIgniteBinaryObject(IgniteBinary binary, String key, byte[] block)
{
+    static BinaryObject toIgniteBinaryObject(IgniteBinary igniteBinary, String key, byte[]
block) {
       requireNonNull(key);
       requireNonNull(block);
-      requireNonNull(binary);
-      BinaryObjectBuilder builder = binary.builder(LazyBlock.class.getName());
+      requireNonNull(igniteBinary);
+      BinaryObjectBuilder builder = igniteBinary.builder(LazyBlock.class.getName());
       builder.setField(KEY_FIELD, key);
       builder.setField(BUFFER_FIELD, block);
       return builder.build();
@@ -144,7 +174,7 @@ public class TieredBlockCache implements BlockCache {
 
   private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class);
   private final IgniteCache<String,BinaryObject> cache;
-  private final IgniteBinary binary;
+  private final IgniteBinary igniteBinary;
   private final CacheMetrics metrics;
   private final TieredBlockCacheConfiguration conf;
   private final AtomicLong hitCount = new AtomicLong(0);
@@ -154,7 +184,7 @@ public class TieredBlockCache implements BlockCache {
   public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) {
     this.conf = conf;
     this.cache = ignite.getOrCreateCache(conf.getConfiguration()).withKeepBinary();
-    this.binary = ignite.binary();
+    this.igniteBinary = ignite.binary();
     metrics = cache.localMxBean();
     LOG.info("Created {} cache with configuration {}", conf.getConfiguration().getName(),
conf.getConfiguration());
     this.future = TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() {
@@ -191,7 +221,7 @@ public class TieredBlockCache implements BlockCache {
 
   @Override
   public CacheEntry cacheBlock(String blockName, byte[] buf) {
-    BinaryObject bo = this.cache.getAndPut(blockName, LazyBlock.toIgniteBinaryObject(binary,
blockName, buf));
+    BinaryObject bo = this.cache.getAndPut(blockName, LazyBlock.toIgniteBinaryObject(igniteBinary,
blockName, buf));
     if (null != bo) {
       return LazyBlock.fromIgniteBinaryObject(bo, this.cache);
     } else {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d29b0083/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
index 382b2db..039bc85 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
@@ -58,6 +58,7 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration
{
     configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.valueOf(unit),
time)));
     configuration.setStatisticsEnabled(true);
     configuration.setCopyOnRead(false);
+    configuration.setStoreKeepBinary(true);
   }
 
   public CacheConfiguration<String,BinaryObject> getConfiguration() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d29b0083/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
index 437c0e8..7e9b9e1 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
@@ -17,7 +17,8 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -30,6 +31,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.cache.CacheType;
 import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache.LazyBlock;
+import org.apache.accumulo.core.file.rfile.BlockIndex;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.ignite.Ignite;
@@ -82,7 +84,10 @@ public class TieredBlockCacheManager extends BlockCacheManager {
     cfg.setDaemon(true);
 
     BinaryConfiguration binaryCfg = new BinaryConfiguration();
-    binaryCfg.setClassNames(Collections.singleton(LazyBlock.class.getName()));
+    List<String> classes = new ArrayList<>();
+    classes.add(LazyBlock.class.getName());
+    classes.add(BlockIndex.class.getName());
+    binaryCfg.setClassNames(classes);
     cfg.setBinaryConfiguration(binaryCfg);
 
     // Global Off-Heap Page memory configuration.


Mime
View raw message