pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Issue #3803: Make ManagedLedger read batch size configurable (#3808)
Date Tue, 12 Mar 2019 23:17:38 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ce297c  Issue #3803: Make ManagedLedger read batch size configurable (#3808)
0ce297c is described below

commit 0ce297c5ffd6430b047728a779a0896d6b5fb7d9
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Wed Mar 13 07:17:32 2019 +0800

    Issue #3803: Make ManagedLedger read batch size configurable (#3808)
    
    *Motivation*
    
    Fixes #3803
    
    Hardcoding is a very bad practice. It means we have no way to alter system behavior
    when production issues occur.
    
    *Modifications*
    
    Introduce a few read-batch-related settings so that the batch sizes are configurable.
---
 conf/broker.conf                                   | 11 ++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 25 ++++++++++++++++++++++
 .../PersistentDispatcherMultipleConsumers.java     | 15 ++++++-------
 .../PersistentDispatcherSingleActiveConsumer.java  |  9 ++++----
 .../service/persistent/PersistentReplicator.java   | 13 +++++------
 5 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 73e94cd..ba00cfa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -186,6 +186,17 @@ dispatchThrottlingRatePerSubscribeInByte=0
 # backlog.
 dispatchThrottlingOnNonBacklogConsumerEnabled=true
 
+# Max number of entries to read from bookkeeper. By default it is 100 entries.
+dispatcherMaxReadBatchSize=100
+
+# Min number of entries to read from bookkeeper. By default it is 1 entry.
+# When an error occurs while reading entries from bookkeeper, the broker
+# will back off the batch size to this minimum number.
+dispatcherMinReadBatchSize=1
+
+# Max number of entries to dispatch for a shared subscription. By default it is 20 entries.
+dispatcherMaxRoundRobinBatchSize=20
+
 # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
 maxConcurrentLookupRequest=50000
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 39c3e42..0fbe155 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -377,6 +377,31 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + " published messages and don't have backlog. This enables dispatch-throttling
for "
             + " non-backlog consumers as well.")
     private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
+
+    // <-- dispatcher read settings -->
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "Max number of entries to read from bookkeeper. By default it is 100 entries."
+    )
+    private int dispatcherMaxReadBatchSize = 100;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "Min number of entries to read from bookkeeper. By default it is 1 entries."
+            + "When there is an error occurred on reading entries from bookkeeper, the broker"
+            + " will backoff the batch size to this minimum number."
+    )
+    private int dispatcherMinReadBatchSize = 1;
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "Max number of entries to dispatch for a shared subscription. By default it
is 20 entries."
+    )
+    private int dispatcherMaxRoundRobinBatchSize = 20;
+
     @FieldContext(
         dynamic = true,
         category = CATEGORY_SERVER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02c3ea8..066e2b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -65,9 +65,6 @@ import com.google.common.collect.Lists;
  */
 public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {
 
-    private static final int MaxReadBatchSize = 100;
-    private static final int MaxRoundRobinBatchSize = 20;
-
     private final PersistentTopic topic;
     private final ManagedCursor cursor;
 
@@ -105,7 +102,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
                 ? new InMemoryRedeliveryTracker()
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
-        this.readBatchSize = MaxReadBatchSize;
+        this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
                 .getMaxUnackedMessagesPerSubscription();
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -386,8 +383,8 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
             havePendingReplayRead = false;
         }
 
-        if (readBatchSize < MaxReadBatchSize) {
-            int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
+        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+            int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize,
newReadBatchSize);
             }
@@ -423,7 +420,9 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
             }
 
             // round-robin dispatch batch size for this consumer
-            int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()),
MaxRoundRobinBatchSize);
+            int messagesForC = Math.min(
+                Math.min(entriesToDispatch, c.getAvailablePermits()),
+                serviceConfig.getDispatcherMaxRoundRobinBatchSize());
 
             if (messagesForC > 0) {
 
@@ -511,7 +510,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
             }
         }
 
-        readBatchSize = 1;
+        readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
 
         topic.getBrokerService().executor().schedule(() -> {
             synchronized (PersistentDispatcherMultipleConsumers.this) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5bc6f32..4356fbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -59,7 +59,6 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
 
     private volatile boolean havePendingRead = false;
 
-    private static final int MaxReadBatchSize = 100;
     private int readBatchSize;
     private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES,
0, TimeUnit.MILLISECONDS);
     private final ServiceConfiguration serviceConfig;
@@ -74,8 +73,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
         this.cursor = cursor;
-        this.readBatchSize = MaxReadBatchSize;
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
+        this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
     }
@@ -182,8 +181,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
 
         havePendingRead = false;
 
-        if (readBatchSize < MaxReadBatchSize) {
-            int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
+        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
+            int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Increasing read batch size from {} to {}", name, readConsumer,
readBatchSize,
                         newReadBatchSize);
@@ -451,7 +450,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         checkNotNull(c);
 
         // Reduce read batch size to avoid flooding bookies with retries
-        readBatchSize = 1;
+        readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
 
         topic.getBrokerService().executor().schedule(() -> {
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 7d529f9..cda58e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -59,8 +59,6 @@ public class PersistentReplicator extends AbstractReplicator implements
Replicat
     private final PersistentTopic topic;
     private final ManagedCursor cursor;
 
-
-    private static final int MaxReadBatchSize = 100;
     private int readBatchSize;
 
     private final int producerQueueThreshold;
@@ -98,7 +96,9 @@ public class PersistentReplicator extends AbstractReplicator implements
Replicat
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         PENDING_MESSAGES_UPDATER.set(this, 0);
 
-        readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize);
+        readBatchSize = Math.min(
+            producerQueueSize,
+            topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
         producerQueueThreshold = (int) (producerQueueSize * 0.9);
 
         startProducer();
@@ -189,8 +189,9 @@ public class PersistentReplicator extends AbstractReplicator implements
Replicat
                     entries.size());
         }
 
-        if (readBatchSize < MaxReadBatchSize) {
-            int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
+        int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
+        if (readBatchSize < maxReadBatchSize) {
+            int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize);
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName,
localCluster,
                         remoteCluster, readBatchSize, newReadBatchSize);
@@ -410,7 +411,7 @@ public class PersistentReplicator extends AbstractReplicator implements
Replicat
         }
 
         // Reduce read batch size to avoid flooding bookies with retries
-        readBatchSize = 1;
+        readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();
 
         long waitTimeMillis = readFailureBackoff.next();
 


Mime
View raw message