pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Websocket dead letter topic (PIP22) (#2968)
Date Fri, 09 Nov 2018 19:11:05 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 34ab423  Websocket dead letter topic (PIP22) (#2968)
34ab423 is described below

commit 34ab4230c125b5b9f32f476fbbcfc9f952e5f6b5
Author: Christophe Bornet <cbornet@hotmail.com>
AuthorDate: Fri Nov 9 20:10:59 2018 +0100

    Websocket dead letter topic (PIP22) (#2968)
    
    * Add support for dead letter policy in the websocket proxy
    
    * Add doc for websocket dead letter topic params
---
 .../apache/pulsar/websocket/ConsumerHandler.java    | 21 +++++++++++++++++----
 site2/docs/client-libraries-websocket.md            |  2 ++
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 740a4e1..4736cbe 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -38,13 +38,13 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -92,13 +92,13 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         this.numMsgsAcked = new LongAdder();
 
         try {
+            // checkAuth() and getConsumerConfiguration() should be called after assigning a value to this.subscription
+            this.subscription = extractSubscription(request);
             builder = (ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(service.getPulsarClient());
             this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
                     : builder.getConf().getReceiverQueueSize();
             this.subscriptionType = builder.getConf().getSubscriptionType();
 
-            // checkAuth() should be called after assigning a value to this.subscription
-            this.subscription = extractSubscription(request);
             if (!checkAuth(response)) {
                 return;
             }
@@ -191,7 +191,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
             int pending = pendingMessages.incrementAndGet();
             if (pending < maxPendingMessages) {
                 // Start next read in a separate thread to avoid recursion
-                service.getExecutor().execute(() -> receiveMessage());
+                service.getExecutor().execute(this::receiveMessage);
             }
         }).exceptionally(exception -> {
             if (exception.getCause() instanceof AlreadyClosedException) {
@@ -313,6 +313,19 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
             builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
         }
 
+        if (queryParams.containsKey("maxRedeliverCount") || queryParams.containsKey("deadLetterTopic")) {
+            DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder();
+            if (queryParams.containsKey("maxRedeliverCount")) {
+                dlpBuilder.maxRedeliverCount(Integer.parseInt(queryParams.get("maxRedeliverCount")))
+                        .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription));
+            }
+
+            if (queryParams.containsKey("deadLetterTopic")) {
+                dlpBuilder.deadLetterTopic(queryParams.get("deadLetterTopic"));
+            }
+            builder.deadLetterPolicy(dlpBuilder.build());
+        }
+
         return builder;
     }
 
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index 1b17ee7..ae062ef 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -142,6 +142,8 @@ Key | Type | Required? | Explanation
 `receiverQueueSize` | int | no | Size of the consumer receive queue (default: 1000)
 `consumerName` | string | no | Consumer name
 `priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer
+`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
+`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
 
 ##### Receiving messages
 


Mime
View raw message