activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] 03/04: ARTEMIS-3016 Refactored duplicate ids cache
Date Wed, 06 Jan 2021 14:05:27 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 2b5d99bbd1697728da071ecafa585b0b6168979d
Author: franz1981 <nigro.fra@gmail.com>
AuthorDate: Wed Dec 2 21:38:11 2020 +0100

    ARTEMIS-3016 Refactored duplicate ids cache
---
 .../artemis/core/postoffice/DuplicateIDCache.java  |   2 +-
 .../artemis/core/postoffice/impl/ByteArray.java    |  50 +++
 .../core/postoffice/impl/DuplicateIDCacheImpl.java | 448 ---------------------
 .../core/postoffice/impl/DuplicateIDCaches.java    |  39 ++
 .../postoffice/impl/InMemoryDuplicateIDCache.java  | 276 +++++++++++++
 .../artemis/core/postoffice/impl/IntegerCache.java |  62 +++
 .../impl/PersistentDuplicateIDCache.java           | 394 ++++++++++++++++++
 .../core/postoffice/impl/PostOfficeImpl.java       |  44 +-
 .../persistence/DuplicateCacheTest.java            |   6 +-
 .../performance/jmh/DuplicateIDCacheBenchmark.java |   6 +-
 .../impl/DuplicateDetectionUnitTest.java           |   9 +-
 .../core/server/impl/fakes/FakePostOffice.java     |   5 +-
 12 files changed, 860 insertions(+), 481 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index b384896..75cc17b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -40,7 +40,7 @@ public interface DuplicateIDCache {
 
    void deleteFromCache(byte[] duplicateID) throws Exception;
 
-   void load(List<Pair<byte[], Long>> theIds) throws Exception;
+   void load(List<Pair<byte[], Long>> ids) throws Exception;
 
    void load(Transaction tx, byte[] duplID);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java
new file mode 100644
index 0000000..8f2c67e
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import org.apache.activemq.artemis.utils.ByteUtil;
+
+final class ByteArray {
+
+   final byte[] bytes;
+
+   private int hash;
+
+   ByteArray(final byte[] bytes) {
+      this.bytes = bytes;
+   }
+
+   @Override
+   public boolean equals(final Object other) {
+      if (other instanceof ByteArray) {
+         ByteArray s = (ByteArray) other;
+
+         return ByteUtil.equals(bytes, s.bytes);
+      } else {
+         return false;
+      }
+   }
+
+   @Override
+   public int hashCode() {
+      if (hash == 0) {
+         hash = ByteUtil.hashCode(bytes);
+      }
+
+      return hash;
+   }
+}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
deleted file mode 100644
index 4dfc765..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.postoffice.impl;
-
-import java.lang.ref.Reference;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
-import org.apache.activemq.artemis.api.core.ObjLongPair;
-import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.jboss.logging.Logger;
-
-import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL;
-
-/**
- * A DuplicateIDCacheImpl
- *
- * A fixed size rotating cache of last X duplicate ids.
- */
-public class DuplicateIDCacheImpl implements DuplicateIDCache {
-
-   private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);
-   // we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen
-   private static WeakReference<Integer[]> INDEXES = null;
-
-   private static Integer[] boxedInts(int size) {
-      final Reference<Integer[]> indexesRef = INDEXES;
-      final Integer[] indexes = indexesRef == null ? null : indexesRef.get();
-      if (indexes != null && size <= indexes.length) {
-         return indexes;
-      }
-      final int newSize = size + (indexes == null ? 0 : size / 2);
-      final Integer[] newIndexes = new Integer[newSize];
-      if (indexes != null) {
-         System.arraycopy(indexes, 0, newIndexes, 0, indexes.length);
-      }
-      INDEXES = new WeakReference<>(newIndexes);
-      return newIndexes;
-   }
-
-   // ByteHolder, position
-   private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
-
-   private final SimpleString address;
-
-   // Note - deliberately typed as ArrayList since we want to ensure fast indexed
-   // based array access
-   private final ArrayList<ObjLongPair<ByteArrayHolder>> ids;
-
-   private final Integer[] cachedBoxedInts;
-
-   private int pos;
-
-   private final int cacheSize;
-
-   private final StorageManager storageManager;
-
-   private final boolean persist;
-
-   public DuplicateIDCacheImpl(final SimpleString address,
-                               final int size,
-                               final StorageManager storageManager,
-                               final boolean persist) {
-      this.address = address;
-
-      cacheSize = size;
-
-      ids = new ArrayList<>(size);
-
-      cachedBoxedInts = boxedInts(size);
-
-      this.storageManager = storageManager;
-
-      this.persist = persist;
-   }
-
-   // best effort caching mechanism
-   private Integer boxed(int index) {
-      Integer boxedInt = this.cachedBoxedInts[index];
-      if (boxedInt == null) {
-         boxedInt = index;
-         cachedBoxedInts[index] = boxedInt;
-      }
-      assert boxedInt != null;
-      return boxedInt;
-   }
-
-   @Override
-   public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
-      long txID = -1;
-
-      // If we have more IDs than cache size, we shrink the first ones
-      int deleteCount = theIds.size() - cacheSize;
-      if (deleteCount < 0) {
-         deleteCount = 0;
-      }
-
-      for (Pair<byte[], Long> id : theIds) {
-         if (deleteCount > 0) {
-            if (txID == -1) {
-               txID = storageManager.generateID();
-            }
-            if (logger.isTraceEnabled()) {
-               logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
-            }
-
-            storageManager.deleteDuplicateIDTransactional(txID, id.getB());
-            deleteCount--;
-         } else {
-            ByteArrayHolder bah = new ByteArrayHolder(id.getA());
-
-            ObjLongPair<ByteArrayHolder> pair = new ObjLongPair<>(bah, id.getB());
-
-            cache.put(bah, boxed(ids.size()));
-
-            ids.add(pair);
-            if (logger.isTraceEnabled()) {
-               logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
-            }
-         }
-
-      }
-
-      if (txID != -1) {
-         storageManager.commit(txID);
-      }
-
-      pos = ids.size();
-
-      if (pos == cacheSize) {
-         pos = 0;
-      }
-
-   }
-
-   @Override
-   public void deleteFromCache(byte[] duplicateID) throws Exception {
-      if (logger.isTraceEnabled()) {
-         logger.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0));
-      }
-
-      ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
-
-      Integer posUsed = cache.remove(bah);
-
-      if (posUsed != null) {
-         ObjLongPair<ByteArrayHolder> id;
-
-         synchronized (this) {
-            id = ids.get(posUsed.intValue());
-
-            if (id.getA().equals(bah)) {
-               id.setA(null);
-               storageManager.deleteDuplicateID(id.getB());
-               if (logger.isTraceEnabled()) {
-                  logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
-               }
-               id.setB(NIL);
-            }
-         }
-      }
-
-   }
-
-   private String describeID(byte[] duplicateID, long id) {
-      if (id != 0) {
-         return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
-      } else {
-         return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
-      }
-   }
-
-   @Override
-   public boolean contains(final byte[] duplID) {
-      return contains(new ByteArrayHolder(duplID));
-   }
-
-   private boolean contains(final ByteArrayHolder duplID) {
-      boolean contains = cache.containsKey(duplID);
-
-      if (logger.isTraceEnabled()) {
-         if (contains) {
-            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::contains found a duplicate " + describeID(duplID.bytes, 0));
-         }
-      }
-      return contains;
-   }
-
-   @Override
-   public void addToCache(final byte[] duplID) throws Exception {
-      addToCache(duplID, null, false);
-   }
-
-   @Override
-   public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
-      addToCache(duplID, tx, false);
-   }
-
-   @Override
-   public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
-      final ByteArrayHolder holder = new ByteArrayHolder(duplID);
-      if (contains(holder)) {
-         if (tx != null) {
-            tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
-         }
-         return false;
-      }
-      addToCache(holder, tx, true);
-      return true;
-   }
-
-   @Override
-   public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
-      addToCache(new ByteArrayHolder(duplID), tx, instantAdd);
-   }
-
-   private synchronized void addToCache(final ByteArrayHolder holder,
-                                        final Transaction tx,
-                                        boolean instantAdd) throws Exception {
-      long recordID = -1;
-      if (tx == null) {
-         if (persist) {
-            recordID = storageManager.generateID();
-            storageManager.storeDuplicateID(address, holder.bytes, recordID);
-         }
-
-         addToCacheInMemory(holder, recordID);
-      } else {
-         if (persist) {
-            recordID = storageManager.generateID();
-            storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID);
-
-            tx.setContainsPersistent();
-         }
-
-         if (logger.isTraceEnabled()) {
-            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(holder.bytes, recordID) + ", tx=" + tx);
-         }
-
-         if (instantAdd) {
-            tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
-         } else {
-            // For a tx, it's important that the entry is not added to the cache until commit
-            // since if the client fails then resends them tx we don't want it to get rejected
-            tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true));
-         }
-      }
-   }
-
-   @Override
-   public void load(final Transaction tx, final byte[] duplID) {
-      tx.addOperation(new AddDuplicateIDOperation(new ByteArrayHolder(duplID), tx.getID(), true));
-   }
-
-   private synchronized void addToCacheInMemory(final ByteArrayHolder holder, final long recordID) {
-      if (logger.isTraceEnabled()) {
-         logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(holder.bytes, recordID));
-      }
-
-      cache.put(holder, boxed(pos));
-
-      ObjLongPair<ByteArrayHolder> id;
-
-      if (pos < ids.size()) {
-         // Need fast array style access here -hence ArrayList typing
-         id = ids.get(pos);
-
-         // The id here might be null if it was explicit deleted
-         if (id.getA() != null) {
-            if (logger.isTraceEnabled()) {
-               logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
-            }
-
-            cache.remove(id.getA());
-
-            // Record already exists - we delete the old one and add the new one
-            // Note we can't use update since journal update doesn't let older records get
-            // reclaimed
-
-            if (id.getB() != NIL) {
-               try {
-                  storageManager.deleteDuplicateID(id.getB());
-               } catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
-               }
-            }
-         }
-
-         id.setA(holder);
-
-         // The recordID could be negative if the duplicateCache is configured to not persist,
-         // -1 would mean null on this case
-         id.setB(recordID >= 0 ? recordID : NIL);
-
-         if (logger.isTraceEnabled()) {
-            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
-         }
-
-      } else {
-         id = new ObjLongPair<>(holder, recordID >= 0 ? recordID : NIL);
-
-         if (logger.isTraceEnabled()) {
-            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
-         }
-
-         ids.add(id);
-
-      }
-
-      if (pos++ == cacheSize - 1) {
-         pos = 0;
-      }
-   }
-
-   @Override
-   public void clear() throws Exception {
-      logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
-      synchronized (this) {
-         final int idsSize = ids.size();
-         if (idsSize > 0 && persist) {
-            long tx = storageManager.generateID();
-            for (int i = 0; i < idsSize; i++) {
-               final ObjLongPair<ByteArrayHolder> id = ids.get(i);
-               if (id != null && id.getB() != NIL) {
-                  storageManager.deleteDuplicateIDTransactional(tx, id.getB());
-               }
-            }
-            storageManager.commit(tx);
-         }
-
-         ids.clear();
-         cache.clear();
-         pos = 0;
-      }
-   }
-
-   @Override
-   public List<Pair<byte[], Long>> getMap() {
-      final int idsSize = ids.size();
-      List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
-      for (int i = 0; i < idsSize; i++) {
-         final ObjLongPair<ByteArrayHolder> id = ids.get(i);
-         copy.add(new Pair<>(id.getA().bytes, id.getB() == NIL ? null : id.getB()));
-      }
-      return copy;
-   }
-
-   private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
-
-      final ByteArrayHolder holder;
-
-      final long recordID;
-
-      volatile boolean done;
-
-      private final boolean afterCommit;
-
-      AddDuplicateIDOperation(final ByteArrayHolder holder, final long recordID, boolean afterCommit) {
-         this.holder = holder;
-         this.recordID = recordID;
-         this.afterCommit = afterCommit;
-      }
-
-      private void process() {
-         if (!done) {
-            addToCacheInMemory(holder, recordID);
-
-            done = true;
-         }
-      }
-
-      @Override
-      public void afterCommit(final Transaction tx) {
-         if (afterCommit) {
-            process();
-         }
-      }
-
-      @Override
-      public void beforeCommit(Transaction tx) throws Exception {
-         if (!afterCommit) {
-            process();
-         }
-      }
-
-      @Override
-      public List<MessageReference> getRelatedMessageReferences() {
-         return null;
-      }
-   }
-
-   private static final class ByteArrayHolder {
-
-      private final byte[] bytes;
-
-      private int hash;
-
-      ByteArrayHolder(final byte[] bytes) {
-         this.bytes = bytes;
-      }
-
-      @Override
-      public boolean equals(final Object other) {
-         if (other instanceof ByteArrayHolder) {
-            ByteArrayHolder s = (ByteArrayHolder) other;
-
-            return ByteUtil.equals(bytes, s.bytes);
-         } else {
-            return false;
-         }
-      }
-
-      @Override
-      public int hashCode() {
-         if (hash == 0) {
-            hash = ByteUtil.hashCode(bytes);
-         }
-
-         return hash;
-      }
-   }
-}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java
new file mode 100644
index 0000000..79b55e8
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+
+public final class DuplicateIDCaches {
+
+   private DuplicateIDCaches() {
+
+   }
+
+   public static DuplicateIDCache persistent(final SimpleString address,
+                                             final int size,
+                                             final StorageManager storageManager) {
+      return new PersistentDuplicateIDCache(address, size, storageManager);
+   }
+
+   public static DuplicateIDCache inMemory(final SimpleString address, final int size) {
+      return new InMemoryDuplicateIDCache(address, size);
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
new file mode 100644
index 0000000..f1f8614
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntFunction;
+
+import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
+
+/**
+ * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
+ * and memory footprint reasons.<br>
+ * Instead of using a single {@link DuplicateIDCache} impl, each of the 2 impls contains just the bare minimum
+ * data it needs, giving 2 different memory footprint costs at runtime while making it easier to track dependencies,
+ * e.g. the in-memory cache doesn't need any {@link StorageManager} because no storage operations are expected to happen.
+ */
+final class InMemoryDuplicateIDCache implements DuplicateIDCache {
+
+   private static final Logger LOGGER = Logger.getLogger(InMemoryDuplicateIDCache.class);
+
+   private final Map<ByteArray, Integer> cache = new ConcurrentHashMap<>();
+
+   private final SimpleString address;
+
+   private final ArrayList<ByteArray> ids;
+
+   private final IntFunction<Integer> cachedBoxedInts;
+
+   private int pos;
+
+   private final int cacheSize;
+
+   InMemoryDuplicateIDCache(final SimpleString address, final int size) {
+      this.address = address;
+
+      cacheSize = size;
+
+      ids = new ArrayList<>(size);
+
+      cachedBoxedInts = boxedInts(size);
+   }
+
+   @Override
+   public void load(List<Pair<byte[], Long>> ids) throws Exception {
+      LOGGER.debugf("address = %s ignore loading ids: in memory cache won't load previously stored ids", address);
+   }
+
+   @Override
+   public void deleteFromCache(byte[] duplicateID) {
+      if (LOGGER.isTraceEnabled()) {
+         LOGGER.tracef("deleting id = %s", describeID(duplicateID));
+      }
+
+      ByteArray bah = new ByteArray(duplicateID);
+
+      Integer posUsed = cache.remove(bah);
+
+      if (posUsed != null) {
+         ByteArray id;
+
+         synchronized (this) {
+            final int index = posUsed.intValue();
+            id = ids.get(index);
+
+            if (id.equals(bah)) {
+               ids.set(index, null);
+               if (LOGGER.isTraceEnabled()) {
+                  LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID));
+               }
+            }
+         }
+      }
+
+   }
+
+   private static String describeID(byte[] duplicateID) {
+      return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
+   }
+
+   @Override
+   public boolean contains(final byte[] duplID) {
+      return contains(new ByteArray(duplID));
+   }
+
+   private boolean contains(final ByteArray id) {
+      boolean contains = cache.containsKey(id);
+
+      if (LOGGER.isTraceEnabled()) {
+         if (contains) {
+            LOGGER.tracef("address = %s found a duplicate %s", address, describeID(id.bytes));
+         }
+      }
+      return contains;
+   }
+
+   @Override
+   public void addToCache(final byte[] duplID) throws Exception {
+      addToCache(duplID, null, false);
+   }
+
+   @Override
+   public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
+      addToCache(duplID, tx, false);
+   }
+
+   @Override
+   public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) {
+      final ByteArray holder = new ByteArray(duplID);
+      if (contains(holder)) {
+         if (tx != null) {
+            tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
+         }
+         return false;
+      }
+      addToCache(holder, tx, true);
+      return true;
+   }
+
+   @Override
+   public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
+      addToCache(new ByteArray(duplID), tx, instantAdd);
+   }
+
+   private synchronized void addToCache(final ByteArray holder, final Transaction tx, boolean instantAdd) {
+      if (tx == null) {
+         addToCacheInMemory(holder);
+      } else {
+         if (LOGGER.isTraceEnabled()) {
+            LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, describeID(holder.bytes), tx);
+         }
+
+         if (instantAdd) {
+            tx.addOperation(new AddDuplicateIDOperation(holder, false));
+         } else {
+            // For a tx, it's important that the entry is not added to the cache until commit
+            // since if the client fails then resends them tx we don't want it to get rejected
+            tx.afterStore(new AddDuplicateIDOperation(holder, true));
+         }
+      }
+   }
+
+   @Override
+   public void load(final Transaction tx, final byte[] duplID) {
+      tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), true));
+   }
+
+   private synchronized void addToCacheInMemory(final ByteArray holder) {
+      if (LOGGER.isTraceEnabled()) {
+         LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes));
+      }
+
+      cache.put(holder, cachedBoxedInts.apply(pos));
+
+      if (pos < ids.size()) {
+         // Need fast array style access here - hence ArrayList typing; set() returns the entry previously at pos
+         final ByteArray id = ids.set(pos, holder);
+
+         // The previous id might be null if it was explicitly deleted (see deleteFromCache)
+         if (id != null) {
+            if (LOGGER.isTraceEnabled()) {
+               LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.bytes));
+            }
+
+            cache.remove(id);
+         }
+
+         if (LOGGER.isTraceEnabled()) {
+            LOGGER.tracef("address = %s replacing old duplicateID by %s", address, describeID(holder.bytes));
+         }
+
+      } else {
+         if (LOGGER.isTraceEnabled()) {
+            LOGGER.tracef("address = %s adding new duplicateID %s", address, describeID(holder.bytes));
+         }
+
+         ids.add(holder);
+      }
+
+      if (pos++ == cacheSize - 1) {
+         pos = 0;
+      }
+   }
+
+   @Override
+   public synchronized void clear() throws Exception {
+      if (LOGGER.isDebugEnabled()) {
+         LOGGER.debugf("address = %s removing duplicate ID data", address);
+      }
+      ids.clear();
+      cache.clear();
+      pos = 0;
+   }
+
+   @Override
+   public synchronized List<Pair<byte[], Long>> getMap() {
+      final int idsSize = ids.size();
+      List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
+      for (int i = 0; i < idsSize; i++) {
+         final ByteArray id = ids.get(i);
+         // in case the id has been removed
+         if (id != null) {
+            copy.add(new Pair<>(id.bytes, null));
+         }
+      }
+      return copy;
+   }
+
+   private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
+
+      final ByteArray id;
+
+      volatile boolean done;
+
+      private final boolean afterCommit;
+
+      AddDuplicateIDOperation(final ByteArray id, boolean afterCommit) {
+         this.id = id;
+         this.afterCommit = afterCommit;
+      }
+
+      private void process() {
+         if (!done) {
+            addToCacheInMemory(id);
+
+            done = true;
+         }
+      }
+
+      @Override
+      public void afterCommit(final Transaction tx) {
+         if (afterCommit) {
+            process();
+         }
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) throws Exception {
+         if (!afterCommit) {
+            process();
+         }
+      }
+
+      @Override
+      public List<MessageReference> getRelatedMessageReferences() {
+         return null;
+      }
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java
new file mode 100644
index 0000000..a726552
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.function.IntFunction;
+
+final class IntegerCache {
+   // -Ddisable.integer.cache=true makes boxedInts fall back to plain Integer::valueOf
+   private static final boolean DISABLE_INTEGER_CACHE = Boolean.getBoolean("disable.integer.cache");
+
+   // safe publication is deliberately not a goal here: this has to scale, be fast and spare "some" GC work
+   private static WeakReference<Integer[]> INDEXES = null;
+   // reuses the shared array when it holds at least size slots, else publishes a bigger one that keeps the existing boxes
+   private static Integer[] ints(int size) {
+      final Reference<Integer[]> indexesRef = INDEXES;
+      final Integer[] indexes = indexesRef == null ? null : indexesRef.get();
+      if (indexes != null && size <= indexes.length) {
+         return indexes;
+      }
+      final int newSize = size + (indexes == null ? 0 : size / 2);
+      final Integer[] newIndexes = new Integer[newSize];
+      if (indexes != null) {
+         System.arraycopy(indexes, 0, newIndexes, 0, indexes.length);
+      }
+      INDEXES = new WeakReference<>(newIndexes);
+      return newIndexes;
+   }
+   // the returned function expects indexes below size (with the cache disabled any int is accepted)
+   public static IntFunction<Integer> boxedInts(int size) {
+      if (DISABLE_INTEGER_CACHE) {
+         return Integer::valueOf;
+      }
+      // use a lambda to have a trusted const field for free
+      final Integer[] cachedInts = ints(size);
+      return index -> {
+         Integer boxedInt = cachedInts[index];
+         if (boxedInt == null) {
+            boxedInt = index;
+            cachedInts[index] = boxedInt;
+         }
+         assert boxedInt != null;
+         return boxedInt;
+      };
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
new file mode 100644
index 0000000..3e3758d
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntFunction;
+
+import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
+import org.apache.activemq.artemis.api.core.ObjLongPair;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL;
+import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
+
+/**
+ * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
+ * and memory footprint reasons.<br>
+ * Instead of using a single {@link DuplicateIDCache} impl, there are 2 different impls that contain just the bare
+ * minimum data, giving 2 different memory footprint costs at runtime while making it easier to track dependencies,
+ * e.g. the in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen.
+ */
+final class PersistentDuplicateIDCache implements DuplicateIDCache {
+
+   private static final Logger LOGGER = Logger.getLogger(PersistentDuplicateIDCache.class);
+
+   private final Map<ByteArray, Integer> cache = new ConcurrentHashMap<>();
+
+   private final SimpleString address;
+
+   private final ArrayList<ObjLongPair<ByteArray>> ids;
+
+   private final IntFunction<Integer> cachedBoxedInts;
+
+   private int pos;
+
+   private final int cacheSize;
+
+   private final StorageManager storageManager;
+
+   PersistentDuplicateIDCache(final SimpleString address, final int size, final StorageManager storageManager) {
+      this.address = address;
+
+      cacheSize = size;
+
+      ids = new ArrayList<>(size);
+
+      cachedBoxedInts = boxedInts(size);
+
+      this.storageManager = storageManager;
+   }
+
+   @Override
+   public synchronized void load(final List<Pair<byte[], Long>> ids) throws Exception {
+      if (!cache.isEmpty()) {
+         throw new IllegalStateException("load is valid only on empty cache");
+      }
+      // load only ids that fit this cache:
+      // - in terms of remaining capacity
+      // - ignoring (and reporting) ids unpaired with record ID
+      // Then, delete the exceeding ones.
+
+      long txID = -1;
+      // how many ids, counted from the start of the list, exceed the capacity and must not be added
+      int toNotBeAdded = ids.size() - cacheSize;
+      if (toNotBeAdded < 0) {
+         toNotBeAdded = 0;
+      }
+
+      for (Pair<byte[], Long> id : ids) {
+         if (id.getB() == null) {
+            if (LOGGER.isTraceEnabled()) {
+               LOGGER.tracef("ignoring id = %s because without record ID", describeID(id.getA()));
+            }
+            if (toNotBeAdded > 0) {
+               toNotBeAdded--;
+            }
+            continue;
+         }
+         assert id.getB() != null && id.getB().longValue() != NIL;
+         if (toNotBeAdded > 0) {
+            if (txID == -1) {
+               txID = storageManager.generateID();
+            }
+            if (LOGGER.isTraceEnabled()) {
+               LOGGER.tracef("deleting id = %s", describeID(id.getA(), id.getB()));
+            }
+
+            storageManager.deleteDuplicateIDTransactional(txID, id.getB());
+            toNotBeAdded--;
+         } else {
+            ByteArray bah = new ByteArray(id.getA());
+
+            ObjLongPair<ByteArray> pair = new ObjLongPair<>(bah, id.getB());
+
+            cache.put(bah, cachedBoxedInts.apply(this.ids.size()));
+
+            this.ids.add(pair);
+            if (LOGGER.isTraceEnabled()) {
+               LOGGER.tracef("loading id = %s", describeID(id.getA(), id.getB()));
+            }
+         }
+
+      }
+
+      if (txID != -1) {
+         storageManager.commit(txID);
+      }
+      // next write position: right after the loaded ids, wrapping to 0 if they filled the cache
+      pos = this.ids.size();
+
+      if (pos == cacheSize) {
+         pos = 0;
+      }
+
+   }
+
+   @Override
+   public void deleteFromCache(byte[] duplicateID) throws Exception {
+      if (LOGGER.isTraceEnabled()) {
+         LOGGER.tracef("deleting id = %s", describeID(duplicateID));
+      }
+      // Drop the id from the lookup map first: the returned slot index locates its pair in ids.
+      final ByteArray bah = new ByteArray(duplicateID);
+      final Integer posUsed = cache.remove(bah);
+
+      if (posUsed != null) {
+         synchronized (this) {
+            final ObjLongPair<ByteArray> id = ids.get(posUsed.intValue());
+            // addToCacheInMemory can overwrite a slot: only clear it if it still holds the id being deleted.
+            if (id.getA().equals(bah)) {
+               // Capture the record ID before the slot is reset to NIL: both the trace and the delete need it.
+               final long recordID = id.getB();
+               id.setA(null);
+               id.setB(NIL);
+               if (LOGGER.isTraceEnabled()) {
+                  LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, recordID));
+               }
+               storageManager.deleteDuplicateID(recordID);
+            }
+         }
+      }
+
+   }
+
+   private static String describeID(byte[] duplicateID) {
+      return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
+   }
+
+   private static String describeID(byte[] duplicateID, long id) {
+      return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
+   }
+
+   @Override
+   public boolean contains(final byte[] duplID) {
+      return contains(new ByteArray(duplID));
+   }
+
+   private boolean contains(final ByteArray duplID) {
+      boolean contains = cache.containsKey(duplID);
+
+      if (LOGGER.isTraceEnabled()) {
+         if (contains) {
+            LOGGER.tracef("address = %s found a duplicate %s", address, describeID(duplID.bytes));
+         }
+      }
+      return contains;
+   }
+
+   @Override
+   public void addToCache(final byte[] duplID) throws Exception {
+      addToCache(duplID, null, false);
+   }
+
+   @Override
+   public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
+      addToCache(duplID, tx, false);
+   }
+
+   @Override
+   public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
+      final ByteArray holder = new ByteArray(duplID);
+      if (contains(holder)) {
+         if (tx != null) {
+            tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
+         }
+         return false;
+      }
+      addToCache(holder, tx, true);
+      return true;
+   }
+
+   @Override
+   public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
+      addToCache(new ByteArray(duplID), tx, instantAdd);
+   }
+
+   private synchronized void addToCache(final ByteArray holder,
+                                        final Transaction tx,
+                                        boolean instantAdd) throws Exception {
+      final long recordID = storageManager.generateID();
+      if (tx == null) {
+         storageManager.storeDuplicateID(address, holder.bytes, recordID);
+         // no transaction: the id is added to the in-memory cache right after being stored
+         addToCacheInMemory(holder, recordID);
+      } else {
+         storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID);
+
+         tx.setContainsPersistent();
+
+         if (LOGGER.isTraceEnabled()) {
+            LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address,
+                          describeID(holder.bytes, recordID), tx);
+         }
+
+         if (instantAdd) {
+            tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
+         } else {
+            // For a tx, it's important that the entry is not added to the cache until commit
+            // since if the client fails then resends the tx we don't want it to get rejected
+            tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true));
+         }
+      }
+   }
+
+   @Override
+   public void load(final Transaction tx, final byte[] duplID) {
+      tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), tx.getID(), true));
+   }
+
+   private synchronized void addToCacheInMemory(final ByteArray holder, final long recordID) {
+      Objects.requireNonNull(holder, "holder must be not null");
+      if (recordID < 0) {
+         throw new IllegalArgumentException("recordID must be >= 0");
+      }
+      if (LOGGER.isTraceEnabled()) {
+         LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes, recordID));
+      }
+      // map the new id to the current write position
+      cache.put(holder, cachedBoxedInts.apply(pos));
+
+      ObjLongPair<ByteArray> id;
+
+      if (pos < ids.size()) {
+         // Need fast array style access here - hence ArrayList typing
+         id = ids.get(pos);
+
+         // The id here might be null if it was explicitly deleted
+         if (id.getA() != null) {
+            if (LOGGER.isTraceEnabled()) {
+               LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.getA().bytes, id.getB()));
+            }
+
+            cache.remove(id.getA());
+
+            assert id.getB() != NIL;
+            try {
+               storageManager.deleteDuplicateID(id.getB());
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
+            }
+         }
+
+         id.setA(holder);
+
+         id.setB(recordID);
+
+         if (LOGGER.isTraceEnabled()) {
+            LOGGER.tracef("address = %s replacing old duplicateID by %s", address, describeID(id.getA().bytes, id.getB()));
+         }
+
+      } else {
+         id = new ObjLongPair<>(holder, recordID);
+
+         if (LOGGER.isTraceEnabled()) {
+            LOGGER.tracef("address = %s adding new duplicateID %s", address, describeID(id.getA().bytes, id.getB()));
+         }
+
+         ids.add(id);
+
+      }
+      // advance the write position, wrapping to 0 after the last slot (cacheSize - 1)
+      if (pos++ == cacheSize - 1) {
+         pos = 0;
+      }
+   }
+
+   @Override
+   public synchronized void clear() throws Exception {
+      LOGGER.debugf("address = %s removing duplicate ID data", address);
+      final int idsSize = ids.size();
+      if (idsSize > 0) {
+         long tx = storageManager.generateID();
+         for (int i = 0; i < idsSize; i++) {
+            final ObjLongPair<ByteArray> id = ids.get(i);
+            if (id.getA() != null) {
+               assert id.getB() != NIL;
+               storageManager.deleteDuplicateIDTransactional(tx, id.getB());
+            }
+         }
+         storageManager.commit(tx);
+      }
+
+      ids.clear();
+      cache.clear();
+      pos = 0;
+   }
+
+   @Override
+   public synchronized List<Pair<byte[], Long>> getMap() {
+      final int idsSize = ids.size();
+      List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
+      for (int i = 0; i < idsSize; i++) {
+         final ObjLongPair<ByteArray> id = ids.get(i);
+         // in case the pair has been removed
+         if (id.getA() != null) {
+            assert id.getB() != NIL;
+            copy.add(new Pair<>(id.getA().bytes, id.getB()));
+         }
+      }
+      return copy;
+   }
+
+   private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
+
+      final ByteArray holder;
+
+      final long recordID;
+
+      volatile boolean done;
+
+      private final boolean afterCommit;
+
+      AddDuplicateIDOperation(final ByteArray holder, final long recordID, boolean afterCommit) {
+         this.holder = holder;
+         this.recordID = recordID;
+         this.afterCommit = afterCommit;
+      }
+
+      private void process() {
+         if (!done) {
+            addToCacheInMemory(holder, recordID);
+
+            done = true;
+         }
+      }
+
+      @Override
+      public void afterCommit(final Transaction tx) {
+         if (afterCommit) {
+            process();
+         }
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) throws Exception {
+         if (!afterCommit) {
+            process();
+         }
+      }
+
+      @Override
+      public List<MessageReference> getRelatedMessageReferences() {
+         return null;
+      }
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 187ee58..07c0bdf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -16,6 +16,25 @@
  */
 package org.apache.activemq.artemis.core.postoffice.impl;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
 import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
@@ -82,25 +101,6 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
-
 import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
 
 /**
@@ -1315,7 +1315,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       DuplicateIDCache cache = duplicateIDCaches.get(address);
 
       if (cache == null) {
-         cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);
+         if (persistIDCache) {
+            cache = DuplicateIDCaches.persistent(address, idCacheSize, storageManager);
+         } else {
+            cache = DuplicateIDCaches.inMemory(address, idCacheSize);
+         }
 
          DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
index 8300bdb..39fe106 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
@@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.RetryRule;
@@ -52,7 +52,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
    public void testDuplicate() throws Exception {
       createStorage();
 
-      DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true);
+      DuplicateIDCache cache = DuplicateIDCaches.persistent(new SimpleString("test"), 2000, journal);
 
       TransactionImpl tx = new TransactionImpl(journal);
 
@@ -108,7 +108,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
    public void testDuplicateNonPersistent() throws Exception {
       createStorage();
 
-      DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false);
+      DuplicateIDCache cache = DuplicateIDCaches.inMemory(new SimpleString("test"), 2000);
 
       TransactionImpl tx = new TransactionImpl(journal);
 
diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java
index 21505bf..0917267 100644
--- a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java
+++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java
@@ -21,7 +21,7 @@ import java.util.SplittableRandom;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
@@ -54,7 +54,9 @@ public class DuplicateIDCacheBenchmark {
 
    @Setup
    public void init() throws Exception {
-      cache = new DuplicateIDCacheImpl(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager(), persist);
+      cache = persist ?
+         DuplicateIDCaches.persistent(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager()) :
+         DuplicateIDCaches.inMemory(SimpleString.toSimpleString("benchmark"), size);
       final int idSize = findNextHigherPowerOf2(size);
       idsMask = idSize - 1;
       nextId = 0;
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 569f469..76c8619 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -31,8 +31,9 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
 import org.apache.activemq.artemis.core.persistence.GroupingInfo;
 import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
 import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
 import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
 import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
@@ -40,8 +41,8 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
-import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.After;
 import org.junit.Assert;
@@ -106,7 +107,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          Assert.assertEquals(0, mapDups.size());
 
-         DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
+         DuplicateIDCache cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
 
          for (int i = 0; i < 100; i++) {
             cacheID.addToCache(RandomUtil.randomBytes());
@@ -126,7 +127,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          Assert.assertEquals(10, values.size());
 
-         cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
+         cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
          cacheID.load(values);
 
          for (int i = 0; i < 100; i++) {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 5220a1a..d7aa3cc 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -28,14 +28,13 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
-import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -201,7 +200,7 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
-      return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false);
+      return DuplicateIDCaches.inMemory(address, 2000);
    }
 
    @Override


Mime
View raw message