hbase-commits mailing list archives

From te...@apache.org
Subject hbase git commit: HBASE-14463 Severe performance downgrade when parallel reading a single key from BucketCache (Yu Li)
Date Sat, 07 Nov 2015 01:34:21 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 234f31bd4 -> 61e2566c1


HBASE-14463 Severe performance downgrade when parallel reading a single key from BucketCache (Yu Li)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61e2566c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61e2566c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61e2566c

Branch: refs/heads/branch-1
Commit: 61e2566c1cd9a83aa4e2bbb016d613ced01d7a6b
Parents: 234f31b
Author: tedyu <yuzhihong@gmail.com>
Authored: Fri Nov 6 17:34:14 2015 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Fri Nov 6 17:34:14 2015 -0800

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketCache.java      |  41 +++---
 .../hadoop/hbase/util/IdReadWriteLock.java      |  91 ++++++++++++++
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   |  19 ++-
 .../hbase/io/hfile/bucket/TestBucketCache.java  |   9 +-
 .../hadoop/hbase/util/TestIdReadWriteLock.java  | 126 +++++++++++++++++++
 5 files changed, 254 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
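In short: BucketCache previously guarded each block offset with the exclusive IdLock, so handlers reading the same hot block all serialized on a single lock entry. The patch below switches to a per-offset ReentrantReadWriteLock (obtained from the new IdReadWriteLock pool): the read path takes the shared read lock, and only the evict/free paths take the write lock. A minimal standalone sketch of that pattern (illustrative only; plain java.util.concurrent, no BucketCache internals):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockSketch {
  public static void main(String[] args) throws InterruptedException {
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    Runnable reader = new Runnable() {
      @Override
      public void run() {
        lock.readLock().lock();
        try {
          // Read path: any number of threads may hold the read lock at once.
          System.out.println(Thread.currentThread().getName() + " reading block");
        } finally {
          lock.readLock().unlock();
        }
      }
    };
    Thread r1 = new Thread(reader, "reader-1");
    Thread r2 = new Thread(reader, "reader-2");
    r1.start();
    r2.start();
    r1.join();
    r2.join();
    // Evict path: exclusive; waits until no reader holds the lock.
    lock.writeLock().lock();
    try {
      System.out.println("evicting block");
    } finally {
      lock.writeLock().unlock();
    }
  }
}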


http://git-wip-us.apache.org/repos/asf/hbase/blob/61e2566c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index da78127..93987a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,7 +66,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.IdReadWriteLock;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -177,14 +178,11 @@ public class BucketCache implements BlockCache, HeapSize {
   private volatile long ioErrorStartTime = -1;
 
   /**
-   * A "sparse lock" implementation allowing to lock on a particular block
-   * identified by offset. The purpose of this is to avoid freeing the block
-   * which is being read.
-   *
-   * TODO:We could extend the IdLock to IdReadWriteLock for better.
+   * A ReentrantReadWriteLock to lock on a particular block identified by offset.
+   * The purpose of this is to avoid freeing the block which is being read.
    */
   @VisibleForTesting
-  final IdLock offsetLock = new IdLock();
+  final IdReadWriteLock offsetLock = new IdReadWriteLock();
 
   private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
@@ -409,9 +407,9 @@ public class BucketCache implements BlockCache, HeapSize {
     BucketEntry bucketEntry = backingMap.get(key);
     if (bucketEntry != null) {
       long start = System.nanoTime();
-      IdLock.Entry lockEntry = null;
+      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
       try {
-        lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+        lock.readLock().lock();
        // We can not read here even if backingMap does contain the given key because its offset
         // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
         // existence here.
@@ -440,9 +438,7 @@ public class BucketCache implements BlockCache, HeapSize {
         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
         checkIOErrorIsTolerated();
       } finally {
-        if (lockEntry != null) {
-          offsetLock.releaseLockEntry(lockEntry);
-        }
+        lock.readLock().unlock();
       }
     }
     if (!repeat && updateCacheMetrics) {
@@ -480,21 +476,16 @@ public class BucketCache implements BlockCache, HeapSize {
         return false;
       }
     }
-    IdLock.Entry lockEntry = null;
+    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
     try {
-      lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+      lock.writeLock().lock();
       if (backingMap.remove(cacheKey, bucketEntry)) {
         blockEvicted(cacheKey, bucketEntry, removedBlock == null);
       } else {
         return false;
       }
-    } catch (IOException ie) {
-      LOG.warn("Failed evicting block " + cacheKey);
-      return false;
     } finally {
-      if (lockEntry != null) {
-        offsetLock.releaseLockEntry(lockEntry);
-      }
+      lock.writeLock().unlock();
     }
     cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     return true;
@@ -842,18 +833,14 @@ public class BucketCache implements BlockCache, HeapSize {
           heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
         } else if (bucketEntries[i] != null){
           // Block should have already been evicted. Remove it and free space.
-          IdLock.Entry lockEntry = null;
+          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
           try {
-            lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
+            lock.writeLock().lock();
             if (backingMap.remove(key, bucketEntries[i])) {
               blockEvicted(key, bucketEntries[i], false);
             }
-          } catch (IOException e) {
-            LOG.warn("failed to free space for " + key, e);
           } finally {
-            if (lockEntry != null) {
-              offsetLock.releaseLockEntry(lockEntry);
-            }
+            lock.writeLock().unlock();
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61e2566c/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
new file mode 100644
index 0000000..7dc6fbf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The
+ * intended usage for read lock is as follows:
+ *
+ * <pre>
+ * ReentrantReadWriteLock lock = idReadWriteLock.getLock(id);
+ * try {
+ *   lock.readLock().lock();
+ *   // User code.
+ * } finally {
+ *   lock.readLock().unlock();
+ * }
+ * </pre>
+ *
+ * For write lock, use lock.writeLock()
+ */
+@InterfaceAudience.Private
+public class IdReadWriteLock {
+  // The number of locks we want to easily support. It's not a maximum.
+  private static final int NB_CONCURRENT_LOCKS = 1000;
+  // The pool to get entry from; entries are mapped by weak reference so they can be
+  // garbage-collected as soon as possible
+  private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool =
+      new WeakObjectPool<Long, ReentrantReadWriteLock>(
+          new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+            @Override
+            public ReentrantReadWriteLock createObject(Long id) {
+              return new ReentrantReadWriteLock();
+            }
+          }, NB_CONCURRENT_LOCKS);
+
+  /**
+   * Get the ReentrantReadWriteLock corresponding to the given id
+   * @param id an arbitrary number to identify the lock
+   */
+  public ReentrantReadWriteLock getLock(long id) {
+    lockPool.purge();
+    ReentrantReadWriteLock readWriteLock = lockPool.get(id);
+    return readWriteLock;
+  }
+
+  /** For testing */
+  @VisibleForTesting
+  int purgeAndGetEntryPoolSize() {
+    System.gc();
+    Threads.sleep(200);
+    lockPool.purge();
+    return lockPool.size();
+  }
+
+  @VisibleForTesting
+  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
+    for (ReentrantReadWriteLock readWriteLock;;) {
+      readWriteLock = lockPool.get(id);
+      if (readWriteLock != null) {
+        synchronized (readWriteLock) {
+          if (readWriteLock.getQueueLength() >= numWaiters) {
+            return;
+          }
+        }
+      }
+      Thread.sleep(50);
+    }
+  }
+}
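The pool above hands out one lock per id through weak references, so a lock that nobody currently holds a strong reference to can be garbage-collected and the pool does not grow without bound. A simplified, self-contained sketch of that weak-valued pool idea (this is not HBase's WeakObjectPool, which also supports purge(); class and variable names here are illustrative):

import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WeakLockPoolSketch {
  private final ConcurrentMap<Long, WeakReference<ReentrantReadWriteLock>> pool =
      new ConcurrentHashMap<Long, WeakReference<ReentrantReadWriteLock>>();

  public ReentrantReadWriteLock getLock(long id) {
    for (;;) {
      WeakReference<ReentrantReadWriteLock> ref = pool.get(id);
      ReentrantReadWriteLock existing = (ref == null) ? null : ref.get();
      if (existing != null) {
        return existing; // strong reference; caller keeps it alive while locking
      }
      ReentrantReadWriteLock fresh = new ReentrantReadWriteLock();
      WeakReference<ReentrantReadWriteLock> freshRef =
          new WeakReference<ReentrantReadWriteLock>(fresh);
      boolean installed = (ref == null)
          ? pool.putIfAbsent(id, freshRef) == null
          : pool.replace(id, ref, freshRef);
      if (installed) {
        return fresh;
      }
      // Lost a race with another thread; loop and pick up the winner's lock.
    }
  }
}

Note the contract this implies, visible in getLock() above as well: the caller must hold the returned strong reference for the whole lock/unlock window, since a lock with no strong referents could otherwise be collected and replaced while "held".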

http://git-wip-us.apache.org/repos/asf/hbase/blob/61e2566c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index b0a2ba2..5ca5f6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -185,7 +185,11 @@ public class CacheTestUtils {
         public void doAnAction() throws Exception {
           ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
               .getBlock(key, false, false, true);
-          assertArrayEquals(buf, returned.buf);
+          if (returned != null) {
+            assertArrayEquals(buf, returned.buf);
+          } else {
+            Thread.sleep(10);
+          }
           totalQueries.incrementAndGet();
         }
       };
@@ -194,6 +198,19 @@ public class CacheTestUtils {
       ctx.addThread(t);
     }
 
+    // add a thread to periodically evict and re-cache the block
+    final long blockEvictPeriod = 50;
+    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+      @Override
+      public void doAnAction() throws Exception {
+        toBeTested.evictBlock(key);
+        toBeTested.cacheBlock(key, bac);
+        Thread.sleep(blockEvictPeriod);
+      }
+    };
+    t.setDaemon(true);
+    ctx.addThread(t);
+
     ctx.startThreads();
     while (totalQueries.get() < numQueries && ctx.shouldRun()) {
       Thread.sleep(10);
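Because the new evict/re-cache thread races with the readers, getBlock() can now legitimately return null for a key that is mid-re-cache, which is why the assertion above only fires when a block actually came back. A standalone illustration of that tolerant-read pattern (a ConcurrentHashMap stands in for the cache; names are illustrative):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TolerantReadSketch {
  public static void main(String[] args) throws InterruptedException {
    final ConcurrentMap<String, byte[]> cache = new ConcurrentHashMap<String, byte[]>();
    final String key = "block-0";
    cache.put(key, new byte[] { 42 });
    Thread evictor = new Thread(new Runnable() {
      @Override
      public void run() {
        cache.remove(key);                 // evict
        cache.put(key, new byte[] { 42 }); // re-cache
      }
    }, "evict-recache");
    evictor.start();
    byte[] got;
    while ((got = cache.get(key)) == null) {
      Thread.sleep(10); // momentarily evicted; wait for the re-cache
    }
    System.out.println("read " + got.length + " byte(s)");
    evictor.join();
  }
}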

http://git-wip-us.apache.org/repos/asf/hbase/blob/61e2566c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index f004868..e10689b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.IdLock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -173,7 +173,7 @@ public class TestBucketCache {
 
   @Test
   public void testCacheMultiThreadedSingleKey() throws Exception {
-    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
+    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
   }
 
   @Test
@@ -198,7 +198,8 @@ public class TestBucketCache {
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
         new byte[10]));
     long lockId = cache.backingMap.get(cacheKey).offset();
-    IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId);
+    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
+    lock.writeLock().lock();
     Thread evictThread = new Thread("evict-block") {
 
       @Override
@@ -212,7 +213,7 @@ public class TestBucketCache {
     cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
         new byte[10]));
-    cache.offsetLock.releaseLockEntry(lockEntry);
+    lock.writeLock().unlock();
     evictThread.join();
     assertEquals(1L, cache.getBlockCount());
     assertTrue(cache.getCurrentSize() > 0L);
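The reworked test pins the block by taking its offset's write lock directly, so the evict-block thread parks inside evictBlock() until the test unlocks. A minimal standalone illustration of that handshake (plain java.util.concurrent, no HBase classes; polling getQueueLength() mirrors what IdReadWriteLock.waitForWaiters() does):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriteLockBlocksSketch {
  public static void main(String[] args) throws InterruptedException {
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    lock.writeLock().lock(); // main thread pins the "block"
    Thread evictor = new Thread(new Runnable() {
      @Override
      public void run() {
        lock.writeLock().lock(); // parks here until main unlocks
        try {
          System.out.println("evicted");
        } finally {
          lock.writeLock().unlock();
        }
      }
    }, "evict-block");
    evictor.start();
    while (lock.getQueueLength() < 1) {
      Thread.sleep(10); // wait until the evictor is queued on the lock
    }
    lock.writeLock().unlock(); // now the evictor proceeds
    evictor.join();
  }
}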

http://git-wip-us.apache.org/repos/asf/hbase/blob/61e2566c/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
new file mode 100644
index 0000000..bea311c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+// Medium as it creates 128 threads; seems better to run it isolated
+public class TestIdReadWriteLock {
+
+  private static final Log LOG = LogFactory.getLog(TestIdReadWriteLock.class);
+
+  private static final int NUM_IDS = 16;
+  private static final int NUM_THREADS = 128;
+  private static final int NUM_SECONDS = 15;
+
+  private IdReadWriteLock idLock = new IdReadWriteLock();
+
+  private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
+
+  private class IdLockTestThread implements Callable<Boolean> {
+
+    private String clientId;
+
+    public IdLockTestThread(String clientId) {
+      this.clientId = clientId;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      Thread.currentThread().setName(clientId);
+      Random rand = new Random();
+      long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
+      while (System.currentTimeMillis() < endTime) {
+        long id = rand.nextInt(NUM_IDS);
+        boolean readLock = rand.nextBoolean();
+
+        ReentrantReadWriteLock readWriteLock = idLock.getLock(id);
+        Lock lock = readLock ? readWriteLock.readLock() : readWriteLock.writeLock();
+        try {
+          lock.lock();
+          int sleepMs = 1 + rand.nextInt(4);
+          String owner = idOwner.get(id);
+          if (owner != null && LOG.isDebugEnabled()) {
+            LOG.debug((readLock ? "Read" : "Write") + "lock of Id " + id + " already taken by "
+                + owner + ", we are " + clientId);
+          }
+
+          idOwner.put(id, clientId);
+          Thread.sleep(sleepMs);
+          idOwner.remove(id);
+
+        } finally {
+          lock.unlock();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Release " + (readLock ? "Read" : "Write") + " lock of Id" + id + ",
we are "
+                + clientId);
+          }
+        }
+      }
+      return true;
+    }
+
+  }
+
+  @Test(timeout = 60000)
+  public void testMultipleClients() throws Exception {
+    ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
+    try {
+      ExecutorCompletionService<Boolean> ecs =
+          new ExecutorCompletionService<Boolean>(exec);
+      for (int i = 0; i < NUM_THREADS; ++i)
+        ecs.submit(new IdLockTestThread("client_" + i));
+      for (int i = 0; i < NUM_THREADS; ++i) {
+        Future<Boolean> result = ecs.take();
+        assertTrue(result.get());
+      }
+      // make sure the entry pool will be cleared after GC and purge call
+      int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
+      LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
+      assertEquals(0, entryPoolSize);
+    } finally {
+      exec.shutdown();
+      exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
+    }
+  }
+
+
+}
+

