pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: Add default ackTimeout(30s) for dead letter policy (#3014)
Date Thu, 22 Nov 2018 01:35:06 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a7e133  Add default ackTimeout(30s) for dead letter policy (#3014)
0a7e133 is described below

commit 0a7e133d3ca7cbb15e88c1dd906738c8ba586558
Author: penghui <codelipenghui@gmail.com>
AuthorDate: Thu Nov 22 09:35:01 2018 +0800

    Add default ackTimeout(30s) for dead letter policy (#3014)
    
    Fix issue #2987
    ### Motivation
    
    The DeadLetterTopic feature was introduced in version 2.2.0. This feature is based on message
redelivery, so an ack timeout is necessary.
    
    ### Modifications
    
    Set ackTimeout to a default of 30s when the dead letter policy is enabled but no ackTimeout has been set.
---
 .../main/java/org/apache/pulsar/client/api/ConsumerBuilder.java  | 2 ++
 .../java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java  | 9 ++++++++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 7f5e448..62c3229 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -365,6 +365,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
      *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("your-topic-name").build())
      *          .subscribe();
      * </pre>
+     * When a dead letter policy is specified and no ackTimeoutMillis is specified,
+     * the ack timeout will be set to 30000 milliseconds.
      */
     ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 7853de5..b83804b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -60,6 +60,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T>
{
     private List<ConsumerInterceptor<T>> interceptorList;
 
     private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
+    private static long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L;
+
 
     public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
         this(client, new ConsumerConfigurationData<T>(), schema);
@@ -266,7 +268,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T>
{
 
     @Override
     public ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
-        conf.setDeadLetterPolicy(deadLetterPolicy);
+        if (deadLetterPolicy != null) {
+            if (conf.getAckTimeoutMillis() == 0) {
+                conf.setAckTimeoutMillis(DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER);
+            }
+            conf.setDeadLetterPolicy(deadLetterPolicy);
+        }
         return this;
     }
 


Mime
View raw message