activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/6] activemq-artemis git commit: ARTEMIS-1156: FIX: Long Autoboxing occurring on Hot Path
Date Fri, 12 May 2017 14:07:14 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master b7b79e5df -> 5e6687e0e


ARTEMIS-1156: FIX: Long Autoboxing occurring on Hot Path

Building on ARTEMIS-905 JCtools ConcurrentMap replacement  first proposed but currently parked by @franz1981, replace the collections with primitive key concurrent collections to avoid auto boxing.

The goal of this is to reduce/remove autoboxing on the hot path.
We are just adding jctools to the broker (should not be in client dependencies)
Like wise targeting specific use case with specific implementation rather than a blanket replace all.

Using collections from Bookkeeper, reduces outside tlab allocation, on resizing compared to JCTools, which occurs frequently on testing.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c1d55aa8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c1d55aa8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c1d55aa8

Branch: refs/heads/master
Commit: c1d55aa84f2ca9946afc39038c14d729c8bae36a
Parents: b7b79e5
Author: Michael André Pearce <michael.andre.pearce@me.com>
Authored: Fri May 12 07:14:47 2017 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri May 12 10:05:51 2017 -0400

----------------------------------------------------------------------
 .../collections/ConcurrentLongHashMap.java      | 504 +++++++++++++++++++
 .../collections/ConcurrentLongHashSet.java      | 423 ++++++++++++++++
 .../collections/ConcurrentLongHashMapTest.java  | 405 +++++++++++++++
 .../collections/ConcurrentLongHashSetTest.java  | 249 +++++++++
 .../journal/impl/AbstractJournalUpdateTask.java |   9 +-
 .../core/journal/impl/FileWrapperJournal.java   |  13 +-
 .../core/journal/impl/JournalCompactor.java     |  20 +-
 .../artemis/core/journal/impl/JournalImpl.java  |  36 +-
 .../journal/impl/JournalRecordProvider.java     |   4 +-
 .../core/replication/ReplicationEndpoint.java   |   2 +-
 .../JournalCleanupCompactStressTest.java        |  14 +-
 11 files changed, 1628 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java
new file mode 100644
index 0000000..8af0660
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.LongFunction;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Map from long to an Object.
+ *
+ * Provides similar methods as a ConcurrentMap<long,Object> with 2 differences:
+ * <ol>
+ * <li>No boxing/unboxing from long -> Long
+ * <li>Open hash map with linear probing, no node allocations to store the values
+ * </ol>
+ *
+ * @param <V>
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentLongHashMap<V> {
+
+   private static final Object EmptyValue = null;
+   private static final Object DeletedValue = new Object();
+
+   private static final float MapFillFactor = 0.66f;
+
+   private static final int DefaultExpectedItems = 256;
+   private static final int DefaultConcurrencyLevel = 16;
+
+   private final Section<V>[] sections;
+
+   public ConcurrentLongHashMap() {
+      this(DefaultExpectedItems);
+   }
+
+   public ConcurrentLongHashMap(int expectedItems) {
+      this(expectedItems, DefaultConcurrencyLevel);
+   }
+
+   public ConcurrentLongHashMap(int expectedItems, int numSections) {
+      checkArgument(numSections > 0);
+      if (expectedItems < numSections) {
+         expectedItems = numSections;
+      }
+
+      int perSectionExpectedItems = expectedItems / numSections;
+      int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+      this.sections = (Section<V>[]) new Section[numSections];
+
+      for (int i = 0; i < numSections; i++) {
+         sections[i] = new Section<>(perSectionCapacity);
+      }
+   }
+
+   public int size() {
+      int size = 0;
+      for (Section<V> s : sections) {
+         size += s.size;
+      }
+      return size;
+   }
+
+   long getUsedBucketCount() {
+      long usedBucketCount = 0;
+      for (Section<V> s : sections) {
+         usedBucketCount += s.usedBuckets;
+      }
+      return usedBucketCount;
+   }
+
+   public long capacity() {
+      long capacity = 0;
+      for (Section<V> s : sections) {
+         capacity += s.capacity;
+      }
+      return capacity;
+   }
+
+   public boolean isEmpty() {
+      for (Section<V> s : sections) {
+         if (s.size != 0) {
+            return false;
+         }
+      }
+
+      return true;
+   }
+
+   public V get(long key) {
+      long h = hash(key);
+      return getSection(h).get(key, (int) h);
+   }
+
+   public boolean containsKey(long key) {
+      return get(key) != null;
+   }
+
+   public V put(long key, V value) {
+      checkNotNull(value);
+      long h = hash(key);
+      return getSection(h).put(key, value, (int) h, false, null);
+   }
+
+   public V putIfAbsent(long key, V value) {
+      checkNotNull(value);
+      long h = hash(key);
+      return getSection(h).put(key, value, (int) h, true, null);
+   }
+
+   public V computeIfAbsent(long key, LongFunction<V> provider) {
+      checkNotNull(provider);
+      long h = hash(key);
+      return getSection(h).put(key, null, (int) h, true, provider);
+   }
+
+   public V remove(long key) {
+      long h = hash(key);
+      return getSection(h).remove(key, null, (int) h);
+   }
+
+   public boolean remove(long key, Object value) {
+      checkNotNull(value);
+      long h = hash(key);
+      return getSection(h).remove(key, value, (int) h) != null;
+   }
+
+   private Section<V> getSection(long hash) {
+      // Use 32 msb out of long to get the section
+      final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+      return sections[sectionIdx];
+   }
+
+   public void clear() {
+      for (Section<V> s : sections) {
+         s.clear();
+      }
+   }
+
+   public void forEach(EntryProcessor<V> processor) {
+      for (Section<V> s : sections) {
+         s.forEach(processor);
+      }
+   }
+
+   /**
+    * @return a new list of all keys (makes a copy)
+    */
+   public List<Long> keys() {
+      List<Long> keys = Lists.newArrayListWithExpectedSize((int) size());
+      forEach((key, value) -> keys.add(key));
+      return keys;
+   }
+
+   public ConcurrentLongHashSet keysLongHashSet() {
+      ConcurrentLongHashSet concurrentLongHashSet = new ConcurrentLongHashSet(size());
+      forEach((key, value) -> concurrentLongHashSet.add(key));
+      return concurrentLongHashSet;
+   }
+
+   public List<V> values() {
+      List<V> values = Lists.newArrayListWithExpectedSize((int) size());
+      forEach((key, value) -> values.add(value));
+      return values;
+   }
+
+   public interface EntryProcessor<V> {
+      void accept(long key, V value);
+   }
+
+   // A section is a portion of the hash map that is covered by a single
+   @SuppressWarnings("serial")
+   private static final class Section<V> extends StampedLock {
+      private long[] keys;
+      private V[] values;
+
+      private int capacity;
+      private volatile int size;
+      private int usedBuckets;
+      private int resizeThreshold;
+
+      Section(int capacity) {
+         this.capacity = alignToPowerOfTwo(capacity);
+         this.keys = new long[this.capacity];
+         this.values = (V[]) new Object[this.capacity];
+         this.size = 0;
+         this.usedBuckets = 0;
+         this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+      }
+
+      @SuppressWarnings("NonAtomicVolatileUpdate")
+      V get(long key, int keyHash) {
+         int bucket = keyHash;
+
+         long stamp = tryOptimisticRead();
+         boolean acquiredLock = false;
+
+         try {
+            while (true) {
+               int capacity = this.capacity;
+               bucket = signSafeMod(bucket, capacity);
+
+               // First try optimistic locking
+               long storedKey = keys[bucket];
+               V storedValue = values[bucket];
+
+               if (!acquiredLock && validate(stamp)) {
+                  // The values we have read are consistent
+                  if (storedKey == key) {
+                     return storedValue != DeletedValue ? storedValue : null;
+                  } else if (storedValue == EmptyValue) {
+                     // Not found
+                     return null;
+                  }
+               } else {
+                  // Fallback to acquiring read lock
+                  if (!acquiredLock) {
+                     stamp = readLock();
+                     acquiredLock = true;
+                     storedKey = keys[bucket];
+                     storedValue = values[bucket];
+                  }
+
+                  if (capacity != this.capacity) {
+                     // There has been a rehashing. We need to restart the search
+                     bucket = keyHash;
+                     continue;
+                  }
+
+                  if (storedKey == key) {
+                     return storedValue != DeletedValue ? storedValue : null;
+                  } else if (storedValue == EmptyValue) {
+                     // Not found
+                     return null;
+                  }
+               }
+
+               ++bucket;
+            }
+         } finally {
+            if (acquiredLock) {
+               unlockRead(stamp);
+            }
+         }
+      }
+
+      @SuppressWarnings("NonAtomicVolatileUpdate")
+      V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valueProvider) {
+         int bucket = keyHash;
+
+         long stamp = writeLock();
+         int capacity = this.capacity;
+
+         // Remember where we find the first available spot
+         int firstDeletedKey = -1;
+
+         try {
+            while (true) {
+               bucket = signSafeMod(bucket, capacity);
+
+               long storedKey = keys[bucket];
+               V storedValue = values[bucket];
+
+               if (storedKey == key) {
+                  if (storedValue == EmptyValue) {
+                     values[bucket] = value != null ? value : valueProvider.apply(key);
+                     ++size;
+                     ++usedBuckets;
+                     return valueProvider != null ? values[bucket] : null;
+                  } else if (storedValue == DeletedValue) {
+                     values[bucket] = value != null ? value : valueProvider.apply(key);
+                     ++size;
+                     return valueProvider != null ? values[bucket] : null;
+                  } else if (!onlyIfAbsent) {
+                     // Over written an old value for same key
+                     values[bucket] = value;
+                     return storedValue;
+                  } else {
+                     return storedValue;
+                  }
+               } else if (storedValue == EmptyValue) {
+                  // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                  // key, we should write at that position
+                  if (firstDeletedKey != -1) {
+                     bucket = firstDeletedKey;
+                  } else {
+                     ++usedBuckets;
+                  }
+
+                  keys[bucket] = key;
+                  values[bucket] = value != null ? value : valueProvider.apply(key);
+                  ++size;
+                  return valueProvider != null ? values[bucket] : null;
+               } else if (storedValue == DeletedValue) {
+                  // The bucket contained a different deleted key
+                  if (firstDeletedKey == -1) {
+                     firstDeletedKey = bucket;
+                  }
+               }
+
+               ++bucket;
+            }
+         } finally {
+            if (usedBuckets > resizeThreshold) {
+               try {
+                  rehash();
+               } finally {
+                  unlockWrite(stamp);
+               }
+            } else {
+               unlockWrite(stamp);
+            }
+         }
+      }
+
+      @SuppressWarnings("NonAtomicVolatileUpdate")
+      private V remove(long key, Object value, int keyHash) {
+         int bucket = keyHash;
+         long stamp = writeLock();
+
+         try {
+            while (true) {
+               int capacity = this.capacity;
+               bucket = signSafeMod(bucket, capacity);
+
+               long storedKey = keys[bucket];
+               V storedValue = values[bucket];
+               if (storedKey == key) {
+                  if (value == null || value.equals(storedValue)) {
+                     if (storedValue == EmptyValue || storedValue == DeletedValue) {
+                        return null;
+                     }
+
+                     --size;
+                     V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
+                     if (nextValueInArray == EmptyValue) {
+                        values[bucket] = (V) EmptyValue;
+                        --usedBuckets;
+                     } else {
+                        values[bucket] = (V) DeletedValue;
+                     }
+
+                     return storedValue;
+                  } else {
+                     return null;
+                  }
+               } else if (storedValue == EmptyValue) {
+                  // Key wasn't found
+                  return null;
+               }
+
+               ++bucket;
+            }
+
+         } finally {
+            unlockWrite(stamp);
+         }
+      }
+
+      void clear() {
+         long stamp = writeLock();
+
+         try {
+            Arrays.fill(keys, 0);
+            Arrays.fill(values, EmptyValue);
+            this.size = 0;
+            this.usedBuckets = 0;
+         } finally {
+            unlockWrite(stamp);
+         }
+      }
+
+      public void forEach(EntryProcessor<V> processor) {
+         long stamp = tryOptimisticRead();
+
+         int capacity = this.capacity;
+         long[] keys = this.keys;
+         V[] values = this.values;
+
+         boolean acquiredReadLock = false;
+
+         try {
+
+            // Validate no rehashing
+            if (!validate(stamp)) {
+               // Fallback to read lock
+               stamp = readLock();
+               acquiredReadLock = true;
+
+               capacity = this.capacity;
+               keys = this.keys;
+               values = this.values;
+            }
+
+            // Go through all the buckets for this section
+            for (int bucket = 0; bucket < capacity; bucket++) {
+               long storedKey = keys[bucket];
+               V storedValue = values[bucket];
+
+               if (!acquiredReadLock && !validate(stamp)) {
+                  // Fallback to acquiring read lock
+                  stamp = readLock();
+                  acquiredReadLock = true;
+
+                  storedKey = keys[bucket];
+                  storedValue = values[bucket];
+               }
+
+               if (storedValue != DeletedValue && storedValue != EmptyValue) {
+                  processor.accept(storedKey, storedValue);
+               }
+            }
+         } finally {
+            if (acquiredReadLock) {
+               unlockRead(stamp);
+            }
+         }
+      }
+
+      private void rehash() {
+         // Expand the hashmap
+         int newCapacity = capacity * 2;
+         long[] newKeys = new long[newCapacity];
+         V[] newValues = (V[]) new Object[newCapacity];
+
+         // Re-hash table
+         for (int i = 0; i < keys.length; i++) {
+            long storedKey = keys[i];
+            V storedValue = values[i];
+            if (storedValue != EmptyValue && storedValue != DeletedValue) {
+               insertKeyValueNoLock(newKeys, newValues, storedKey, storedValue);
+            }
+         }
+
+         capacity = newCapacity;
+         keys = newKeys;
+         values = newValues;
+         usedBuckets = size;
+         resizeThreshold = (int) (capacity * MapFillFactor);
+      }
+
+      private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
+         int bucket = (int) hash(key);
+
+         while (true) {
+            bucket = signSafeMod(bucket, keys.length);
+
+            V storedValue = values[bucket];
+
+            if (storedValue == EmptyValue) {
+               // The bucket is empty, so we can use it
+               keys[bucket] = key;
+               values[bucket] = value;
+               return;
+            }
+
+            ++bucket;
+         }
+      }
+   }
+
+   private static final long HashMixer = 0xc6a4a7935bd1e995L;
+   private static final int R = 47;
+
+   static long hash(long key) {
+      long hash = key * HashMixer;
+      hash ^= hash >>> R;
+      hash *= HashMixer;
+      return hash;
+   }
+
+   static int signSafeMod(long n, int Max) {
+      return (int) n & (Max - 1);
+   }
+
+   static int alignToPowerOfTwo(int n) {
+      return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java
new file mode 100644
index 0000000..17d94b7
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java
@@ -0,0 +1,423 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * Concurrent hash set for primitive longs
+ *
+ * Provides similar methods as a ConcurrentSet&lt;Long&gt; but since it's an open hash map with linear probing, no node
+ * allocations are required to store the values.
+ * <p>
+ * Items <strong>MUST</strong> be >= 0.
+ */
+public class ConcurrentLongHashSet {
+
+   private static final long EmptyItem = -1L;
+   private static final long DeletedItem = -2L;
+
+   private static final float SetFillFactor = 0.66f;
+
+   private static final int DefaultExpectedItems = 256;
+   private static final int DefaultConcurrencyLevel = 16;
+
+   private final Section[] sections;
+
+   public interface ConsumerLong {
+      void accept(long item);
+   }
+
+   public ConcurrentLongHashSet() {
+      this(DefaultExpectedItems);
+   }
+
+   public ConcurrentLongHashSet(int expectedItems) {
+      this(expectedItems, DefaultConcurrencyLevel);
+   }
+
+   public ConcurrentLongHashSet(int expectedItems, final int numSections) {
+      checkArgument(numSections > 0);
+      if (expectedItems < numSections) {
+         expectedItems = numSections;
+      }
+
+      int perSectionExpectedItems = expectedItems / numSections;
+      int perSectionCapacity = (int) (perSectionExpectedItems / SetFillFactor);
+      this.sections = new Section[numSections];
+
+      for (int i = 0; i < numSections; i++) {
+         sections[i] = new Section(perSectionCapacity);
+      }
+   }
+
+   public int size() {
+      int size = 0;
+      for (Section s : sections) {
+         size += s.size;
+      }
+      return size;
+   }
+
+   public long capacity() {
+      long capacity = 0;
+      for (Section s : sections) {
+         capacity += s.capacity;
+      }
+      return capacity;
+   }
+
+   public boolean isEmpty() {
+      for (Section s : sections) {
+         if (s.size != 0) {
+            return false;
+         }
+      }
+
+      return true;
+   }
+
+   long getUsedBucketCount() {
+      long usedBucketCount = 0;
+      for (Section s : sections) {
+         usedBucketCount += s.usedBuckets;
+      }
+      return usedBucketCount;
+   }
+
+   public boolean contains(long item) {
+      checkBiggerEqualZero(item);
+      long h = hash(item);
+      return getSection(h).contains(item, (int) h);
+   }
+
+   public boolean add(long item) {
+      checkBiggerEqualZero(item);
+      long h = hash(item);
+      return getSection(h).add(item, (int) h);
+   }
+
+   /**
+    * Remove an existing entry if found
+    *
+    * @param item
+    * @return true if removed or false if item was not present
+    */
+   public boolean remove(long item) {
+      checkBiggerEqualZero(item);
+      long h = hash(item);
+      return getSection(h).remove(item, (int) h);
+   }
+
+   private Section getSection(long hash) {
+      // Use 32 msb out of long to get the section
+      final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+      return sections[sectionIdx];
+   }
+
+   public void clear() {
+      for (Section s : sections) {
+         s.clear();
+      }
+   }
+
+   public void forEach(ConsumerLong processor) {
+      for (Section s : sections) {
+         s.forEach(processor);
+      }
+   }
+
+   /**
+    * @return a new list of all keys (makes a copy)
+    */
+   public Set<Long> items() {
+      Set<Long> items = new HashSet<>();
+      forEach(items::add);
+      return items;
+   }
+
+   // A section is a portion of the hash map that is covered by a single
+   @SuppressWarnings("serial")
+   private static final class Section extends StampedLock {
+      // Keys and values are stored interleaved in the table array
+      private long[] table;
+
+      private int capacity;
+      private volatile int size;
+      private int usedBuckets;
+      private int resizeThreshold;
+
+      Section(int capacity) {
+         this.capacity = alignToPowerOfTwo(capacity);
+         this.table = new long[this.capacity];
+         this.size = 0;
+         this.usedBuckets = 0;
+         this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+         Arrays.fill(table, EmptyItem);
+      }
+
+      boolean contains(long item, int hash) {
+         long stamp = tryOptimisticRead();
+         boolean acquiredLock = false;
+         int bucket = signSafeMod(hash, capacity);
+
+         try {
+            while (true) {
+               // First try optimistic locking
+               long storedItem = table[bucket];
+
+               if (!acquiredLock && validate(stamp)) {
+                  // The values we have read are consistent
+                  if (item == storedItem) {
+                     return true;
+                  } else if (storedItem == EmptyItem) {
+                     // Not found
+                     return false;
+                  }
+               } else {
+                  // Fallback to acquiring read lock
+                  if (!acquiredLock) {
+                     stamp = readLock();
+                     acquiredLock = true;
+
+                     bucket = signSafeMod(hash, capacity);
+                     storedItem = table[bucket];
+                  }
+
+                  if (item == storedItem) {
+                     return true;
+                  } else if (storedItem == EmptyItem) {
+                     // Not found
+                     return false;
+                  }
+               }
+
+               bucket = (bucket + 1) & (table.length - 1);
+            }
+         } finally {
+            if (acquiredLock) {
+               unlockRead(stamp);
+            }
+         }
+      }
+
+      @SuppressWarnings("NonAtomicVolatileUpdate")
+      boolean add(long item, long hash) {
+         long stamp = writeLock();
+         int bucket = signSafeMod(hash, capacity);
+
+         // Remember where we find the first available spot
+         int firstDeletedItem = -1;
+
+         try {
+            while (true) {
+               long storedItem = table[bucket];
+
+               if (item == storedItem) {
+                  // Item was already in set
+                  return false;
+               } else if (storedItem == EmptyItem) {
+                  // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                  // key, we should write at that position
+                  if (firstDeletedItem != -1) {
+                     bucket = firstDeletedItem;
+                  } else {
+                     ++usedBuckets;
+                  }
+
+                  table[bucket] = item;
+                  ++size;
+                  return true;
+               } else if (storedItem == DeletedItem) {
+                  // The bucket contained a different deleted key
+                  if (firstDeletedItem == -1) {
+                     firstDeletedItem = bucket;
+                  }
+               }
+
+               bucket = (bucket + 1) & (table.length - 1);
+            }
+         } finally {
+            if (usedBuckets > resizeThreshold) {
+               try {
+                  rehash();
+               } finally {
+                  unlockWrite(stamp);
+               }
+            } else {
+               unlockWrite(stamp);
+            }
+         }
+      }
+
+      @SuppressWarnings("NonAtomicVolatileUpdate")
+      private boolean remove(long item, int hash) {
+         long stamp = writeLock();
+         int bucket = signSafeMod(hash, capacity);
+
+         try {
+            while (true) {
+               long storedItem = table[bucket];
+               if (item == storedItem) {
+                  --size;
+
+                  cleanBucket(bucket);
+                  return true;
+
+               } else if (storedItem == EmptyItem) {
+                  // Key wasn't found
+                  return false;
+               }
+
+               bucket = (bucket + 1) & (table.length - 1);
+            }
+         } finally {
+            unlockWrite(stamp);
+         }
+      }
+
+      private void cleanBucket(int bucket) {
+         int nextInArray = (bucket + 1) & (table.length - 1);
+         if (table[nextInArray] == EmptyItem) {
+            table[bucket] = EmptyItem;
+            --usedBuckets;
+         } else {
+            table[bucket] = DeletedItem;
+         }
+      }
+
+      void clear() {
+         long stamp = writeLock();
+
+         try {
+            Arrays.fill(table, EmptyItem);
+            this.size = 0;
+            this.usedBuckets = 0;
+         } finally {
+            unlockWrite(stamp);
+         }
+      }
+
+      public void forEach(ConsumerLong processor) {
+         long stamp = tryOptimisticRead();
+
+         long[] table = this.table;
+         boolean acquiredReadLock = false;
+
+         try {
+
+            // Validate no rehashing
+            if (!validate(stamp)) {
+               // Fallback to read lock
+               stamp = readLock();
+               acquiredReadLock = true;
+               table = this.table;
+            }
+
+            // Go through all the buckets for this section
+            for (int bucket = 0; bucket < table.length; bucket++) {
+               long storedItem = table[bucket];
+
+               if (!acquiredReadLock && !validate(stamp)) {
+                  // Fallback to acquiring read lock
+                  stamp = readLock();
+                  acquiredReadLock = true;
+
+                  storedItem = table[bucket];
+               }
+
+               if (storedItem != DeletedItem && storedItem != EmptyItem) {
+                  processor.accept(storedItem);
+               }
+            }
+         } finally {
+            if (acquiredReadLock) {
+               unlockRead(stamp);
+            }
+         }
+      }
+
+      private void rehash() {
+         // Expand the hashmap
+         int newCapacity = capacity * 2;
+         long[] newTable = new long[newCapacity];
+         Arrays.fill(newTable, EmptyItem);
+
+         // Re-hash table
+         for (int i = 0; i < table.length; i++) {
+            long storedItem = table[i];
+            if (storedItem != EmptyItem && storedItem != DeletedItem) {
+               insertKeyValueNoLock(newTable, newCapacity, storedItem);
+            }
+         }
+
+         capacity = newCapacity;
+         table = newTable;
+         usedBuckets = size;
+         resizeThreshold = (int) (capacity * SetFillFactor);
+      }
+
+      private static void insertKeyValueNoLock(long[] table, int capacity, long item) {
+         int bucket = signSafeMod(hash(item), capacity);
+
+         while (true) {
+            long storedKey = table[bucket];
+
+            if (storedKey == EmptyItem) {
+               // The bucket is empty, so we can use it
+               table[bucket] = item;
+               return;
+            }
+
+            bucket = (bucket + 1) & (table.length - 1);
+         }
+      }
+   }
+
+   private static final long HashMixer = 0xc6a4a7935bd1e995L;
+   private static final int R = 47;
+
+   static long hash(long key) {
+      long hash = key * HashMixer;
+      hash ^= hash >>> R;
+      hash *= HashMixer;
+      return hash;
+   }
+
+   static int signSafeMod(long n, int Max) {
+      return (int) (n & (Max - 1));
+   }
+
+   static int alignToPowerOfTwo(int n) {
+      return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+   }
+
+   static void checkBiggerEqualZero(long n) {
+      if (n < 0L) {
+         throw new IllegalArgumentException("Keys and values must be >= 0");
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java
new file mode 100644
index 0000000..b08251f
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java
@@ -0,0 +1,405 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongFunction;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+
+public class ConcurrentLongHashMapTest {
+
+   @Test
+   public void simpleInsertions() {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+
+      assertTrue(map.isEmpty());
+      assertNull(map.put(1, "one"));
+      assertFalse(map.isEmpty());
+
+      assertNull(map.put(2, "two"));
+      assertNull(map.put(3, "three"));
+
+      assertEquals(map.size(), 3);
+
+      assertEquals(map.get(1), "one");
+      assertEquals(map.size(), 3);
+
+      assertEquals(map.remove(1), "one");
+      assertEquals(map.size(), 2);
+      assertEquals(map.get(1), null);
+      assertEquals(map.get(5), null);
+      assertEquals(map.size(), 2);
+
+      assertNull(map.put(1, "one"));
+      assertEquals(map.size(), 3);
+      assertEquals(map.put(1, "uno"), "one");
+      assertEquals(map.size(), 3);
+   }
+
+   @Test
+   public void testRemove() {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+      assertTrue(map.isEmpty());
+      assertNull(map.put(1, "one"));
+      assertFalse(map.isEmpty());
+
+      assertFalse(map.remove(0, "zero"));
+      assertFalse(map.remove(1, "uno"));
+
+      assertFalse(map.isEmpty());
+      assertTrue(map.remove(1, "one"));
+      assertTrue(map.isEmpty());
+   }
+
+   @Test
+   public void testNegativeUsedBucketCount() {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+
+      map.put(0, "zero");
+      assertEquals(1, map.getUsedBucketCount());
+      map.put(0, "zero1");
+      assertEquals(1, map.getUsedBucketCount());
+      map.remove(0);
+      assertEquals(0, map.getUsedBucketCount());
+      map.remove(0);
+      assertEquals(0, map.getUsedBucketCount());
+   }
+
+   @Test
+   public void testRehashing() {
+      int n = 16;
+      ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+      assertEquals(map.capacity(), n);
+      assertEquals(map.size(), 0);
+
+      for (int i = 0; i < n; i++) {
+         map.put(i, i);
+      }
+
+      assertEquals(map.capacity(), 2 * n);
+      assertEquals(map.size(), n);
+   }
+
+   @Test
+   public void testRehashingWithDeletes() {
+      int n = 16;
+      ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+      assertEquals(map.capacity(), n);
+      assertEquals(map.size(), 0);
+
+      for (int i = 0; i < n / 2; i++) {
+         map.put(i, i);
+      }
+
+      for (int i = 0; i < n / 2; i++) {
+         map.remove(i);
+      }
+
+      for (int i = n; i < (2 * n); i++) {
+         map.put(i, i);
+      }
+
+      assertEquals(map.capacity(), 2 * n);
+      assertEquals(map.size(), n);
+   }
+
+   @Test
+   public void concurrentInsertions() throws Throwable {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+      ExecutorService executor = Executors.newCachedThreadPool();
+
+      final int nThreads = 16;
+      final int N = 100_000;
+      String value = "value";
+
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < nThreads; i++) {
+         final int threadIdx = i;
+
+         futures.add(executor.submit(() -> {
+            Random random = new Random();
+
+            for (int j = 0; j < N; j++) {
+               long key = random.nextLong();
+               // Ensure keys are uniques
+               key -= key % (threadIdx + 1);
+
+               map.put(key, value);
+            }
+         }));
+      }
+
+      for (Future<?> future : futures) {
+         future.get();
+      }
+
+      assertEquals(map.size(), N * nThreads);
+
+      executor.shutdown();
+   }
+
+   @Test
+   public void concurrentInsertionsAndReads() throws Throwable {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+      ExecutorService executor = Executors.newCachedThreadPool();
+
+      final int nThreads = 16;
+      final int N = 100_000;
+      String value = "value";
+
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < nThreads; i++) {
+         final int threadIdx = i;
+
+         futures.add(executor.submit(() -> {
+            Random random = new Random();
+
+            for (int j = 0; j < N; j++) {
+               long key = random.nextLong();
+               // Ensure keys are uniques
+               key -= key % (threadIdx + 1);
+
+               map.put(key, value);
+            }
+         }));
+      }
+
+      for (Future<?> future : futures) {
+         future.get();
+      }
+
+      assertEquals(map.size(), N * nThreads);
+
+      executor.shutdown();
+   }
+
+   @Test
+   public void testIteration() {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+      assertEquals(map.keys(), Collections.emptyList());
+      assertEquals(map.values(), Collections.emptyList());
+
+      map.put(0, "zero");
+
+      assertEquals(map.keys(), Lists.newArrayList(0L));
+      assertEquals(map.values(), Lists.newArrayList("zero"));
+
+      map.remove(0);
+
+      assertEquals(map.keys(), Collections.emptyList());
+      assertEquals(map.values(), Collections.emptyList());
+
+      map.put(0, "zero");
+      map.put(1, "one");
+      map.put(2, "two");
+
+      List<Long> keys = map.keys();
+      Collections.sort(keys);
+      assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+      List<String> values = map.values();
+      Collections.sort(values);
+      assertEquals(values, Lists.newArrayList("one", "two", "zero"));
+
+      map.put(1, "uno");
+
+      keys = map.keys();
+      Collections.sort(keys);
+      assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+      values = map.values();
+      Collections.sort(values);
+      assertEquals(values, Lists.newArrayList("two", "uno", "zero"));
+
+      map.clear();
+      assertTrue(map.isEmpty());
+   }
+
+   @Test
+   public void testHashConflictWithDeletion() {
+      final int Buckets = 16;
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
+
+      // Pick 2 keys that fall into the same bucket
+      long key1 = 1;
+      long key2 = 27;
+
+      int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets);
+      int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets);
+      assertEquals(bucket1, bucket2);
+
+      assertEquals(map.put(key1, "value-1"), null);
+      assertEquals(map.put(key2, "value-2"), null);
+      assertEquals(map.size(), 2);
+
+      assertEquals(map.remove(key1), "value-1");
+      assertEquals(map.size(), 1);
+
+      assertEquals(map.put(key1, "value-1-overwrite"), null);
+      assertEquals(map.size(), 2);
+
+      assertEquals(map.remove(key1), "value-1-overwrite");
+      assertEquals(map.size(), 1);
+
+      assertEquals(map.put(key2, "value-2-overwrite"), "value-2");
+      assertEquals(map.get(key2), "value-2-overwrite");
+
+      assertEquals(map.size(), 1);
+      assertEquals(map.remove(key2), "value-2-overwrite");
+      assertTrue(map.isEmpty());
+   }
+
+   @Test
+   public void testPutIfAbsent() {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+      assertEquals(map.putIfAbsent(1, "one"), null);
+      assertEquals(map.get(1), "one");
+
+      assertEquals(map.putIfAbsent(1, "uno"), "one");
+      assertEquals(map.get(1), "one");
+   }
+
+   @Test
+   public void testComputeIfAbsent() {
+      ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
+      AtomicInteger counter = new AtomicInteger();
+      LongFunction<Integer> provider = key -> counter.getAndIncrement();
+
+      assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
+      assertEquals(map.get(0).intValue(), 0);
+
+      assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+      assertEquals(map.get(1).intValue(), 1);
+
+      assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+      assertEquals(map.get(1).intValue(), 1);
+
+      assertEquals(map.computeIfAbsent(2, provider).intValue(), 2);
+      assertEquals(map.get(2).intValue(), 2);
+   }
+
+   int Iterations = 1;
+   int ReadIterations = 100;
+   int N = 1_000_000;
+
+   public void benchConcurrentLongHashMap() throws Exception {
+      // public static void main(String args[]) {
+      ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+
+      for (long i = 0; i < Iterations; i++) {
+         for (int j = 0; j < N; j++) {
+            map.put(i, "value");
+         }
+
+         for (long h = 0; h < ReadIterations; h++) {
+            for (int j = 0; j < N; j++) {
+               map.get(i);
+            }
+         }
+
+         for (int j = 0; j < N; j++) {
+            map.remove(i);
+         }
+      }
+   }
+
+   public void benchConcurrentHashMap() throws Exception {
+      ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<>(N, 0.66f, 1);
+
+      for (long i = 0; i < Iterations; i++) {
+         for (int j = 0; j < N; j++) {
+            map.put(i, "value");
+         }
+
+         for (long h = 0; h < ReadIterations; h++) {
+            for (int j = 0; j < N; j++) {
+               map.get(i);
+            }
+         }
+
+         for (int j = 0; j < N; j++) {
+            map.remove(i);
+         }
+      }
+   }
+
+   void benchHashMap() throws Exception {
+      HashMap<Long, String> map = new HashMap<>(N, 0.66f);
+
+      for (long i = 0; i < Iterations; i++) {
+         for (int j = 0; j < N; j++) {
+            map.put(i, "value");
+         }
+
+         for (long h = 0; h < ReadIterations; h++) {
+            for (int j = 0; j < N; j++) {
+               map.get(i);
+            }
+         }
+
+         for (int j = 0; j < N; j++) {
+            map.remove(i);
+         }
+      }
+   }
+
+   public static void main(String[] args) throws Exception {
+      ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest();
+
+      long start = System.nanoTime();
+      // t.benchHashMap();
+      long end = System.nanoTime();
+
+      System.out.println("HM:   " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+      start = System.nanoTime();
+      t.benchConcurrentHashMap();
+      end = System.nanoTime();
+
+      System.out.println("CHM:  " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+      start = System.nanoTime();
+      // t.benchConcurrentLongHashMap();
+      end = System.nanoTime();
+
+      System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java
new file mode 100644
index 0000000..24337f1
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ConcurrentLongHashSetTest {
+
+   @Test
+   public void simpleInsertions() {
+      ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
+
+      assertTrue(set.isEmpty());
+      assertTrue(set.add(1));
+      assertFalse(set.isEmpty());
+
+      assertTrue(set.add(2));
+      assertTrue(set.add(3));
+
+      assertEquals(set.size(), 3);
+
+      assertTrue(set.contains(1));
+      assertEquals(set.size(), 3);
+
+      assertTrue(set.remove(1));
+      assertEquals(set.size(), 2);
+      assertFalse(set.contains(1));
+      assertFalse(set.contains(5));
+      assertEquals(set.size(), 2);
+
+      assertTrue(set.add(1));
+      assertEquals(set.size(), 3);
+      assertFalse(set.add(1));
+      assertEquals(set.size(), 3);
+   }
+
+   @Test
+   public void testRemove() {
+      ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+      assertTrue(set.isEmpty());
+      assertTrue(set.add(1));
+      assertFalse(set.isEmpty());
+
+      assertFalse(set.remove(0));
+      assertFalse(set.isEmpty());
+      assertTrue(set.remove(1));
+      assertTrue(set.isEmpty());
+   }
+
+   @Test
+   public void testRehashing() {
+      int n = 16;
+      ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+      assertEquals(set.capacity(), n);
+      assertEquals(set.size(), 0);
+
+      for (int i = 0; i < n; i++) {
+         set.add(i);
+      }
+
+      assertEquals(set.capacity(), 2 * n);
+      assertEquals(set.size(), n);
+   }
+
+   @Test
+   public void testRehashingWithDeletes() {
+      int n = 16;
+      ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+      assertEquals(set.capacity(), n);
+      assertEquals(set.size(), 0);
+
+      for (int i = 0; i < n / 2; i++) {
+         set.add(i);
+      }
+
+      for (int i = 0; i < n / 2; i++) {
+         set.remove(i);
+      }
+
+      for (int i = n; i < (2 * n); i++) {
+         set.add(i);
+      }
+
+      assertEquals(set.capacity(), 2 * n);
+      assertEquals(set.size(), n);
+   }
+
+   @Test
+   public void concurrentInsertions() throws Throwable {
+      ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+      ExecutorService executor = Executors.newCachedThreadPool();
+
+      final int nThreads = 16;
+      final int N = 100_000;
+
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < nThreads; i++) {
+         final int threadIdx = i;
+
+         futures.add(executor.submit(() -> {
+            Random random = new Random();
+
+            for (int j = 0; j < N; j++) {
+               long key = Math.abs(random.nextLong());
+               // Ensure keys are unique
+               key -= key % (threadIdx + 1);
+
+               set.add(key);
+            }
+         }));
+      }
+
+      for (Future<?> future : futures) {
+         future.get();
+      }
+
+      assertEquals(set.size(), N * nThreads);
+
+      executor.shutdown();
+   }
+
+   @Test
+   public void concurrentInsertionsAndReads() throws Throwable {
+      ConcurrentLongHashSet map = new ConcurrentLongHashSet();
+      ExecutorService executor = Executors.newCachedThreadPool();
+
+      final int nThreads = 16;
+      final int N = 100_000;
+
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < nThreads; i++) {
+         final int threadIdx = i;
+
+         futures.add(executor.submit(() -> {
+            Random random = new Random();
+
+            for (int j = 0; j < N; j++) {
+               long key = Math.abs(random.nextLong());
+               // Ensure keys are unique
+               key -= key % (threadIdx + 1);
+
+               map.add(key);
+            }
+         }));
+      }
+
+      for (Future<?> future : futures) {
+         future.get();
+      }
+
+      assertEquals(map.size(), N * nThreads);
+
+      executor.shutdown();
+   }
+
+   @Test
+   public void testIteration() {
+      ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+      assertEquals(set.items(), Collections.emptySet());
+
+      set.add(0L);
+
+      assertEquals(set.items(), Sets.newHashSet(0L));
+
+      set.remove(0L);
+
+      assertEquals(set.items(), Collections.emptySet());
+
+      set.add(0L);
+      set.add(1L);
+      set.add(2L);
+
+      List<Long> values = Lists.newArrayList(set.items());
+      Collections.sort(values);
+      assertEquals(values, Lists.newArrayList(0L, 1L, 2L));
+
+      set.clear();
+      assertTrue(set.isEmpty());
+   }
+
+   @Test
+   public void testHashConflictWithDeletion() {
+      final int Buckets = 16;
+      ConcurrentLongHashSet set = new ConcurrentLongHashSet(Buckets, 1);
+
+      // Pick 2 keys that fall into the same bucket
+      long key1 = 1;
+      long key2 = 27;
+
+      int bucket1 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key1), Buckets);
+      int bucket2 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key2), Buckets);
+      assertEquals(bucket1, bucket2);
+
+      assertTrue(set.add(key1));
+      assertTrue(set.add(key2));
+      assertEquals(set.size(), 2);
+
+      assertTrue(set.remove(key1));
+      assertEquals(set.size(), 1);
+
+      assertTrue(set.add(key1));
+      assertEquals(set.size(), 2);
+
+      assertTrue(set.remove(key1));
+      assertEquals(set.size(), 1);
+
+      assertFalse(set.add(key2));
+      assertTrue(set.contains(key2));
+
+      assertEquals(set.size(), 1);
+      assertTrue(set.remove(key2));
+      assertTrue(set.isEmpty());
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 943077c..8de3da6 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.journal.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -30,7 +29,7 @@ import org.apache.activemq.artemis.core.journal.EncoderPersister;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
 
 /**
  * Super class for Journal maintenances such as clean up and Compactor
@@ -56,7 +55,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
 
    private ActiveMQBuffer writingChannel;
 
-   private final Set<Long> recordsSnapshot = new ConcurrentHashSet<>();
+   private final ConcurrentLongHashSet recordsSnapshot;
 
    protected final List<JournalFile> newDataFiles = new ArrayList<>();
 
@@ -67,14 +66,14 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
    protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
                                        final JournalImpl journal,
                                        final JournalFilesRepository filesRepository,
-                                       final Set<Long> recordsSnapshot,
+                                       final ConcurrentLongHashSet recordsSnapshot,
                                        final long nextOrderingID) {
       super();
       this.journal = journal;
       this.filesRepository = filesRepository;
       this.fileFactory = fileFactory;
       this.nextOrderingID = nextOrderingID;
-      this.recordsSnapshot.addAll(recordsSnapshot);
+      this.recordsSnapshot = recordsSnapshot;
    }
 
    // Public --------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 8e5ca2c..5ef240a 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.journal.impl;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -43,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRecordTX;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
 
 /**
  * Journal used at a replicating backup server during the synchronization of data with the 'live'
@@ -54,7 +53,7 @@ public final class FileWrapperJournal extends JournalBase {
 
    private final ReentrantLock lockAppend = new ReentrantLock();
 
-   private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<>();
+   private final ConcurrentLongHashMap<AtomicInteger> transactions = new ConcurrentLongHashMap<>();
    private final JournalImpl journal;
    protected volatile JournalFile currentFile;
 
@@ -181,7 +180,7 @@ public final class FileWrapperJournal extends JournalBase {
                                   IOCompletion callback,
                                   boolean lineUpContext) throws Exception {
       JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
-      AtomicInteger value = transactions.remove(Long.valueOf(txID));
+      AtomicInteger value = transactions.remove(txID);
       if (value != null) {
          commitRecord.setNumberOfRecords(value.get());
       }
@@ -195,7 +194,7 @@ public final class FileWrapperJournal extends JournalBase {
                                    boolean sync,
                                    IOCompletion callback) throws Exception {
       JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
-      AtomicInteger value = transactions.get(Long.valueOf(txID));
+      AtomicInteger value = transactions.get(txID);
       if (value != null) {
          prepareRecord.setNumberOfRecords(value.get());
       }
@@ -204,7 +203,7 @@ public final class FileWrapperJournal extends JournalBase {
 
    private int count(long txID) throws ActiveMQException {
       AtomicInteger defaultValue = new AtomicInteger(1);
-      AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
+      AtomicInteger count = transactions.putIfAbsent(txID, defaultValue);
       if (count != null) {
          return count.incrementAndGet();
       }
@@ -219,7 +218,7 @@ public final class FileWrapperJournal extends JournalBase {
    @Override
    public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
       JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
-      AtomicInteger value = transactions.remove(Long.valueOf(txID));
+      AtomicInteger value = transactions.remove(txID);
       if (value != null) {
          rollbackRecord.setNumberOfRecords(value.get());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index 8b89c3e..e3e1e7b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -18,12 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -41,6 +37,8 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
 import org.jboss.logging.Logger;
 
 public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider {
@@ -53,11 +51,11 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    private static final short COMPACT_SPLIT_LINE = 2;
 
    // Snapshot of transactions that were pending when the compactor started
-   private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<>();
+   private final ConcurrentLongHashMap<PendingTransaction> pendingTransactions = new ConcurrentLongHashMap<>();
 
-   private final Map<Long, JournalRecord> newRecords = new HashMap<>();
+   private final ConcurrentLongHashMap<JournalRecord> newRecords = new ConcurrentLongHashMap<>();
 
-   private final Map<Long, JournalTransaction> newTransactions = new HashMap<>();
+   private final ConcurrentLongHashMap<JournalTransaction> newTransactions = new ConcurrentLongHashMap<>();
 
    /**
     * Commands that happened during compacting
@@ -120,18 +118,18 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       return newDataFiles;
    }
 
-   public Map<Long, JournalRecord> getNewRecords() {
+   public ConcurrentLongHashMap<JournalRecord> getNewRecords() {
       return newRecords;
    }
 
-   public Map<Long, JournalTransaction> getNewTransactions() {
+   public ConcurrentLongHashMap<JournalTransaction> getNewTransactions() {
       return newTransactions;
    }
 
    public JournalCompactor(final SequentialFileFactory fileFactory,
                            final JournalImpl journal,
                            final JournalFilesRepository filesRepository,
-                           final Set<Long> recordsSnapshot,
+                           final ConcurrentLongHashSet recordsSnapshot,
                            final long firstFileID) {
       super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
    }
@@ -628,7 +626,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    }
 
    @Override
-   public Map<Long, JournalRecord> getRecords() {
+   public ConcurrentLongHashMap<JournalRecord> getRecords() {
       return newRecords;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 81ae9c0..1758999 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -31,8 +31,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
@@ -74,6 +72,8 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
@@ -168,12 +168,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    private final JournalFilesRepository filesRepository;
 
    // Compacting may replace this structure
-   private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
+   private final ConcurrentLongHashMap<JournalRecord> records = new ConcurrentLongHashMap<>();
 
-   private final Set<Long> pendingRecords = new ConcurrentHashSet<>();
+   private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet();
 
    // Compacting may replace this structure
-   private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>();
+   private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
 
    // This will be set only while the JournalCompactor is being executed
    private volatile JournalCompactor compactor;
@@ -345,7 +345,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    }
 
    @Override
-   public Map<Long, JournalRecord> getRecords() {
+   public ConcurrentLongHashMap<JournalRecord> getRecords() {
       return records;
    }
 
@@ -1487,12 +1487,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                   return;
                }
 
-               compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
+               compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
 
-               for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet()) {
-                  compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
-                  entry.getValue().setCompacting();
-               }
+               transactions.forEach((id, pendingTransaction) -> {
+                  compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray());
+                  pendingTransaction.setCompacting();
+               });
 
                // We will calculate the new records during compacting, what will take the position the records will take
                // after compacting
@@ -1540,9 +1540,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                newDatafiles = localCompactor.getNewDataFiles();
 
                // Restore newRecords created during compacting
-               for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet()) {
-                  records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
-               }
+               localCompactor.getNewRecords().forEach((id, newRecord) -> {
+                  records.put(id, newRecord);
+               });
 
                // Restore compacted dataFiles
                for (int i = newDatafiles.size() - 1; i >= 0; i--) {
@@ -1559,9 +1559,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
                // Replay pending commands (including updates, deletes and commits)
 
-               for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) {
-                  newTransaction.replaceRecordProvider(this);
-               }
+               localCompactor.getNewTransactions().forEach((id, newTransaction) -> newTransaction.replaceRecordProvider(this));
 
                localCompactor.replayPendingCommands();
 
@@ -1569,7 +1567,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                // This has to be done after the replay pending commands, as we need to delete commits
                // that happened during the compacting
 
-               for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) {
+               localCompactor.getNewTransactions().forEach((id, newTransaction) -> {
                   if (logger.isTraceEnabled()) {
                      logger.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
                   }
@@ -1579,7 +1577,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                   } else {
                      ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
                   }
-               }
+               });
             } finally {
                journalLock.writeLock().unlock();
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
index 6c5107a..c9c92f4 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.artemis.core.journal.impl;
 
-import java.util.Map;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
 
 /**
  * This is an interface used only internally.
@@ -29,5 +29,5 @@ public interface JournalRecordProvider {
 
    JournalCompactor getCompactor();
 
-   Map<Long, JournalRecord> getRecords();
+   ConcurrentLongHashMap<JournalRecord> getRecords();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 4bf2726..6f899f3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -371,7 +371,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
     * @throws Exception
     */
    private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
-      Long id = Long.valueOf(msg.getId());
+      long id = msg.getId();
       byte[] data = msg.getData();
       SequentialFile channel1;
       switch (msg.getFileType()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1d55aa8/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
----------------------------------------------------------------------
diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
index c08f1dd..7493949 100644
--- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
+++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.stress.journal;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -238,12 +237,15 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase {
 
       reloadJournal();
 
-      Collection<Long> records = journal.getRecords().keySet();
-
       System.out.println("Deleting everything!");
-      for (Long delInfo : records) {
-         journal.appendDeleteRecord(delInfo, false);
-      }
+
+      journal.getRecords().forEach((id, record) -> {
+         try {
+            journal.appendDeleteRecord(id, false);
+         } catch (Exception e) {
+            new RuntimeException(e);
+         }
+      });
 
       journal.forceMoveNextFile();
 


Mime
View raw message