From commits-return-13545-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Tue Aug 28 21:41:55 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 342DB180621 for ; Tue, 28 Aug 2018 21:41:55 +0200 (CEST) Received: (qmail 90953 invoked by uid 500); 28 Aug 2018 19:41:54 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 90944 invoked by uid 99); 28 Aug 2018 19:41:54 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Aug 2018 19:41:54 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B942380810; Tue, 28 Aug 2018 19:41:53 +0000 (UTC) Date: Tue, 28 Aug 2018 19:41:53 +0000 To: "commits@pulsar.apache.org" Subject: [incubator-pulsar] branch branch-2.1 updated: Fixed cpp multi-topic consumer when topics are not partitioned (#2453) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153548531359.17405.16824945308368226970@gitbox.apache.org> From: mmerli@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pulsar X-Git-Refname: refs/heads/branch-2.1 X-Git-Reftype: branch X-Git-Oldrev: 333b896a37ccd27729efd778aa97710b7f8286db X-Git-Newrev: 5d7760a663a101396c25f105a4ae7a01bd77d533 X-Git-Rev: 5d7760a663a101396c25f105a4ae7a01bd77d533 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
mmerli pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 5d7760a Fixed cpp multi-topic consumer when topics are not partitioned (#2453) 5d7760a is described below commit 5d7760a663a101396c25f105a4ae7a01bd77d533 Author: Matteo Merli AuthorDate: Mon Aug 27 23:28:55 2018 -0700 Fixed cpp multi-topic consumer when topics are not partitioned (#2453) * Fixed cpp multi-topic consumer when topics are not partitioned * Fixed formatting --- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 27 ++++++++++++++++++------ pulsar-client-cpp/tests/BasicEndToEndTest.cc | 25 +++++++++++++++++----- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 6750273..69c3cc0 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -175,17 +175,30 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result, boost::shared_ptr<std::atomic<int>> partitionsNeedCreate = boost::make_shared<std::atomic<int>>(numPartitions); - for (int i = 0; i < numPartitions; i++) { - std::string topicPartitionName = topicName->getTopicPartitionName(i); - consumer = boost::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config, - internalListenerExecutor, Partitioned); + if (numPartitions == 1) { + // We don't have to add partition-n suffix + consumer = boost::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config, + internalListenerExecutor, NonPartitioned); consumer->getConsumerCreatedFuture().addListener( boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), _1, _2, partitionsNeedCreate, topicSubResultPromise)); - consumer->setPartitionIndex(i); - consumers_.insert(std::make_pair(topicPartitionName, consumer)); - LOG_DEBUG("Create Consumer for - " << 
topicPartitionName << " - " << consumerStr_); + consumers_.insert(std::make_pair(topicName->toString(), consumer)); + LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_); consumer->start(); + + } else { + for (int i = 0; i < numPartitions; i++) { + std::string topicPartitionName = topicName->getTopicPartitionName(i); + consumer = boost::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, + config, internalListenerExecutor, Partitioned); + consumer->getConsumerCreatedFuture().addListener( + boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), _1, _2, + partitionsNeedCreate, topicSubResultPromise)); + consumer->setPartitionIndex(i); + consumers_.insert(std::make_pair(topicPartitionName, consumer)); + LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_); + consumer->start(); + } } } diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index e87850c..73430fc 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -1604,10 +1604,12 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { std::string topicName1 = "persistent://prop/unit/ns/testMultiTopicsConsumer1"; std::string topicName2 = "persistent://prop/unit/ns/testMultiTopicsConsumer2"; std::string topicName3 = "persistent://prop/unit/ns/testMultiTopicsConsumer3"; + std::string topicName4 = "persistent://prop/unit/ns/testMultiTopicsConsumer4"; topicNames.push_back(topicName1); topicNames.push_back(topicName2); topicNames.push_back(topicName3); + topicNames.push_back(topicName4); // call admin api to make topics partitioned std::string url1 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions"; @@ -1631,7 +1633,11 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { result = client.createProducer(topicName3, producer3); ASSERT_EQ(ResultOk, result); - LOG_INFO("created 3 
producers"); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + + LOG_INFO("created 4 producers"); int messageNumber = 100; ConsumerConfiguration consConfig; @@ -1644,7 +1650,7 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { result = consumerFuture.get(consumer); ASSERT_EQ(ResultOk, result); ASSERT_EQ(consumer.getSubscriptionName(), subName); - LOG_INFO("created topics consumer on 3 topics"); + LOG_INFO("created topics consumer on 4 topics"); std::string msgContent = "msg-content"; LOG_INFO("Publishing 100 messages by producer 1 synchronously"); @@ -1673,14 +1679,23 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { ASSERT_EQ(ResultOk, producer3.send(msg)); } - LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer"); - for (int i = 0; i < 3 * messageNumber; i++) { + msgContent = "msg-content4"; + LOG_INFO("Publishing 100 messages by producer 4 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer4.send(msg)); + } + + LOG_INFO("Consuming and acking 400 messages by multiTopicsConsumer"); + for (int i = 0; i < 4 * messageNumber; i++) { Message m; ASSERT_EQ(ResultOk, consumer.receive(m, 10000)); ASSERT_EQ(ResultOk, consumer.acknowledge(m)); } - LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer"); + LOG_INFO("Consumed and acked 400 messages by multiTopicsConsumer"); ASSERT_EQ(ResultOk, consumer.unsubscribe());