accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: Avoid multiple threads loading same cache block (#990)
Date Tue, 28 May 2019 19:15:24 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 057f14e  Avoid multiple threads loading same cache block (#990)
057f14e is described below

commit 057f14e23764bc72d0a3e3a85de03632e3b45b0a
Author: Keith Turner <kturner@apache.org>
AuthorDate: Mon Feb 25 18:44:18 2019 -0500

    Avoid multiple threads loading same cache block (#990)
---
 .../accumulo/core/client/rfile/RFileScanner.java   | 25 ++++++++-
 .../core/file/blockfile/cache/BlockCache.java      | 12 +++++
 .../core/file/blockfile/cache/LruBlockCache.java   | 60 ++++++++++++++++++----
 .../file/blockfile/impl/CachableBlockFile.java     | 37 +++++++++----
 .../file/blockfile/cache/TestLruBlockCache.java    | 60 +++++++++++-----------
 5 files changed, 144 insertions(+), 50 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 01d571d..4096299 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.client.rfile;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -27,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
@@ -45,6 +47,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache.Options;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
@@ -76,6 +79,14 @@ class RFileScanner extends ScannerOptions implements Scanner {
   private static final long CACHE_BLOCK_SIZE = AccumuloConfiguration.getDefaultConfiguration()
       .getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
 
+  private static final EnumSet<Options> CACHE_OPTS;
+
+  static {
+    EnumSet<Options> cacheOpts = EnumSet.allOf(Options.class);
+    cacheOpts.remove(Options.ENABLE_LOCKS);
+    CACHE_OPTS = cacheOpts;
+  }
+
   static class Opts {
     InputArgs in;
     Authorizations auths = Authorizations.EMPTY;
@@ -110,9 +121,19 @@ class RFileScanner extends ScannerOptions implements Scanner {
     }
 
     @Override
+    public CacheEntry getBlockNoStats(String blockName) {
+      return null;
+    }
+
+    @Override
     public long getMaxSize() {
       return Integer.MAX_VALUE;
     }
+
+    @Override
+    public Lock getLoadLock(String blockName) {
+      return null;
+    }
   }
 
   RFileScanner(Opts opts) {
@@ -123,13 +144,13 @@ class RFileScanner extends ScannerOptions implements Scanner {
 
     this.opts = opts;
     if (opts.indexCacheSize > 0) {
-      this.indexCache = new LruBlockCache(opts.indexCacheSize, CACHE_BLOCK_SIZE);
+      this.indexCache = new LruBlockCache(opts.indexCacheSize, CACHE_BLOCK_SIZE, CACHE_OPTS);
     } else {
       this.indexCache = new NoopCache();
     }
 
     if (opts.dataCacheSize > 0) {
-      this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE);
+      this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE, CACHE_OPTS);
     } else {
       this.dataCache = new NoopCache();
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
index 094782d..75a0bbc 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -17,6 +17,8 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache;
 
+import java.util.concurrent.locks.Lock;
+
 /**
  * Block cache interface.
  */
@@ -52,10 +54,20 @@ public interface BlockCache {
    */
   CacheEntry getBlock(String blockName);
 
+  CacheEntry getBlockNoStats(String blockName);
+
   /**
    * Get the maximum size of this cache.
    *
    * @return max size in bytes
    */
   long getMaxSize();
+
+  /**
+   * Return a lock used for loading data not in the cache. Should always return the same lock for
+   * the same block name. Can return different locks for different block names. Its ok to return
+   * null if locking is not desired.
+   */
+  Lock getLoadLock(String blockName);
+
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
index 610c16a..e971603 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
@@ -18,6 +18,7 @@
 package org.apache.accumulo.core.file.blockfile.cache;
 
 import java.lang.ref.WeakReference;
+import java.util.EnumSet;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.accumulo.core.util.NamingThreadFactory;
@@ -143,6 +145,13 @@ public class LruBlockCache implements BlockCache, HeapSize {
   /** Overhead of the structure itself */
   private long overhead;
 
+  /** Locks used when loading data not in the cache. */
+  private final Lock[] loadLocks;
+
+  public enum Options {
+    ENABLE_EVICTION, ENABLE_LOCKS
+  }
+
   /**
   * Default constructor. Specify maximum size and expected average block size (approximation is
   * fine).
@@ -156,17 +165,16 @@ public class LruBlockCache implements BlockCache, HeapSize {
    *          approximate size of each block, in bytes
    */
   public LruBlockCache(long maxSize, long blockSize) {
-    this(maxSize, blockSize, true);
+    this(maxSize, blockSize, EnumSet.allOf(Options.class));
   }
 
   /**
    * Constructor used for testing. Allows disabling of the eviction thread.
    */
-  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
-    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize),
-        DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR,
-        DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR,
-        DEFAULT_MEMORY_FACTOR);
+  public LruBlockCache(long maxSize, long blockSize, EnumSet<Options> opts) {
+    this(maxSize, blockSize, opts, (int) Math.ceil(1.2 * maxSize / blockSize), DEFAULT_LOAD_FACTOR,
+        DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
+        DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR);
   }
 
   /**
@@ -176,8 +184,8 @@ public class LruBlockCache implements BlockCache, HeapSize {
    *          maximum size of this cache, in bytes
    * @param blockSize
    *          expected average size of blocks, in bytes
-   * @param evictionThread
-   *          whether to run evictions in a bg thread or not
+   * @param opts
+   *          boolean options
    * @param mapInitialSize
    *          initial size of backing ConcurrentHashMap
    * @param mapLoadFactor
@@ -195,7 +203,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * @param memoryFactor
    *          percentage of total size for in-memory blocks
    */
-  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize,
+  public LruBlockCache(long maxSize, long blockSize, EnumSet<Options> opts, int mapInitialSize,
       float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor,
       float singleFactor, float multiFactor, float memoryFactor) {
     if (singleFactor + multiFactor + memoryFactor != 1) {
@@ -222,7 +230,19 @@ public class LruBlockCache implements BlockCache, HeapSize {
     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
     this.size = new AtomicLong(this.overhead);
 
-    if (evictionThread) {
+    if (opts.contains(Options.ENABLE_LOCKS)) {
+      // The size of the array is intentionally a prime number. Using a prime improves the chance of
+      // getting a more even spread for the later modulus. The size was also chosen to allow a lot
+      // of concurrency.
+      loadLocks = new Lock[5003];
+      for (int i = 0; i < loadLocks.length; i++) {
+        loadLocks[i] = new ReentrantLock(false);
+      }
+    } else {
+      loadLocks = null;
+    }
+
+    if (opts.contains(Options.ENABLE_EVICTION)) {
       this.evictionThread = new EvictionThread(this);
       this.evictionThread.start();
       while (!this.evictionThread.running()) {
@@ -324,6 +344,16 @@ public class LruBlockCache implements BlockCache, HeapSize {
     return cb;
   }
 
+  @Override
+  public CachedBlock getBlockNoStats(String blockName) {
+    CachedBlock cb = map.get(blockName);
+    if (cb == null) {
+      return null;
+    }
+    cb.access(count.incrementAndGet());
+    return cb;
+  }
+
   protected long evictBlock(CachedBlock block) {
     map.remove(block.getName());
     size.addAndGet(-1 * block.heapSize());
@@ -728,4 +758,14 @@ public class LruBlockCache implements BlockCache, HeapSize {
   public void shutdown() {
     this.scheduleThreadPool.shutdown();
   }
+
+  @Override
+  public Lock getLoadLock(String blockName) {
+    if (loadLocks == null)
+      return null;
+
+    // Would rather use Guava Striped, but its @Beta
+    int index = Math.abs(blockName.hashCode() % loadLocks.length);
+    return loadLocks[index];
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 336ec4d..ade98d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import java.lang.ref.SoftReference;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.file.blockfile.ABlockReader;
@@ -371,6 +372,7 @@ public class CachableBlockFile {
 
       BlockReader _currBlock;
 
+      Lock loadLock = null;
       if (cache != null) {
         CacheEntry cb = null;
         cb = cache.getBlock(_lookup);
@@ -379,18 +381,35 @@ public class CachableBlockFile {
           return new CachedBlockRead(cb, cb.getBuffer());
         }
 
+        loadLock = cache.getLoadLock(_lookup);
       }
-      /**
-       * grab the currBlock at this point the block is still in the data stream
-       *
-       */
-      _currBlock = loader.get();
 
-      /**
-       * If the block is bigger than the cache just return the stream
-       */
-      return cacheBlock(_lookup, cache, _currBlock, loader.getInfo());
+      try {
+        if (loadLock != null) {
+          loadLock.lock();
+
+          // check cache again after getting lock
+          CacheEntry cb = cache.getBlockNoStats(_lookup);
+
+          if (cb != null) {
+            return new CachedBlockRead(cb, cb.getBuffer());
+          }
+        }
 
+        /**
+         * grab the currBlock at this point the block is still in the data stream
+         *
+         */
+        _currBlock = loader.get();
+
+        /**
+         * If the block is bigger than the cache just return the stream
+         */
+        return cacheBlock(_lookup, cache, _currBlock, loader.getInfo());
+      } finally {
+        if (loadLock != null)
+          loadLock.unlock();
+      }
     }
 
     private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock,
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
index 690c3cb..dbba030 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Random;
 
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache.Options;
 import org.junit.Test;
 
 /**
@@ -115,7 +117,7 @@ public class TestLruBlockCache {
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false);
+    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, EnumSet.noneOf(Options.class));
 
     Block[] blocks = generateFixedBlocks(10, blockSize, "block");
 
@@ -153,13 +155,13 @@ public class TestLruBlockCache {
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 10);
 
-    LruBlockCache cache =
-        new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
-            LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-            0.99f, // acceptable
-            0.25f, // single
-            0.50f, // multi
-            0.25f);// memory
+    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, EnumSet.noneOf(Options.class),
+        (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
+        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
+        0.99f, // acceptable
+        0.25f, // single
+        0.50f, // multi
+        0.25f);// memory
 
     Block[] singleBlocks = generateFixedBlocks(5, 10000, "single");
     Block[] multiBlocks = generateFixedBlocks(5, 10000, "multi");
@@ -216,13 +218,13 @@ public class TestLruBlockCache {
     long maxSize = 100000;
     long blockSize = calculateBlockSize(maxSize, 10);
 
-    LruBlockCache cache =
-        new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
-            LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-            0.99f, // acceptable
-            0.33f, // single
-            0.33f, // multi
-            0.34f);// memory
+    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, EnumSet.noneOf(Options.class),
+        (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
+        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
+        0.99f, // acceptable
+        0.33f, // single
+        0.33f, // multi
+        0.34f);// memory
 
     Block[] singleBlocks = generateFixedBlocks(5, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
@@ -336,13 +338,13 @@ public class TestLruBlockCache {
     long maxSize = 100000;
     long blockSize = calculateBlockSize(maxSize, 10);
 
-    LruBlockCache cache =
-        new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
-            LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.66f, // min
-            0.99f, // acceptable
-            0.33f, // single
-            0.33f, // multi
-            0.34f);// memory
+    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, EnumSet.noneOf(Options.class),
+        (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
+        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.66f, // min
+        0.99f, // acceptable
+        0.33f, // single
+        0.33f, // multi
+        0.34f);// memory
 
     Block[] singleBlocks = generateFixedBlocks(20, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
@@ -397,13 +399,13 @@ public class TestLruBlockCache {
     long maxSize = 300000;
     long blockSize = calculateBlockSize(maxSize, 31);
 
-    LruBlockCache cache =
-        new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
-            LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-            0.99f, // acceptable
-            0.33f, // single
-            0.33f, // multi
-            0.34f);// memory
+    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, EnumSet.noneOf(Options.class),
+        (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
+        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
+        0.99f, // acceptable
+        0.33f, // single
+        0.33f, // multi
+        0.34f);// memory
 
     Block[] singleBlocks = generateFixedBlocks(10, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(10, blockSize, "multi");


Mime
View raw message