pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch master updated: Fixed race condition during expansion of concurrent open hash maps (#2387)
Date Fri, 17 Aug 2018 15:47:32 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 086cbbb  Fixed race condition during expansion of concurrent open hash maps (#2387)
086cbbb is described below

commit 086cbbba0e8854ecbafc55bff9e93cb9a84ce797
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Fri Aug 17 08:47:30 2018 -0700

    Fixed race condition during expansion of concurrent open hash maps (#2387)
    
    ### Motivation
    
    Porting same fix as https://github.com/apache/bookkeeper/pull/1607 to correct issue reported
on https://github.com/apache/bookkeeper/issues/1606.
    
    There is a race condition in the concurrent open hash maps implementation. The race happens
when the maps gets re-hashed after the expansion and the new arrays are substituting the old
ones.
    
    The race itself is that a thread doing a `get()` on the map is first checking the current
capacity of the map, uses that to get the bucket and then tries to do optimistic read of the
value in that bucket.
    
    This assumes `capacity` update is visible only after the `values` array is already swapped,
but that is not always the case in current code.
    
    ### Changes
    
     * Use `volatile` qualifier for `capacity` and `values` arrays to ensure ordering of memory
read is respected by compiler
     * In rehashing, update `capacity` after `values` where it was not already the case
---
 .../util/collections/ConcurrentLongHashMap.java    |  8 +--
 .../util/collections/ConcurrentLongPairSet.java    | 15 +++---
 .../util/collections/ConcurrentOpenHashMap.java    |  6 +--
 .../util/collections/ConcurrentOpenHashSet.java    |  6 +--
 .../collections/ConcurrentLongHashMapTest.java     | 57 ++++++++++++++++++++--
 5 files changed, 72 insertions(+), 20 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index fcb0c10..60c24c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -30,7 +30,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Map from long to an Object.
- * 
+ *
  * Provides similar methods as a ConcurrentMap<long,Object> with 2 differences:
  * <ol>
  * <li>No boxing/unboxing from long -> Long
@@ -187,10 +187,10 @@ public class ConcurrentLongHashMap<V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private long[] keys;
-        private V[] values;
+        private volatile long[] keys;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 74d4314..4634b40 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -26,7 +26,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.StampedLock;
-import java.util.function.Predicate;
 
 /**
  * Concurrent hash set where values are composed of pairs of longs.
@@ -163,11 +162,11 @@ public class ConcurrentLongPairSet {
 
     /**
      * Removes all of the elements of this collection that satisfy the given predicate.
-     * 
+     *
      * @param filter
      *            a predicate which returns {@code true} for elements to be removed
      * @return {@code true} if any elements were removed
-     * 
+     *
      * @return number of removed values
      */
     public int removeIf(LongPairPredicate filter) {
@@ -209,9 +208,9 @@ public class ConcurrentLongPairSet {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -449,9 +448,11 @@ public class ConcurrentLongPairSet {
                 }
             }
 
-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * SetFillFactor);
         }
 
@@ -532,7 +533,7 @@ public class ConcurrentLongPairSet {
             }
         }
     }
-    
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 585471c..94f64de 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -31,7 +31,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Concurrent hash map
- * 
+ *
  * Provides similar methods as a ConcurrentMap<K,V> but since it's an open hash map
with linear probing, no node
  * allocations are required to store the values
  *
@@ -180,9 +180,9 @@ public class ConcurrentOpenHashMap<K, V> {
     @SuppressWarnings("serial")
     private static final class Section<K, V> extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private Object[] table;
+        private volatile Object[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 93ca6e8..2f27913 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -32,7 +32,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Concurrent hash set
- * 
+ *
  * Provides similar methods as a ConcurrentMap<K,V> but since it's an open hash map
with linear probing, no node
  * allocations are required to store the values
  *
@@ -175,9 +175,9 @@ public class ConcurrentOpenHashSet<V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private V[] values;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index c4215f3..e4cbd47 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -24,12 +24,15 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -37,11 +40,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.LongFunction;
 
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-
 public class ConcurrentLongHashMapTest {
 
     @Test
@@ -235,6 +235,57 @@ public class ConcurrentLongHashMapTest {
     }
 
     @Test
+    public void stressConcurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int writeThreads = 16;
+        final int readThreads = 16;
+        final int n = 1_000_000;
+        String value = "value";
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+                    map.put(key, value);
+                }
+            }));
+        }
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+                    map.get(key);
+                }
+            }));
+        }
+        for (Future<?> future : futures) {
+            future.get();
+        }
+        assertEquals(map.size(), n * writeThreads);
+        executor.shutdown();
+    }
+
+    @Test
     public void testIteration() {
         ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
 


Mime
View raw message