activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2305 ACK counters to only increment after commit
Date Fri, 12 Apr 2019 19:48:17 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new da4f95c  ARTEMIS-2305 ACK counters to only increment after commit
     new 3589dd7  This closes #2618
da4f95c is described below

commit da4f95cf714317d956ed3aed11dd39e2c4e39368
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Tue Apr 2 17:17:23 2019 -0400

    ARTEMIS-2305 ACK counters to only increment after commit
    
    Also includes a new metric for ack attempts that keeps the former semantics.
---
 .../apache/activemq/artemis/logs/AuditLogger.java  | 10 +++
 .../artemis/api/core/management/QueueControl.java  |  7 ++
 .../proton/transaction/ProtonTransactionImpl.java  |  5 +-
 .../ProtonTransactionRefsOperation.java            |  5 +-
 .../core/management/impl/QueueControlImpl.java     | 15 ++++
 .../apache/activemq/artemis/core/server/Queue.java |  4 +-
 .../artemis/core/server/impl/QueueImpl.java        | 59 ++++++++-------
 .../artemis/core/server/impl/RefsOperation.java    |  8 +-
 .../artemis/core/transaction/Transaction.java      |  3 +-
 .../transaction/impl/BindingsTransactionImpl.java  |  3 +-
 .../core/transaction/impl/TransactionImpl.java     |  5 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  7 +-
 .../client/InterruptedLargeMessageTest.java        |  3 +-
 .../tests/integration/jmx/JmxConnectionTest.java   |  2 +-
 .../integration/management/QueueControlTest.java   | 88 ++++++++++++++++++++++
 .../management/QueueControlUsingCoreTest.java      |  5 ++
 .../core/postoffice/impl/BindingsImplTest.java     |  3 +-
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  7 +-
 18 files changed, 196 insertions(+), 43 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 5bf8bea..a07608e 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2252,4 +2252,14 @@ public interface AuditLogger extends BasicLogger {
    @Message(id = 601500, value = "User {0} is sending a core message on target resource:
{1} {2}", format = Message.Format.MESSAGE_FORMAT)
    void coreSendMessage(String user, Object source, Object... args);
 
+
+   static void getAcknowledgeAttempts(Object source) {
+      LOGGER.getMessagesAcknowledged(getCaller(), source);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601501, value = "User {0} is getting messages acknowledged attemps on target
resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+   void getAcknowledgeAttempts(String user, Object source, Object... args);
+
+
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index b57be21..0d3ab0f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -169,6 +169,13 @@ public interface QueueControl {
    long getMessagesAcknowledged();
 
    /**
+    * Returns the number of acknowledge attempts made on this queue since it was created.
+    */
+   @Attribute(desc = "number of messages acknowledged attempts from this queue since it was
created")
+   long getAcknowledgeAttempts();
+
+
+   /**
     * Returns the number of messages expired from this queue since it was created.
     */
    @Attribute(desc = "number of messages expired from this queue since it was created")
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 4c5a887..123dbb5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
@@ -67,8 +68,8 @@ public class ProtonTransactionImpl extends TransactionImpl {
    }
 
    @Override
-   public RefsOperation createRefsOperation(Queue queue) {
-      return new ProtonTransactionRefsOperation(queue, storageManager);
+   public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
+      return new ProtonTransactionRefsOperation(queue, reason, storageManager);
    }
 
    @Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java
index 7b48ac0..4bb00d2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -36,8 +37,8 @@ import org.apache.qpid.proton.engine.Delivery;
  */
 public class ProtonTransactionRefsOperation extends RefsOperation {
 
-   public ProtonTransactionRefsOperation(final Queue queue, StorageManager storageManager)
{
-      super(queue, storageManager);
+   public ProtonTransactionRefsOperation(final Queue queue, AckReason reason, StorageManager
storageManager) {
+      super(queue, reason, storageManager);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 6270fe5..b789347 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -401,6 +401,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
{
    }
 
    @Override
+   public long getAcknowledgeAttempts() {
+      if (AuditLogger.isEnabled()) {
+         AuditLogger.getMessagesAcknowledged(queue);
+      }
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getAcknowledgeAttempts();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public long getMessagesExpired() {
       if (AuditLogger.isEnabled()) {
          AuditLogger.getMessagesExpired(queue);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 2fcd766..b722b5f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -234,6 +234,8 @@ public interface Queue extends Bindable,CriticalComponent {
 
    long getMessagesAdded();
 
+   long getAcknowledgeAttempts();
+
    long getMessagesAcknowledged();
 
    long getMessagesExpired();
@@ -393,7 +395,7 @@ public interface Queue extends Bindable,CriticalComponent {
     */
    void deliverScheduledMessages() throws ActiveMQException;
 
-   void postAcknowledge(MessageReference ref);
+   void postAcknowledge(MessageReference ref, AckReason reason);
 
    float getRate();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 9d04f5b..44f938e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -193,6 +193,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
    private AtomicLong messagesAcknowledged = new AtomicLong(0);
 
+   private AtomicLong ackAttempts = new AtomicLong(0);
+
    private AtomicLong messagesExpired = new AtomicLong(0);
 
    private AtomicLong messagesKilled = new AtomicLong(0);
@@ -1473,7 +1475,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       } else {
          if (ref.isPaged()) {
             pageSubscription.ack((PagedReference) ref);
-            postAcknowledge(ref);
+            postAcknowledge(ref, reason);
          } else {
             Message message = ref.getMessage();
 
@@ -1482,18 +1484,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
             if (durableRef) {
                storageManager.storeAcknowledge(id, message.getMessageID());
             }
-            postAcknowledge(ref);
+            postAcknowledge(ref, reason);
          }
 
-         if (reason == AckReason.EXPIRED) {
-            messagesExpired.incrementAndGet();
-         } else if (reason == AckReason.KILLED) {
-            messagesKilled.incrementAndGet();
-         } else if (reason == AckReason.REPLACED) {
-            messagesReplaced.incrementAndGet();
-         } else {
-            messagesAcknowledged.incrementAndGet();
-         }
+         ackAttempts.incrementAndGet();
 
          if (server != null && server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref,
reason, consumer));
@@ -1508,10 +1502,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
    @Override
    public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason
reason, final ServerConsumer consumer) throws Exception {
+      RefsOperation refsOperation = getRefsOperation(tx, reason);
+
       if (ref.isPaged()) {
          pageSubscription.ackTx(tx, (PagedReference) ref);
 
-         getRefsOperation(tx).addAck(ref);
+         refsOperation.addAck(ref);
       } else {
          Message message = ref.getMessage();
 
@@ -1523,15 +1519,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
             tx.setContainsPersistent();
          }
 
-         getRefsOperation(tx).addAck(ref);
-      }
+         ackAttempts.incrementAndGet();
 
-      if (reason == AckReason.EXPIRED) {
-         messagesExpired.incrementAndGet();
-      } else if (reason == AckReason.KILLED) {
-         messagesKilled.incrementAndGet();
-      } else {
-         messagesAcknowledged.incrementAndGet();
+         refsOperation.addAck(ref);
       }
 
       if (server != null && server.hasBrokerMessagePlugins()) {
@@ -1547,7 +1537,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
          tx.setContainsPersistent();
       }
 
-      getRefsOperation(tx).addAck(ref);
+      getRefsOperation(tx, AckReason.NORMAL).addAck(ref);
 
       // https://issues.jboss.org/browse/HORNETQ-609
       incDelivering(ref);
@@ -1555,16 +1545,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       messagesAcknowledged.incrementAndGet();
    }
 
-   private RefsOperation getRefsOperation(final Transaction tx) {
-      return getRefsOperation(tx, false);
+   private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason) {
+      return getRefsOperation(tx, ackReason, false);
    }
 
-   private RefsOperation getRefsOperation(final Transaction tx, boolean ignoreRedlieveryCheck)
{
+   private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason, boolean
ignoreRedlieveryCheck) {
       synchronized (tx) {
          RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
 
          if (oper == null) {
-            oper = tx.createRefsOperation(this);
+            oper = tx.createRefsOperation(this, ackReason);
 
             tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
 
@@ -1586,7 +1576,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
    @Override
    public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck)
{
-      getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+      getRefsOperation(tx, AckReason.NORMAL, ignoreRedeliveryCheck).addAck(reference);
    }
 
    @Override
@@ -1706,6 +1696,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
+   public long getAcknowledgeAttempts() {
+      return ackAttempts.get();
+   }
+
+   @Override
    public long getMessagesExpired() {
       return messagesExpired.get();
    }
@@ -3300,11 +3295,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
-   public void postAcknowledge(final MessageReference ref) {
+   public void postAcknowledge(final MessageReference ref, AckReason reason) {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
       queue.decDelivering(ref);
 
+      if (reason == AckReason.EXPIRED) {
+         messagesExpired.incrementAndGet();
+      } else if (reason == AckReason.KILLED) {
+         messagesKilled.incrementAndGet();
+      } else if (reason == AckReason.REPLACED) {
+         messagesReplaced.incrementAndGet();
+      } else {
+         messagesAcknowledged.incrementAndGet();
+      }
+
       if (ref.isPaged()) {
          // nothing to be done
          return;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 6d21be2..de52cc4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -38,6 +38,8 @@ public class RefsOperation extends TransactionOperationAbstract {
 
    private static final Logger logger = Logger.getLogger(RefsOperation.class);
 
+   private final AckReason reason;
+
    private final StorageManager storageManager;
    private Queue queue;
    List<MessageReference> refsToAck = new ArrayList<>();
@@ -50,11 +52,13 @@ public class RefsOperation extends TransactionOperationAbstract {
     */
    protected boolean ignoreRedeliveryCheck = false;
 
-   public RefsOperation(Queue queue, StorageManager storageManager) {
+   public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
       this.queue = queue;
+      this.reason = reason;
       this.storageManager = storageManager;
    }
 
+
    // once turned on, we shouldn't turn it off, that's why no parameters
    public void setIgnoreRedeliveryCheck() {
       ignoreRedeliveryCheck = true;
@@ -163,7 +167,7 @@ public class RefsOperation extends TransactionOperationAbstract {
    public void afterCommit(final Transaction tx) {
       for (MessageReference ref : refsToAck) {
          synchronized (ref.getQueue()) {
-            queue.postAcknowledge(ref);
+            queue.postAcknowledge(ref, reason);
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index 0ddc2cb..6fa2c5f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 
 /**
@@ -95,5 +96,5 @@ public interface Transaction {
 
    void setTimeout(int timeout);
 
-   RefsOperation createRefsOperation(Queue queue);
+   RefsOperation createRefsOperation(Queue queue, AckReason reason);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java
index 50dff64..5bb1acc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.transaction.impl;
 
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 
 public class BindingsTransactionImpl extends TransactionImpl {
@@ -45,7 +46,7 @@ public class BindingsTransactionImpl extends TransactionImpl {
    }
 
    @Override
-   public RefsOperation createRefsOperation(Queue queue) {
+   public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
       return null;
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 9ef1554..d459975 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@@ -162,8 +163,8 @@ public class TransactionImpl implements Transaction {
    }
 
    @Override
-   public RefsOperation createRefsOperation(Queue queue) {
-      return new RefsOperation(queue, storageManager);
+   public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
+      return new RefsOperation(queue, reason, storageManager);
    }
 
    @Override
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 40ab28e..d241786 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -794,6 +794,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public long getAcknowledgeAttempts() {
+         return 0;
+      }
+
+      @Override
       public boolean allowsReferenceCallback() {
          return false;
       }
@@ -1477,7 +1482,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void postAcknowledge(MessageReference ref) {
+      public void postAcknowledge(MessageReference ref, AckReason reason) {
 
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 6056fcb..1218852 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -523,7 +524,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase
{
          }
 
          @Override
-         public void postAcknowledge(final MessageReference ref) {
+         public void postAcknowledge(final MessageReference ref, AckReason reason) {
             System.out.println("Ignoring postACK on message " + ref);
          }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java
index 9fd2c3a..416dfcc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java
@@ -105,7 +105,7 @@ public class JmxConnectionTest extends ActiveMQTestBase {
          logAndSystemOut("Successfully connected to: " + urlString);
       } catch (Exception e) {
          logAndSystemOut("JMX connection failed: " + urlString, e);
-         Assert.fail();
+         Assert.fail(e.getMessage());
          return;
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 4dd95db..4fa03b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -36,6 +36,7 @@ import javax.json.JsonArray;
 import javax.json.JsonObject;
 import javax.management.Notification;
 import javax.management.openmbean.CompositeData;
+import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -66,6 +67,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.junit.Wait;
 import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
 import org.apache.activemq.artemis.utils.Base64;
@@ -371,6 +373,92 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testGetMessagesAcknowledgedOnXARollback() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createMessage(durable));
+
+      ClientSessionFactory xaFactory = createSessionFactory(locator);
+      ClientSession xaSession = addClientSession(xaFactory.createSession(true, false, false));
+      xaSession.start();
+
+      ClientConsumer consumer = xaSession.createConsumer(queue);
+
+      int tries = 10;
+      for (int i = 0; i < tries; i++) {
+         XidImpl xid = newXID();
+         xaSession.start(xid, XAResource.TMNOFLAGS);
+         ClientMessage message = consumer.receive(1000);
+         Assert.assertNotNull(message);
+         message.acknowledge();
+         Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+         xaSession.end(xid, XAResource.TMSUCCESS);
+         Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+         xaSession.prepare(xid);
+         Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+         if (i + 1 == tries) {
+            xaSession.commit(xid, false);
+         } else {
+            xaSession.rollback(xid);
+         }
+      }
+
+      Wait.assertEquals(1, queueControl::getMessagesAcknowledged);
+      Wait.assertEquals(10, queueControl::getAcknowledgeAttempts);
+
+      consumer.close();
+
+      session.deleteQueue(queue);
+   }
+
+   @Test
+   public void testGetMessagesAcknowledgedOnRegularRollback() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createMessage(durable));
+
+      ClientSessionFactory xaFactory = createSessionFactory(locator);
+      ClientSession txSession = addClientSession(xaFactory.createSession(false, false, false));
+      txSession.start();
+
+      ClientConsumer consumer = txSession.createConsumer(queue);
+
+      int tries = 10;
+      for (int i = 0; i < tries; i++) {
+         ClientMessage message = consumer.receive(1000);
+         Assert.assertNotNull(message);
+         message.acknowledge();
+         Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+         if (i + 1 == tries) {
+            txSession.commit();
+         } else {
+            txSession.rollback();
+         }
+      }
+
+      Wait.assertEquals(1, queueControl::getMessagesAcknowledged);
+      Wait.assertEquals(10, queueControl::getAcknowledgeAttempts);
+
+      consumer.close();
+
+      session.deleteQueue(queue);
+   }
+
+   @Test
    public void testGetScheduledCount() throws Exception {
       long delay = 500;
       SimpleString address = RandomUtil.randomSimpleString();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index c40f655..254a287 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -217,6 +217,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public long getAcknowledgeAttempts() {
+            return (Integer) proxy.retrieveAttributeValue("acknowledgeAttempts", Integer.class);
+         }
+
+         @Override
          public long getMessagesExpired() {
             return (Long) proxy.retrieveAttributeValue("messagesExpired", Long.class);
          }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 01a568b..591f54a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -251,7 +252,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public RefsOperation createRefsOperation(Queue queue) {
+      public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
          // TODO Auto-generated method stub
          return null;
       }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 0aa631b..9d43b26 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -207,6 +207,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
+   public long getAcknowledgeAttempts() {
+      return 0;
+   }
+
+   @Override
    public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck)
{
       // no-op
    }
@@ -841,7 +846,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
-   public void postAcknowledge(MessageReference ref) {
+   public void postAcknowledge(MessageReference ref, AckReason reason) {
    }
 
    @Override


Mime
View raw message