pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rdhaba...@apache.org
Subject [incubator-pulsar] branch master updated: Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
Date Fri, 31 Aug 2018 22:49:12 GMT
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 143979d  Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
143979d is described below

commit 143979d15525520edeaededd6b7878b61f5c8bfe
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Fri Aug 31 15:49:09 2018 -0700

    Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
---
 .../pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0fefe9c..9a87d26 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -57,6 +57,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * Latest cumulative ack sent to broker
      */
     private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+    private volatile boolean cumulativeAckFulshRequired = false; 
 
     private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
             .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck");
@@ -119,6 +120,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             if (msgId.compareTo(lastCumlativeAck) > 0) {
                 if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId)) {
                     // Successfully updated the last cumlative ack. Next flush iteration will send this to broker.
+                    cumulativeAckFulshRequired = true;
                     return;
                 }
             } else {
@@ -160,10 +162,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             return;
         }
 
-        if (!lastCumulativeAck.equals(MessageId.earliest)) {
+        if (cumulativeAckFulshRequired) {
             ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId,
                     AckType.Cumulative, null, Collections.emptyMap());
             cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+            cumulativeAckFulshRequired = false;
         }
 
         // Flush all individual acks


Mime
View raw message