pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [incubator-pulsar] branch branch-2.1 updated: Fixed cpp multi-topic consumer when topics are not partitioned (#2453)
Date Tue, 28 Aug 2018 19:41:53 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 5d7760a  Fixed cpp multi-topic consumer when topics are not partitioned (#2453)
5d7760a is described below

commit 5d7760a663a101396c25f105a4ae7a01bd77d533
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Mon Aug 27 23:28:55 2018 -0700

    Fixed cpp multi-topic consumer when topics are not partitioned (#2453)
    
    * Fixed cpp multi-topic consumer when topics are not partitioned
    
    * Fixed formatting
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 27 ++++++++++++++++++------
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 25 +++++++++++++++++-----
 2 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 6750273..69c3cc0 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -175,17 +175,30 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     boost::shared_ptr<std::atomic<int>> partitionsNeedCreate =
         boost::make_shared<std::atomic<int>>(numPartitions);
 
-    for (int i = 0; i < numPartitions; i++) {
-        std::string topicPartitionName = topicName->getTopicPartitionName(i);
-        consumer = boost::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
-                                                    internalListenerExecutor, Partitioned);
+    if (numPartitions == 1) {
+        // We don't have to add partition-n suffix
+        consumer = boost::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
+                                                    internalListenerExecutor, NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(
             boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), _1, _2,
                         partitionsNeedCreate, topicSubResultPromise));
-        consumer->setPartitionIndex(i);
-        consumers_.insert(std::make_pair(topicPartitionName, consumer));
-        LOG_DEBUG("Create Consumer for - " << topicPartitionName << " - " << consumerStr_);
+        consumers_.insert(std::make_pair(topicName->toString(), consumer));
+        LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
         consumer->start();
+
+    } else {
+        for (int i = 0; i < numPartitions; i++) {
+            std::string topicPartitionName = topicName->getTopicPartitionName(i);
+            consumer = boost::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_,
+                                                        config, internalListenerExecutor, Partitioned);
+            consumer->getConsumerCreatedFuture().addListener(
+                boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), _1, _2,
+                            partitionsNeedCreate, topicSubResultPromise));
+            consumer->setPartitionIndex(i);
+            consumers_.insert(std::make_pair(topicPartitionName, consumer));
+            LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
+            consumer->start();
+        }
     }
 }
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e87850c..73430fc 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1604,10 +1604,12 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
     std::string topicName1 = "persistent://prop/unit/ns/testMultiTopicsConsumer1";
     std::string topicName2 = "persistent://prop/unit/ns/testMultiTopicsConsumer2";
     std::string topicName3 = "persistent://prop/unit/ns/testMultiTopicsConsumer3";
+    std::string topicName4 = "persistent://prop/unit/ns/testMultiTopicsConsumer4";
 
     topicNames.push_back(topicName1);
     topicNames.push_back(topicName2);
     topicNames.push_back(topicName3);
+    topicNames.push_back(topicName4);
 
     // call admin api to make topics partitioned
     std::string url1 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions";
@@ -1631,7 +1633,11 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
     result = client.createProducer(topicName3, producer3);
     ASSERT_EQ(ResultOk, result);
 
-    LOG_INFO("created 3 producers");
+    Producer producer4;
+    result = client.createProducer(topicName4, producer4);
+    ASSERT_EQ(ResultOk, result);
+
+    LOG_INFO("created 4 producers");
 
     int messageNumber = 100;
     ConsumerConfiguration consConfig;
@@ -1644,7 +1650,7 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
     result = consumerFuture.get(consumer);
     ASSERT_EQ(ResultOk, result);
     ASSERT_EQ(consumer.getSubscriptionName(), subName);
-    LOG_INFO("created topics consumer on 3 topics");
+    LOG_INFO("created topics consumer on 4 topics");
 
     std::string msgContent = "msg-content";
     LOG_INFO("Publishing 100 messages by producer 1 synchronously");
@@ -1673,14 +1679,23 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
         ASSERT_EQ(ResultOk, producer3.send(msg));
     }
 
-    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
-    for (int i = 0; i < 3 * messageNumber; i++) {
+    msgContent = "msg-content4";
+    LOG_INFO("Publishing 100 messages by producer 4 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer4.send(msg));
+    }
+
+    LOG_INFO("Consuming and acking 400 messages by multiTopicsConsumer");
+    for (int i = 0; i < 4 * messageNumber; i++) {
         Message m;
         ASSERT_EQ(ResultOk, consumer.receive(m, 10000));
         ASSERT_EQ(ResultOk, consumer.acknowledge(m));
     }
 
-    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+    LOG_INFO("Consumed and acked 400 messages by multiTopicsConsumer");
 
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
 


Mime
View raw message