pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: Change un-ack messages start tracking behavior (#3079)
Date Mon, 03 Dec 2018 19:47:37 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d06b1e  Change un-ack messages start tracking behavior (#3079)
5d06b1e is described below

commit 5d06b1e128b6747952a855d02c8a2936d2a35cce
Author: penghui <codelipenghui@gmail.com>
AuthorDate: Tue Dec 4 03:47:33 2018 +0800

    Change un-ack messages start tracking behavior (#3079)
    
    ### Motivation
    #### Expected behavior
    
    User process the same message many times but failed, if user set a dead letter policy,
when message process times exceed the max redelivery count in dead letter policy, message
will send to the dead letter topic.
    
    #### Actual behavior
    
    When a consumer subscribe a topic, but wait a while then start receive messages, but messages
already send to dead letter topic.
    
    #### Steps to reproduce
    
    Here is the code to reproduce
    
    ```java
    public class RedeliveryIssue {
    
        public static void main(String[] args) throws PulsarClientException, InterruptedException
{
    
            final String topic = "my-topic";
    
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
    
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topic)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName(UUID.randomUUID().toString())
                    .ackTimeout(3, TimeUnit.SECONDS)
                    .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
                    .subscribe();
    
            Producer<byte[]> producer = client.newProducer()
                    .topic(topic)
                    .create();
    
            producer.send(("a message").getBytes());
    
            // wait a while, message will send to dead letter topic
            Thread.sleep(10000L);
    
            do {
                // can't receive message
                Message<byte[]> msg = consumer.receive();
                System.out.println(new String(msg.getValue()));
            } while (true);
        }
    }
    ```
    
    #### System configuration
    **Pulsar version**: 2.2.0
    
    ### Modifications
    
    Remove un-ack message tracking on message received.
    Add un-ack message tracking on consumer call receive
    
    ### Result
    
    UT passed
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 29 ++++++++++++++++++++++
 .../impl/UnAcknowledgedMessagesTimeoutTest.java    |  6 +++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 20 +++++++++++++--
 3 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2ff500b..82df18c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 public class DeadLetterTopicTest extends ProducerConsumerBase {
@@ -256,4 +257,32 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         assertNull(checkMessage);
         checkConsumer.close();
     }
+
+    /**
+     * issue https://github.com/apache/pulsar/issues/3077
+     */
+    @Test(timeOut = 200000)
+    public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException,
InterruptedException {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-subscription")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        producer.send(("a message").getBytes());
+
+        // Wait a while, message should not be send to DLQ
+        Thread.sleep(5000L);
+
+        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index da53760..794ce31 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -373,9 +373,11 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase
{
 
         Thread.sleep((long) (ackTimeOutMillis * 1.1));
 
-        for (int i = 0; i < totalMessages - 1; i++) {
+        for (int i = 0; i < totalMessages; i++) {
             Message<byte[]> msg = consumer.receive();
-            consumer.acknowledge(msg);
+            if (i != totalMessages - 1) {
+                consumer.acknowledge(msg);
+            }
         }
 
         assertEquals(consumer.getUnAckedMessageTracker().size(), 1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6ae925d..3e2d4d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -294,6 +294,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
         Message<T> message;
         try {
             message = incomingMessages.take();
+            trackMessage(message);
             Message<T> interceptMsg = beforeConsume(message);
             messageProcessed(interceptMsg);
             return interceptMsg;
@@ -325,6 +326,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
         if (message == null && conf.getReceiverQueueSize() == 0) {
             sendFlowPermitsToBroker(cnx(), 1);
         } else if (message != null) {
+            trackMessage(message);
             Message<T> interceptMsg = beforeConsume(message);
             messageProcessed(interceptMsg);
             result.complete(interceptMsg);
@@ -385,6 +387,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
         Message<T> message;
         try {
             message = incomingMessages.poll(timeout, unit);
+            trackMessage(message);
             Message<T> interceptMsg = beforeConsume(message);
             if (interceptMsg != null) {
                 messageProcessed(interceptMsg);
@@ -829,11 +832,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
                 // Enqueue the message so that it can be retrieved when application calls
receive()
                 // if the conf.getReceiverQueueSize() is 0 then discard message if no one
is waiting for it.
                 // if asyncReceive is waiting then notify callback without adding to incomingMessages
queue
-                unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
                 if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages
!= null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                     possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
Collections.singletonList(message));
                 }
                 if (!pendingReceives.isEmpty()) {
+                    trackMessage(message);
                     notifyPendingReceivedCallback(message, null);
                 } else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize)
{
                     incomingMessages.add(message);
@@ -957,7 +960,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
         MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
                 getPartitionIndex());
         BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
-        unAckedMessageTracker.add(batchMessage);
         List<MessageImpl<T>> possibleToDeadLetter = null;
         if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount())
{
             possibleToDeadLetter = new ArrayList<>();
@@ -1068,6 +1070,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
         }
     }
 
+    protected void trackMessage(Message<?> msg) {
+        if (msg != null) {
+            MessageId messageId = msg.getMessageId();
+            if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl)
{
+                MessageIdImpl id = (MessageIdImpl)messageId;
+                if (id instanceof BatchMessageIdImpl) {
+                    // do not add each item in batch message into tracker
+                    id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
+                }
+                unAckedMessageTracker.add(id);
+            }
+        }
+    }
+
     void increaseAvailablePermits(ClientCnx currentCnx) {
         increaseAvailablePermits(currentCnx, 1);
     }


Mime
View raw message