pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: fix: ack timeout in pulsar cpp client when subscribing to regex topic (#3879)
Date Sun, 24 Mar 2019 16:16:51 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e3b12  fix: ack timeout in pulsar cpp client when subscribing to regex topic (#3879)
16e3b12 is described below

commit 16e3b128b65a633188ea03f5a3438dc6fef16ac9
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Sun Mar 24 11:16:47 2019 -0500

    fix: ack timeout in pulsar cpp client when subscribing to regex topic (#3879)
    
    * fix bug involving ack timeout in pulsar cpp client when subscribing to regex topic
    
    * remove newline
    
    * fix indent
    
    * add test
    
    * addressing comments
    
    * fix formatting
    
    * fix formatting
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  1 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 50 +++++++++++++++++++++++-
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 2c01da5..c5b3240 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -450,6 +450,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         }
         messages_.push(msg);
         if (messageListener_) {
+            unAckedMessageTrackerPtr_->add(msg.getMessageId());
             listenerExecutor_->postWork(
                 std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
         }
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 9ce89f7..1ea31b7 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -35,8 +35,9 @@
 #include <lib/PatternMultiTopicsConsumerImpl.h>
 #include "lib/Future.h"
 #include "lib/Utils.h"
-
 #include <functional>
+#include <thread>
+#include <chrono>
 
 DECLARE_LOG_OBJECT()
 
@@ -2940,3 +2941,50 @@ TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
     // consumer C should be a different instance from A and B and should be with open state.
     ASSERT_EQ(ResultOk, consumerC.close());
 }
+
+static long regexTestMessagesReceived = 0;
+
+static void regexMessageListenerFunction(Consumer consumer, const Message &msg) {
+    regexTestMessagesReceived++;
+}
+
+TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    long unAckedMessagesTimeoutMs = 10000;
+    std::string subsName = "testRegexTopicsWithMessageListener-sub";
+    std::string pattern = "persistent://public/default/testRegexTopicsWithMessageListenerTopic-.*";
+    ConsumerConfiguration consumerConf;
+    consumerConf.setConsumerType(ConsumerShared);
+    consumerConf.setMessageListener(
+        std::bind(regexMessageListenerFunction, std::placeholders::_1, std::placeholders::_2));
+    consumerConf.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    Result result = client.createProducer(
+        "persistent://public/default/testRegexTopicsWithMessageListenerTopic-1", producerConf, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribeWithRegex(pattern, subsName, consumerConf, consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subsName);
+
+    for (int i = 0; i < 10; i++) {
+        Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
+        producer.sendAsync(msg, nullptr);
+    }
+
+    producer.flush();
+    long timeWaited = 0;
+    while (true) {
+        // maximum wait time
+        ASSERT_LE(timeWaited, unAckedMessagesTimeoutMs * 3);
+        if (regexTestMessagesReceived >= 10 * 2) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        timeWaited += 500;
+    }
+}


Mime
View raw message