activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2664 Simplify the credits acquiring.
Date Mon, 13 Apr 2020 20:57:18 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new fc64026  ARTEMIS-2664 Simplify the credits acquiring.
     new 89634e3  This closes #3029
fc64026 is described below

commit fc6402613d19b4f92fba89f54c9a47ed9d310458
Author: brusdev <bruscinodf@gmail.com>
AuthorDate: Thu Mar 19 12:11:28 2020 +0100

    ARTEMIS-2664 Simplify the credits acquiring.
    
    Replace the AtomicInteger with a plain int. deliveredAcks is used only by
    the acknowledge method, which is executed only by the EpollEventLoop thread
    bound to the corresponding connection channel.
---
 .../core/protocol/openwire/amq/AMQConsumer.java       | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index eb4ce40..cb9c74b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -68,7 +68,7 @@ public class AMQConsumer {
 
    private int prefetchSize;
    private final AtomicInteger currentWindow;
-   private final AtomicInteger deliveredAcks;
+   private int deliveredAcks;
    private long messagePullSequence = 0;
    private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
    //internal means we don't expose
@@ -88,7 +88,7 @@ public class AMQConsumer {
       this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
       this.currentWindow = new AtomicInteger(prefetchSize);
-      this.deliveredAcks = new AtomicInteger(0);
+      this.deliveredAcks = 0;
       if (prefetchSize == 0) {
          messagePullHandler.set(new MessagePullHandler());
       }
@@ -300,18 +300,15 @@ public class AMQConsumer {
       List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
 
       if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
-         this.deliveredAcks.getAndUpdate(deliveredAcks -> {
-            if (deliveredAcks >= ackList.size()) {
-               return deliveredAcks - ackList.size();
-            }
-
+         if (deliveredAcks < ackList.size()) {
             acquireCredit(ackList.size() - deliveredAcks);
-
-            return 0;
-         });
+            deliveredAcks = 0;
+         } else {
+            deliveredAcks -= ackList.size();
+         }
       } else {
          if (ack.isDeliveredAck()) {
-            this.deliveredAcks.addAndGet(ack.getMessageCount());
+            this.deliveredAcks += ack.getMessageCount();
          }
 
          acquireCredit(ack.getMessageCount());


Mime
View raw message