pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] 02/02: [cpp] receiver queue size config across partitions in multi-topics-consumer (#2311)
Date Mon, 27 Aug 2018 18:15:52 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit bcb3d3f6d15995b74c51d0507fece569361928da
Author: Jia Zhai <jiazhai@users.noreply.github.com>
AuthorDate: Tue Aug 14 06:05:59 2018 +0800

    [cpp] receiver queue size config across partitions in multi-topics-consumer (#2311)
    
    * catch up receiver queue size support in multi topics consumer
    
    * add python config
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  5 ++++-
 pulsar-client-cpp/python/pulsar/__init__.py        | 13 ++++++++++++
 pulsar-client-cpp/python/src/config.cc             |  4 ++++
 .../client/impl/MultiTopicsConsumerImpl.java       | 23 +++++++++++++---------
 4 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 7be197c..6750273 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -160,9 +160,12 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
     config.setMessageListener(
         boost::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), _1,
_2));
-    config.setReceiverQueueSize(conf_.getReceiverQueueSize());
 
     int numPartitions = partitionMetadata->getPartitions() >= 1 ? partitionMetadata->getPartitions()
: 1;
+    // Apply total limit of receiver queue size across partitions
+    config.setReceiverQueueSize(
+        std::min(conf_.getReceiverQueueSize(),
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions)));
 
     Lock lock(mutex_);
     topicsPartitions_.insert(std::make_pair(topicName->toString(), numPartitions));
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 434fb07..f3b560b 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -315,6 +315,7 @@ class Client:
                         send_timeout_millis=30000,
                         compression_type=CompressionType.NONE,
                         max_pending_messages=1000,
+                        max_pending_messages_across_partitions=50000,
                         block_if_queue_full=False,
                         batching_enabled=False,
                         batching_max_messages=1000,
@@ -352,6 +353,9 @@ class Client:
         * `max_pending_messages`:
           Set the max size of the queue holding the messages pending to receive
           an acknowledgment from the broker.
+        * `max_pending_messages_across_partitions`:
+          Set the max size of the queue holding the messages pending to receive
+          an acknowledgment across partitions from the broker.
         * `block_if_queue_full`: Set whether `send_async` operations should
           block when the outgoing message queue is full.
         * `message_routing_mode`:
@@ -364,6 +368,7 @@ class Client:
         _check_type(int, send_timeout_millis, 'send_timeout_millis')
         _check_type(CompressionType, compression_type, 'compression_type')
         _check_type(int, max_pending_messages, 'max_pending_messages')
+        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
         _check_type(bool, block_if_queue_full, 'block_if_queue_full')
         _check_type(bool, batching_enabled, 'batching_enabled')
         _check_type(int, batching_max_messages, 'batching_max_messages')
@@ -374,6 +379,7 @@ class Client:
         conf.send_timeout_millis(send_timeout_millis)
         conf.compression_type(compression_type)
         conf.max_pending_messages(max_pending_messages)
+        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
         conf.block_if_queue_full(block_if_queue_full)
         conf.batching_enabled(batching_enabled)
         conf.batching_max_messages(batching_max_messages)
@@ -392,6 +398,7 @@ class Client:
                   consumer_type=ConsumerType.Exclusive,
                   message_listener=None,
                   receiver_queue_size=1000,
+                  max_total_receiver_queue_size_across_partitions=50000,
                   consumer_name=None,
                   unacked_messages_timeout_ms=None,
                   broker_consumer_stats_cache_time_ms=30000,
@@ -434,6 +441,9 @@ class Client:
           should not be interrupted when the consumer queue size is zero. The
           default value is 1000 messages and should work well for most use
           cases.
+        * `max_total_receiver_queue_size_across_partitions`:
+          Set the max total receiver queue size across partitions.
+          This setting will be used to reduce the receiver queue size for individual partitions.
         * `consumer_name`:
           Sets the consumer name.
         * `unacked_messages_timeout_ms`:
@@ -450,6 +460,8 @@ class Client:
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type(int, max_total_receiver_queue_size_across_partitions,
+                    'max_total_receiver_queue_size_across_partitions')
         _check_type_or_none(str, consumer_name, 'consumer_name')
         _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
         _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
@@ -461,6 +473,7 @@ class Client:
         if message_listener:
             conf.message_listener(message_listener)
         conf.receiver_queue_size(receiver_queue_size)
+        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
         if consumer_name:
             conf.consumer_name(consumer_name)
         if unacked_messages_timeout_ms:
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 9149add..9deee9a 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -108,6 +108,8 @@ void export_config() {
             .def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>())
             .def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages)
             .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages,
return_self<>())
+            .def("max_pending_messages_across_partitions", &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
+            .def("max_pending_messages_across_partitions", &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions,
return_self<>())
             .def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull)
             .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull,
return_self<>())
             .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
@@ -128,6 +130,8 @@ void export_config() {
             .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>())
             .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize)
             .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize)
+            .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
+            .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
             .def("consumer_name", &ConsumerConfiguration::getConsumerName, return_value_policy<copy_const_reference>())
             .def("consumer_name", &ConsumerConfiguration::setConsumerName)
             .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4a0c449..bbfb3f3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -64,7 +64,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
     // Map <topic+partition, consumer>, when get do ACK, consumer will by find by topic
name
     private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
 
-    // Map <topic, partitionNumber>, store partition number for each topic
+    // Map <topic, numPartitions>, store partition number for each topic
     protected final ConcurrentHashMap<String, Integer> topics;
 
     // Queue of partition consumers on which we have stopped calling receiveAsync() because
the
@@ -670,24 +670,29 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
         return subscribeResult;
     }
 
-    private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult,
String topicName, int partitionNumber) {
+    private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult,
String topicName, int numPartitions) {
         if (log.isDebugEnabled()) {
-            log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, partitionNumber);
+            log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
         }
 
         List<CompletableFuture<Consumer<T>>> futureList;
 
-        if (partitionNumber > 1) {
-            this.topics.putIfAbsent(topicName, partitionNumber);
-            allTopicPartitionsNumber.addAndGet(partitionNumber);
+        if (numPartitions > 1) {
+            this.topics.putIfAbsent(topicName, numPartitions);
+            allTopicPartitionsNumber.addAndGet(numPartitions);
+
+            int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
+            ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
+            configurationData.setReceiverQueueSize(receiverQueueSize);
 
             futureList = IntStream
-                .range(0, partitionNumber)
+                .range(0, numPartitions)
                 .mapToObj(
                     partitionIndex -> {
                         String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
-                        ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client,
partitionName, internalConfig,
+                        ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client,
partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(), partitionIndex,
subFuture, schema);
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         return subFuture;
@@ -732,7 +737,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
 
                     subscribeResult.complete(null);
                     log.info("[{}] [{}] Success subscribe new topic {} in topics consumer,
partitions: {}, allTopicPartitionsNumber: {}",
-                        topic, subscription, topicName, partitionNumber, allTopicPartitionsNumber.get());
+                        topic, subscription, topicName, numPartitions, allTopicPartitionsNumber.get());
                     if (this.namespaceName == null) {
                         this.namespaceName = TopicName.get(topicName).getNamespaceObject();
                     }


Mime
View raw message