activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch main updated: ARTEMIS-3327 removing unecessary blocking operations on update and delete records
Date Thu, 03 Jun 2021 14:37:58 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new cfd0327  ARTEMIS-3327 removing unecessary blocking operations on update and delete records
cfd0327 is described below

commit cfd032799c6704e480a180bdc27a3d28def099d9
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Wed Jun 2 12:53:36 2021 -0400

    ARTEMIS-3327 removing unecessary blocking operations on update and delete records
---
 .../jdbc/store/journal/JDBCJournalImpl.java        |  17 +-
 .../activemq/artemis/core/journal/Journal.java     |  20 ++-
 .../core/journal/JournalUpdateCallback.java        |  23 +++
 .../core/journal/impl/FileWrapperJournal.java      |   8 +-
 .../artemis/core/journal/impl/JournalBase.java     |  19 +--
 .../artemis/core/journal/impl/JournalImpl.java     | 185 ++++++++++-----------
 .../artemis/core/persistence/StorageManager.java   |   6 +-
 .../journal/AbstractJournalStorageManager.java     |  70 ++++----
 .../impl/nullpm/NullStorageManager.java            |   9 +-
 .../core/replication/ReplicatedJournal.java        |  25 +--
 .../core/replication/ReplicationEndpoint.java      |   2 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   4 +-
 .../artemis/core/server/impl/QueueImpl.java        |  10 +-
 .../core/transaction/impl/TransactionImplTest.java |   9 +-
 .../tests/integration/client/SendAckFailTest.java  |  12 +-
 .../persistence/DeleteMessagesOnStartupTest.java   |   4 +-
 .../integration/replication/ReplicationTest.java   |  20 +--
 .../integration/server/FakeStorageManager.java     |   3 +-
 .../core/journal/impl/JournalImplTestBase.java     |  18 +-
 19 files changed, 245 insertions(+), 219 deletions(-)

diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 9015c4e..154d39a 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@@ -497,9 +498,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
+   public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
       appendUpdateRecord(id, recordType, record, sync);
-      return true;
    }
 
    @Override
@@ -518,9 +518,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception {
+   public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
       appendUpdateRecord(id, recordType, persister, record, sync);
-      return true;
    }
 
    @Override
@@ -548,14 +547,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
 
    @Override
-   public boolean tryAppendUpdateRecord(long id,
+   public void tryAppendUpdateRecord(long id,
                                   byte recordType,
                                   Persister persister,
                                   Object record,
                                   boolean sync,
+                                  JournalUpdateCallback updateCallback,
                                   IOCompletion completionCallback) throws Exception {
       appendUpdateRecord(id, recordType, persister, record, sync, completionCallback);
-      return true;
    }
 
 
@@ -574,9 +573,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception {
+   public void tryAppendDeleteRecord(long id, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
       appendDeleteRecord(id, sync);
-      return true;
    }
 
    @Override
@@ -596,9 +594,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
+   public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception {
       appendDeleteRecord(id, sync, completionCallback);
-      return true;
    }
 
    @Override
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index d0734a8..6618e8d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -110,19 +110,19 @@ public interface Journal extends ActiveMQComponent {
 
    void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
+   void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
 
    default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
       appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
    }
 
-   default boolean tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
-      return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
+   default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
+      tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, updateCallback, sync);
    }
 
    void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
 
-   boolean tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
+   void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
 
    default void appendUpdateRecord(long id,
                                    byte recordType,
@@ -132,12 +132,13 @@ public interface Journal extends ActiveMQComponent {
       appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
    }
 
-   default boolean tryAppendUpdateRecord(long id,
+   default void tryAppendUpdateRecord(long id,
                                    byte recordType,
                                    EncodingSupport record,
                                    boolean sync,
+                                   JournalUpdateCallback updateCallback,
                                    IOCompletion completionCallback) throws Exception {
-      return tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
+      tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, updateCallback, completionCallback);
    }
 
    void appendUpdateRecord(long id,
@@ -147,20 +148,21 @@ public interface Journal extends ActiveMQComponent {
                            boolean sync,
                            IOCompletion callback) throws Exception;
 
-   boolean tryAppendUpdateRecord(long id,
+   void tryAppendUpdateRecord(long id,
                            byte recordType,
                            Persister persister,
                            Object record,
                            boolean sync,
+                           JournalUpdateCallback updateCallback,
                            IOCompletion callback) throws Exception;
 
    void appendDeleteRecord(long id, boolean sync) throws Exception;
 
-   boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception;
+   void tryAppendDeleteRecord(long id, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
 
    void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
 
-   boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
+   void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception;
 
    // Transactional operations
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalUpdateCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalUpdateCallback.java
new file mode 100644
index 0000000..2d595af
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalUpdateCallback.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal;
+
+
+public interface JournalUpdateCallback {
+   void onUpdate(long record, boolean result);
+}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 41da3ea..401797e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@@ -190,9 +191,8 @@ public final class FileWrapperJournal extends JournalBase {
 
 
    @Override
-   public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception {
+   public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception {
       appendDeleteRecord(id, sync, callback);
-      return true;
    }
 
    @Override
@@ -223,15 +223,15 @@ public final class FileWrapperJournal extends JournalBase {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(long id,
+   public void tryAppendUpdateRecord(long id,
                                      byte recordType,
                                      Persister persister,
                                      Object record,
                                      boolean sync,
+                                     JournalUpdateCallback updateCallback,
                                      IOCompletion callback) throws Exception {
       JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
       writeRecord(updateRecord, false, -1, false, callback);
-      return true;
    }
 
    @Override
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index c1a61a5..d2df4de 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 
@@ -89,11 +90,12 @@ abstract class JournalBase implements Journal {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(final long id,
+   public void tryAppendUpdateRecord(final long id,
                                      final byte recordType,
                                      final byte[] record,
+                                     JournalUpdateCallback updateCallback,
                                      final boolean sync) throws Exception {
-      return tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
+      tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync);
    }
 
    @Override
@@ -156,20 +158,19 @@ abstract class JournalBase implements Journal {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(final long id,
+   public void tryAppendUpdateRecord(final long id,
                                      final byte recordType,
                                      final Persister persister,
                                      final Object record,
+                                     final JournalUpdateCallback updateCallback,
                                      final boolean sync) throws Exception {
       SyncIOCompletion callback = getSyncCallback(sync);
 
-      boolean append = tryAppendUpdateRecord(id, recordType, persister, record, sync, callback);
+      tryAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback);
 
       if (callback != null) {
          callback.waitCompletion();
       }
-
-      return append;
    }
 
    @Override
@@ -196,16 +197,14 @@ abstract class JournalBase implements Journal {
    }
 
    @Override
-   public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception {
+   public void tryAppendDeleteRecord(final long id, JournalUpdateCallback updateCallback, final boolean sync) throws Exception {
       SyncIOCompletion callback = getSyncCallback(sync);
 
-      boolean result = tryAppendDeleteRecord(id, sync, callback);
+      tryAppendDeleteRecord(id, sync, updateCallback, callback);
 
       if (callback != null) {
          callback.waitCompletion();
       }
-
-      return result;
    }
    abstract void scheduleReclaim();
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 4bb2647..aac1554 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
@@ -87,7 +88,6 @@ import org.apache.activemq.artemis.utils.SimpleFutureImpl;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
-import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
 import org.apache.activemq.artemis.utils.collections.LongHashSet;
 import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
 import org.jboss.logging.Logger;
@@ -288,8 +288,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    // Compacting may replace this structure
    private final ConcurrentLongHashMap<JournalRecord> records = new ConcurrentLongHashMap<>();
 
-   private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet();
-
    // Compacting may replace this structure
    private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
 
@@ -908,7 +906,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
       lineUpContext(callback);
-      pendingRecords.add(id);
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendAddRecord::id=" + id +
@@ -952,7 +949,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                setErrorCondition(callback, null, e);
                logger.error("appendAddRecord::"  + e, e);
             } finally {
-               pendingRecords.remove(id);
                journalLock.readLock().unlock();
             }
          }
@@ -1011,7 +1007,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             setErrorCondition(callback, null, e);
             logger.error("appendAddEvent::"  + e, e);
          } finally {
-            pendingRecords.remove(id);
             journalLock.readLock().unlock();
          }
       });
@@ -1028,7 +1023,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                   final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
       lineUpContext(callback);
-      checkKnownRecordID(id, true);
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendUpdateRecord::id=" + id +
@@ -1036,27 +1030,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                          recordType);
       }
 
-      internalAppendUpdateRecord(id, recordType, persister, record, sync, callback);
+      SimpleFuture<Boolean> future = new SimpleFutureImpl<>();
+
+      internalAppendUpdateRecord(id, recordType, persister, record, sync, (t, v) -> future.set(v), callback);
+
+      if (!future.get()) {
+         throw new IllegalStateException("Cannot find add info " + id);
+      }
    }
 
 
    @Override
-   public boolean tryAppendUpdateRecord(final long id,
-                                  final byte recordType,
-                                  final Persister persister,
-                                  final Object record,
-                                  final boolean sync,
-                                  final IOCompletion callback) throws Exception {
+   public void tryAppendUpdateRecord(final long id,
+                                     final byte recordType,
+                                     final Persister persister,
+                                     final Object record,
+                                     final boolean sync,
+                                     JournalUpdateCallback updateCallback,
+                                     final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
       lineUpContext(callback);
 
-      if (!checkKnownRecordID(id, false)) {
-         if (callback != null) {
-            callback.done();
-         }
-         return false;
-      }
-
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendUpdateRecord::id=" + id +
                          ", userRecordType=" +
@@ -1064,9 +1058,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
 
 
-      internalAppendUpdateRecord(id, recordType, persister, record, sync, callback);
-
-      return true;
+      internalAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback);
    }
 
 
@@ -1075,14 +1067,32 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                            Persister persister,
                                            Object record,
                                            boolean sync,
+                                           JournalUpdateCallback updateCallback,
                                            IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
-      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
             try {
+               // compactor will never change while readLock is acquired.
+               // but we are doing this since compactor is volatile, to avoid some extra work from JIT
+               JournalCompactor compactor = JournalImpl.this.compactor;
                JournalRecord jrnRecord = records.get(id);
+               if (jrnRecord == null) {
+                  if (compactor == null || (!compactor.containsRecord(id))) {
+                     if (updateCallback != null) {
+                        updateCallback.onUpdate(id, false);
+                     }
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Record " + id + " had not been found");
+                     }
+
+                     if (callback != null) {
+                        callback.done();
+                     }
+                     return;
+                  }
+               }
                JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
 
@@ -1097,17 +1107,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                // record==null here could only mean there is a compactor
                // computing the delete should be done after compacting is done
                if (jrnRecord == null) {
-                  compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
+                  if (compactor != null) {
+                     compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
+                  }
                } else {
                   jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
                }
 
-               result.set(true);
+               if (updateCallback != null) {
+                  updateCallback.onUpdate(id, true);
+               }
             } catch (ActiveMQShutdownException e) {
-               result.fail(e);
+               if (updateCallback != null) {
+                  updateCallback.onUpdate(id, false);
+               }
                logger.error("appendUpdateRecord:" + e, e);
             } catch (Throwable e) {
-               result.fail(e);
+               if (updateCallback != null) {
+                  updateCallback.onUpdate(id, false);
+               }
                setErrorCondition(callback, null, e);
                logger.error("appendUpdateRecord:" + e, e);
             } finally {
@@ -1115,8 +1133,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             }
          }
       });
-
-      result.get();
    }
 
    @Override
@@ -1129,15 +1145,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
       checkJournalIsLoaded();
       lineUpContext(callback);
-      checkKnownRecordID(id, true);
-
-      internalAppendDeleteRecord(id, sync, callback);
+      SimpleFuture<Boolean> future = new SimpleFutureImpl<>();
+      internalAppendDeleteRecord(id, sync, (t, v) -> future.set(v), callback);
+      if (!future.get()) {
+         throw new IllegalStateException("Cannot find add info " + id);
+      }
       return;
    }
 
 
    @Override
-   public boolean tryAppendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
+   public void tryAppendDeleteRecord(final long id, final boolean sync, final JournalUpdateCallback updateCallback, final IOCompletion callback) throws Exception {
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendDeleteRecord::id=" + id);
@@ -1146,29 +1164,45 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
       checkJournalIsLoaded();
       lineUpContext(callback);
-      if (!checkKnownRecordID(id, false)) {
-         if (callback != null) {
-            callback.done();
-         }
-         return false;
-      }
-
-      internalAppendDeleteRecord(id, sync, callback);
-      return true;
+      internalAppendDeleteRecord(id, sync, updateCallback, callback);
    }
 
    private void internalAppendDeleteRecord(long id,
                                            boolean sync,
-                                           IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
-      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+                                           JournalUpdateCallback updateCallback,
+                                           IOCompletion callback)  {
+
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
             try {
+               // compactor will never change while readLock is acquired.
+               // but we are doing this since compactor is volatile, to avoid some extra work from JIT
+               JournalCompactor compactor = JournalImpl.this.compactor;
                JournalRecord record = null;
                if (compactor == null) {
                   record = records.remove(id);
+                  if (record == null) {
+                     if (updateCallback != null) {
+                        updateCallback.onUpdate(id, false);
+                     }
+
+                     if (callback != null) {
+                        callback.done();
+                     }
+                     return;
+                  }
+               } else {
+                  if (!records.containsKey(id) && !compactor.containsRecord(id)) {
+                     if (updateCallback != null) {
+                        updateCallback.onUpdate(id, false);
+                     }
+                     if (callback != null) {
+                        callback.done();
+                     }
+                     return;
+                  }
                }
 
                JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
@@ -1182,20 +1216,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                // computing the delete should be done after compacting is done
                if (record == null) {
                   // JournalImplTestUni::testDoubleDelete was written to validate this condition:
-                  if (compactor == null) {
-                     logger.debug("Record " + id + " had been deleted already from a different call");
-                  } else {
-                     compactor.addCommandDelete(id, usedFile);
-                  }
+                  compactor.addCommandDelete(id, usedFile);
                } else {
                   record.delete(usedFile);
                }
-               result.set(true);
+               if (updateCallback != null) {
+                  updateCallback.onUpdate(id, true);
+               }
             } catch (ActiveMQShutdownException e) {
-               result.fail(e);
+               if (updateCallback != null) {
+                  updateCallback.onUpdate(id, false);
+               }
                logger.error("appendDeleteRecord:" + e, e);
             } catch (Throwable e) {
-               result.fail(e);
+               if (updateCallback != null) {
+                  updateCallback.onUpdate(id, false);
+               }
                logger.error("appendDeleteRecord:" + e, e);
                setErrorCondition(callback, null, e);
             } finally {
@@ -1203,8 +1239,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             }
          }
       });
-
-      result.get();
    }
 
    private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
@@ -1266,45 +1300,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
    }
-
-   private boolean checkKnownRecordID(final long id, boolean strict) throws Exception {
-      if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.containsRecord(id))) {
-         return true;
-      }
-
-      final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
-
-      // retry on the append thread. maybe the appender thread is not keeping up.
-      appendExecutor.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               journalLock.readLock().lock();
-               try {
-
-                  known.set(records.containsKey(id)
-                          || pendingRecords.contains(id)
-                          || (compactor != null && compactor.containsRecord(id)));
-               } finally {
-                  journalLock.readLock().unlock();
-               }
-            } catch (Throwable t) {
-               known.fail(t);
-               throw t;
-            }
-         }
-      });
-
-      if (!known.get()) {
-         if (strict) {
-            throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
-         }
-         return false;
-      } else {
-         return true;
-      }
-   }
-
    private void checkJournalIsLoaded() throws Exception {
       if (state != JournalState.LOADED && state != JournalState.SYNCING) {
          throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index ca53c7c..748f682 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -198,15 +198,15 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
    void storeReference(long queueID, long messageID, boolean last) throws Exception;
 
-   boolean deleteMessage(long messageID) throws Exception;
+   void deleteMessage(long messageID) throws Exception;
 
    void storeAcknowledge(long queueID, long messageID) throws Exception;
 
    void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
 
-   boolean updateDeliveryCount(MessageReference ref) throws Exception;
+   void updateDeliveryCount(MessageReference ref) throws Exception;
 
-   boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception;
+   void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
 
    void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 632d8b9..a447285 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -360,7 +360,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void confirmPendingLargeMessage(long recordID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         messageJournal.appendDeleteRecord(recordID, true, getContext());
+         messageJournal.tryAppendDeleteRecord(recordID, true, this::messageUpdateCallback, getContext());
       }
    }
 
@@ -385,7 +385,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(last && syncNonTransactional));
+         messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, this::messageUpdateCallback, getContext(last && syncNonTransactional));
       }
    }
 
@@ -428,7 +428,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional));
+         messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional));
       }
    }
 
@@ -442,21 +442,35 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    }
 
    @Override
-   public boolean deleteMessage(final long messageID) throws Exception {
+   public void deleteMessage(final long messageID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
          // Messages are deleted on postACK, one after another.
          // If these deletes are synchronized, we would build up messages on the Executor
          // increasing chances of losing deletes.
          // The StorageManager should verify messages without references
-         return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false));
+         messageJournal.tryAppendDeleteRecord(messageID, false, this::messageUpdateCallback, getContext(false));
+      }
+   }
+
+   private void messageUpdateCallback(long id, boolean found) {
+      if (!found) {
+         ActiveMQServerLogger.LOGGER.cannotFindMessageOnJournal(new Exception(), id);
+      }
+   }
+
+   private void recordNotFoundCallback(long id, boolean found) {
+      if (!found) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Record " + id + " not found");
+         }
       }
    }
 
    @Override
-   public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
+   public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
       try (ArtemisCloseable lock = closeableReadLock()) {
-         return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional));
+         messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional));
       }
    }
 
@@ -472,7 +486,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void deleteDuplicateID(final long recordID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
+         messageJournal.tryAppendDeleteRecord(recordID, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional));
       }
    }
 
@@ -546,7 +560,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
    @Override
    public void deletePageComplete(long ackID) throws Exception {
-      messageJournal.appendDeleteRecord(ackID, false);
+      messageJournal.tryAppendDeleteRecord(ackID, this::recordNotFoundCallback, false);
    }
 
    @Override
@@ -558,7 +572,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
    @Override
    public void deleteCursorAcknowledge(long ackID) throws Exception {
-      messageJournal.appendDeleteRecord(ackID, false);
+      messageJournal.tryAppendDeleteRecord(ackID, this::recordNotFoundCallback, false);
    }
 
    @Override
@@ -574,14 +588,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void deleteHeuristicCompletion(final long id) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         messageJournal.appendDeleteRecord(id, true, getContext(true));
+         messageJournal.tryAppendDeleteRecord(id, true, this::recordNotFoundCallback, getContext(true));
       }
    }
 
    @Override
    public void deletePageTransactional(final long recordID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         messageJournal.appendDeleteRecord(recordID, false);
+         messageJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, false);
       }
    }
 
@@ -677,18 +691,18 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    // Other operations
 
    @Override
-   public boolean updateDeliveryCount(final MessageReference ref) throws Exception {
+   public void updateDeliveryCount(final MessageReference ref) throws Exception {
       // no need to store if it's the same value
       // otherwise the journal will get OME in case of lots of redeliveries
       if (ref.getDeliveryCount() == ref.getPersistedCount()) {
-         return true;
+         return;
       }
 
       ref.setPersistedCount(ref.getDeliveryCount());
       DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
 
       try (ArtemisCloseable lock = closeableReadLock()) {
-         return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
+         messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional));
       }
    }
 
@@ -741,7 +755,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName);
       if (oldDivert != null) {
          try (ArtemisCloseable lock = closeableReadLock()) {
-            bindingsJournal.appendDeleteRecord(oldDivert.getStoreId(), false);
+            bindingsJournal.tryAppendDeleteRecord(oldDivert.getStoreId(), this::recordNotFoundCallback, false);
          }
       }
    }
@@ -767,7 +781,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       PersistedUser oldUser = mapPersistedUsers.remove(username);
       if (oldUser != null) {
          try (ArtemisCloseable lock = closeableReadLock()) {
-            bindingsJournal.appendDeleteRecord(oldUser.getStoreId(), false);
+            bindingsJournal.tryAppendDeleteRecord(oldUser.getStoreId(), this::recordNotFoundCallback, false);
          }
       }
    }
@@ -793,7 +807,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       PersistedRole oldRole = mapPersistedRoles.remove(username);
       if (oldRole != null) {
          try (ArtemisCloseable lock = closeableReadLock()) {
-            bindingsJournal.appendDeleteRecord(oldRole.getStoreId(), false);
+            bindingsJournal.tryAppendDeleteRecord(oldRole.getStoreId(), this::recordNotFoundCallback, false);
          }
       }
    }
@@ -813,7 +827,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void deleteID(long journalD) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         bindingsJournal.appendDeleteRecord(journalD, false);
+         bindingsJournal.tryAppendDeleteRecord(journalD, this::recordNotFoundCallback, false);
       }
    }
 
@@ -822,7 +836,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
       if (oldSetting != null) {
          try (ArtemisCloseable lock = closeableReadLock()) {
-            bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+            bindingsJournal.tryAppendDeleteRecord(oldSetting.getStoreId(), this::recordNotFoundCallback, false);
          }
       }
    }
@@ -832,7 +846,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       PersistedSecuritySetting oldRoles = mapPersistedSecuritySettings.remove(addressMatch);
       if (oldRoles != null) {
          try (ArtemisCloseable lock = closeableReadLock()) {
-            bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+            bindingsJournal.tryAppendDeleteRecord(oldRoles.getStoreId(), this::recordNotFoundCallback, false);
          }
       }
    }
@@ -1094,7 +1108,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                         sub.reloadACK(encoding.position);
                      } else {
                         ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID);
-                        messageJournal.appendDeleteRecord(record.id, false);
+                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
 
                      }
 
@@ -1111,7 +1125,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                         sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
                      } else {
                         ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
-                        messageJournal.appendDeleteRecord(record.id, false);
+                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                      }
 
                      break;
@@ -1128,7 +1142,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                         sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize());
                      } else {
                         ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
-                        messageJournal.appendDeleteRecord(record.id, false);
+                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                      }
 
                      break;
@@ -1147,11 +1161,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                            if (logger.isDebugEnabled()) {
                               logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress());
                            }
-                           messageJournal.appendDeleteRecord(record.id, false);
+                           messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                         }
                      } else {
                         ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID);
-                        messageJournal.appendDeleteRecord(record.id, false);
+                        messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
                      }
 
                      break;
@@ -1332,7 +1346,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void deleteQueueStatus(long recordID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         bindingsJournal.appendDeleteRecord(recordID, true);
+         bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true);
       }
    }
 
@@ -1350,7 +1364,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    @Override
    public void deleteAddressStatus(long recordID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
-         bindingsJournal.appendDeleteRecord(recordID, true);
+         bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true);
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 3c3824c..61a95c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -237,8 +237,7 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public boolean deleteMessage(final long messageID) throws Exception {
-      return true;
+   public void deleteMessage(final long messageID) throws Exception {
    }
 
    @Override
@@ -250,8 +249,7 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
-      return true;
+   public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
    }
 
    @Override
@@ -263,8 +261,7 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public boolean updateDeliveryCount(final MessageReference ref) throws Exception {
-      return true;
+   public void updateDeliveryCount(final MessageReference ref) throws Exception {
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 4661149..58f7069 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@@ -238,12 +239,12 @@ public class ReplicatedJournal implements Journal {
     * @see org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, boolean)
     */
    @Override
-   public boolean tryAppendDeleteRecord(final long id, final boolean sync) throws Exception {
+   public void tryAppendDeleteRecord(final long id, final JournalUpdateCallback updateCallback, final boolean sync) throws Exception {
       if (log.isTraceEnabled()) {
          log.trace("AppendDelete " + id);
       }
       replicationManager.appendDeleteRecord(journalID, id);
-      return localJournal.tryAppendDeleteRecord(id, sync);
+      localJournal.tryAppendDeleteRecord(id, updateCallback, sync);
    }
 
    @Override
@@ -258,14 +259,15 @@ public class ReplicatedJournal implements Journal {
    }
 
    @Override
-   public boolean tryAppendDeleteRecord(final long id,
+   public void tryAppendDeleteRecord(final long id,
                                   final boolean sync,
+                                  final JournalUpdateCallback updateCallback,
                                   final IOCompletion completionCallback) throws Exception {
       if (log.isTraceEnabled()) {
          log.trace("AppendDelete " + id);
       }
       replicationManager.appendDeleteRecord(journalID, id);
-      return localJournal.tryAppendDeleteRecord(id, sync, completionCallback);
+      localJournal.tryAppendDeleteRecord(id, sync, updateCallback, completionCallback);
    }
    /**
     * @param txID
@@ -395,12 +397,13 @@ public class ReplicatedJournal implements Journal {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(final long id,
+   public void tryAppendUpdateRecord(final long id,
                                      final byte recordType,
                                      final byte[] record,
+                                     final JournalUpdateCallback updateCallback,
                                      final boolean sync) throws Exception {
 
-      return this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
+      this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync);
    }
 
    /**
@@ -425,16 +428,17 @@ public class ReplicatedJournal implements Journal {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(final long id,
+   public void tryAppendUpdateRecord(final long id,
                                   final byte recordType,
                                   final Persister persister,
                                   final Object record,
+                                  final JournalUpdateCallback updateCallback,
                                   final boolean sync) throws Exception {
       if (log.isTraceEnabled()) {
          log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
       }
       replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
-      return localJournal.tryAppendUpdateRecord(id, recordType, persister, record, sync);
+      localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync);
    }
 
    @Override
@@ -452,17 +456,18 @@ public class ReplicatedJournal implements Journal {
    }
 
    @Override
-   public boolean tryAppendUpdateRecord(final long id,
+   public void tryAppendUpdateRecord(final long id,
                                   final byte journalRecordType,
                                   final Persister persister,
                                   final Object record,
                                   final boolean sync,
+                                  final JournalUpdateCallback updateCallback,
                                   final IOCompletion completionCallback) throws Exception {
       if (log.isTraceEnabled()) {
          log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
       }
       replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
-      return localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
+      localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, updateCallback, completionCallback);
    }
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1220260..b173f3a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -730,7 +730,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
     */
    private void handleAppendDelete(final ReplicationDeleteMessage packet) throws Exception {
       Journal journalToUse = getJournal(packet.getJournalID());
-      journalToUse.appendDeleteRecord(packet.getId(), noSync);
+      journalToUse.tryAppendDeleteRecord(packet.getId(), null, noSync);
    }
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index c754c19..59a4917 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1077,9 +1077,9 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void errorDecrementingRefCount(@Cause Throwable e);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222153, value = "Unable to remove message id = {0} please remove manually",
+   @Message(id = 222153, value = "Cannot locate record for message id = {0} on Journal",
       format = Message.Format.MESSAGE_FORMAT)
-   void errorRemovingMessage(@Cause Throwable e, Long messageID);
+   void cannotFindMessageOnJournal(@Cause Throwable e, Long messageID);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222154, value = "Error checking DLQ",
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a8ce01d..d97891d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3278,9 +3278,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
-         if (!storageManager.updateDeliveryCount(reference)) {
-            return new Pair<>(false, false);
-         }
+         storageManager.updateDeliveryCount(reference);
       }
 
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@@ -3920,11 +3918,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                // as we can't delete each messaging with sync=true while adding messages transactionally.
                // There is a startup check to remove non referenced messages case these deletes fail
                try {
-                  if (!storageManager.deleteMessage(message.getMessageID())) {
-                     ActiveMQServerLogger.LOGGER.errorRemovingMessage(new Exception(), message.getMessageID());
-                  }
+                  storageManager.deleteMessage(message.getMessageID());
                } catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
+                  ActiveMQServerLogger.LOGGER.cannotFindMessageOnJournal(e, message.getMessageID());
                }
             }
          }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index bd1979b..77ba7ee 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -370,8 +370,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean deleteMessage(long messageID) throws Exception {
-         return true;
+      public void deleteMessage(long messageID) throws Exception {
       }
 
       @Override
@@ -385,13 +384,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean updateDeliveryCount(MessageReference ref) throws Exception {
-         return true;
+      public void updateDeliveryCount(MessageReference ref) throws Exception {
       }
 
       @Override
-      public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception {
-         return true;
+      public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
       }
 
       @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 64a4c65..01b1c22 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -441,8 +441,8 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
-      public boolean deleteMessage(long messageID) throws Exception {
-         return manager.deleteMessage(messageID);
+      public void deleteMessage(long messageID) throws Exception {
+         manager.deleteMessage(messageID);
       }
 
       @Override
@@ -456,13 +456,13 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
-      public boolean updateDeliveryCount(MessageReference ref) throws Exception {
-         return manager.updateDeliveryCount(ref);
+      public void updateDeliveryCount(MessageReference ref) throws Exception {
+         manager.updateDeliveryCount(ref);
       }
 
       @Override
-      public boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception {
-         return manager.updateScheduledDeliveryTime(ref);
+      public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
+         manager.updateScheduledDeliveryTime(ref);
       }
 
       @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index 3fc2a05..e69d4d9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
    protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
       return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
          @Override
-         public boolean deleteMessage(final long messageID) throws Exception {
+         public void deleteMessage(final long messageID) throws Exception {
             deletedMessage.add(messageID);
-            return super.deleteMessage(messageID);
+            super.deleteMessage(messageID);
          }
       };
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 24342ff..3552f2a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
@@ -696,12 +697,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean tryAppendUpdateRecord(long id,
+      public void tryAppendUpdateRecord(long id,
                                            byte recordType,
                                            Persister persister,
-                                           Object record,
+                                           Object record, JournalUpdateCallback updateCallback,
                                            boolean sync) throws Exception {
-         return true;
       }
 
       @Override
@@ -715,13 +715,12 @@ public final class ReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean tryAppendUpdateRecord(long id,
+      public void tryAppendUpdateRecord(long id,
                                            byte recordType,
                                            Persister persister,
                                            Object record,
-                                           boolean sync,
+                                           boolean sync, JournalUpdateCallback updateCallback,
                                            IOCompletion callback) throws Exception {
-         return true;
       }
 
       @Override
@@ -795,8 +794,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception {
-         return true;
+      public void tryAppendDeleteRecord(long id, JournalUpdateCallback updateConsumer, boolean sync) throws Exception {
       }
 
       @Override
@@ -846,8 +844,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
-         return true;
+      public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
       }
 
       @Override
@@ -951,8 +948,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
-      public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
-         return true;
+      public void tryAppendDeleteRecord(long id, boolean sync, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception {
       }
 
       @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
index c38bc45..67cfe18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
@@ -39,9 +39,8 @@ public class FakeStorageManager extends NullStorageManager {
    }
 
    @Override
-   public boolean deleteMessage(final long messageID) throws Exception {
+   public void deleteMessage(final long messageID) throws Exception {
       messageIds.remove(messageID);
-      return true;
    }
 
    @Override
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
index 34b2d9a..0ec7f5c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.cli.commands.tools.journal.DecodeJournal;
 import org.apache.activemq.artemis.cli.commands.tools.journal.EncodeJournal;
@@ -41,6 +42,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
 import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
 import org.jboss.logging.Logger;
 import org.junit.After;
@@ -418,13 +420,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
 
       beforeJournalOperation();
 
-      boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync);
+      SimpleFutureImpl<Boolean> future = new SimpleFutureImpl();
 
-      if (result) {
+      journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, (r, b) -> future.set(b), sync);
+
+      if (future.get()) {
+         Assert.fail();
          records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0));
       }
 
-      return result;
+      return future.get();
    }
 
    protected void update(final long... arguments) throws Exception {
@@ -456,15 +461,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
    protected boolean tryDelete(final long argument) throws Exception {
       beforeJournalOperation();
 
-      boolean result = journal.tryAppendDeleteRecord(argument, sync);
+      AtomicBoolean result = new AtomicBoolean(true);
+      journal.tryAppendDeleteRecord(argument, (t, b) -> result.set(b), sync);
 
-      if (result) {
+      if (result.get()) {
          removeRecordsForID(argument);
       }
 
       journal.debugWait();
 
-      return result;
+      return result.get();
    }
 
    protected void addTx(final long txID, final long... arguments) throws Exception {

Mime
View raw message