pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: [pulsar-broker] Avoid creating unnecessary dispatch-rate-limit objects (#3479)
Date Mon, 04 Feb 2019 16:52:23 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cca8ce  [pulsar-broker] Avoid creating unnecessary dispatch-rate-limit objects (#3479)
2cca8ce is described below

commit 2cca8ce83667fccde428db536a1446eef2203f8e
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Mon Feb 4 08:52:18 2019 -0800

    [pulsar-broker] Avoid creating unnecessary dispatch-rate-limit objects (#3479)
    
    * Avoid creating unnecessary dispatch-rate-limit objects
    
    * remove instanceof check
---
 .../pulsar/broker/service/BrokerService.java       | 16 ++----
 .../apache/pulsar/broker/service/Dispatcher.java   | 10 +++-
 .../org/apache/pulsar/broker/service/Topic.java    | 12 ++++-
 .../service/persistent/DispatchRateLimiter.java    | 46 +++++++++++++++--
 .../PersistentDispatcherMultipleConsumers.java     | 47 +++++++++---------
 .../PersistentDispatcherSingleActiveConsumer.java  | 52 ++++++++++----------
 .../broker/service/persistent/PersistentTopic.java | 57 ++++++++++++++++------
 .../service/persistent/SubscribeRateLimiter.java   | 41 +++++++++++-----
 .../client/api/MessageDispatchThrottlingTest.java  | 38 +++++++--------
 .../SubscriptionMessageDispatchThrottlingTest.java | 28 +++++------
 10 files changed, 218 insertions(+), 129 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ff2a8dc..47fb1e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1179,10 +1179,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
             forEachTopic(topic -> {
-                if (topic instanceof PersistentTopic) {
-                    PersistentTopic persistentTopic = (PersistentTopic) topic;
-                    // it first checks namespace-policy rate and if not present then applies broker-config
-                    persistentTopic.getDispatchRateLimiter().updateDispatchRate();
+                if (topic.getDispatchRateLimiter().isPresent()) {
+                    topic.getDispatchRateLimiter().get().updateDispatchRate();
                 }
             });
         });
@@ -1193,13 +1191,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             // update message-rate for each topic subscription
             forEachTopic(topic -> {
                 topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
-                    if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
-                        ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher())
-                                .getDispatchRateLimiter().updateDispatchRate();
-                    } else if (persistentSubscription
-                            .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
-                        ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher())
-                                .getDispatchRateLimiter().updateDispatchRate();
+                    Dispatcher dispatcher = persistentSubscription.getDispatcher();
+                    if (dispatcher.getRateLimiter().isPresent()) {
+                        dispatcher.getRateLimiter().get().updateDispatchRate();
                     }
                 });
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 76bfdc3..f9271c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -19,11 +19,13 @@
 package org.apache.pulsar.broker.service;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 
 public interface Dispatcher {
@@ -73,7 +75,11 @@ public interface Dispatcher {
 
     RedeliveryTracker getRedeliveryTracker();
 
-    default DispatchRateLimiter getRateLimiter() {
-        return null;
+    default Optional<DispatchRateLimiter> getRateLimiter() {
+        return Optional.empty();
+    }
+
+    default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+        //No-op
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 5837898..4209af7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pulsar.broker.service;
 
-import io.netty.buffer.ByteBuf;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -29,8 +31,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosi
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -38,6 +40,8 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 
+import io.netty.buffer.ByteBuf;
+
 public interface Topic {
 
     interface PublishContext {
@@ -155,4 +159,8 @@ public interface Topic {
     CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema);
 
     CompletableFuture<Void> deleteForcefully();
+
+    default Optional<DispatchRateLimiter> getDispatchRateLimiter() {
+        return Optional.empty();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 36f81a0..a69c31f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -26,6 +26,8 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -104,7 +106,7 @@ public class DispatchRateLimiter {
      * default broker dispatch-throttling-rate
      */
     public void updateDispatchRate() {
-        DispatchRate dispatchRate = getPoliciesDispatchRate();
+        DispatchRate dispatchRate = getPoliciesDispatchRate(brokerService);
         if (dispatchRate == null) {
             if (subscriptionName == null) {
                 dispatchRate = new DispatchRate(brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
@@ -118,6 +120,30 @@ public class DispatchRateLimiter {
         log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate);
     }
 
+    
+    public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
+            String topicName, String subscriptionName) {
+        final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
+        policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
+        return isDispatchRateNeeded(serviceConfig, policies, topicName, subscriptionName);
+    }
+
+    public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
+            final Optional<Policies> policies, final String topicName, final String subscriptionName) {
+        DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, topicName,
+                subscriptionName);
+        if (dispatchRate == null) {
+            if (subscriptionName == null) {
+                return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0
+                        || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0;
+            } else {
+                return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0
+                        || serviceConfig.getDispatchThrottlingRatePerSubscribeInByte() > 0;
+            }
+        }
+        return true;
+    }
+    
     public void onPoliciesUpdate(Policies data) {
         String cluster = brokerService.pulsar().getConfiguration().getClusterName();
 
@@ -150,17 +176,27 @@ public class DispatchRateLimiter {
      *
      * @return
      */
-    public DispatchRate getPoliciesDispatchRate() {
-        final NamespaceName namespace = TopicName.get(this.topicName).getNamespaceObject();
+    public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {
         final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
+        final Optional<Policies> policies = getPolicies(brokerService, topicName);
+        return getPoliciesDispatchRate(cluster, policies, topicName, subscriptionName);
+    }
+    
+    
+    public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
+        final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
         final String path = path(POLICIES, namespace.toString());
         Optional<Policies> policies = Optional.empty();
         try {
             policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path)
                     .get(cacheTimeOutInSec, SECONDS);
         } catch (Exception e) {
-            log.warn("Failed to get message-rate for {} subscription {}", this.topicName, this.subscriptionName, e);
+            log.warn("Failed to get message-rate for {} ", topicName, e);
         }
+        return policies;
+    }
+
+    public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional<Policies> policies, final String topicName, final String subscriptionName) {
         // return policy-dispatch rate only if it's enabled in policies
         return policies.map(p -> {
             DispatchRate dispatchRate;
@@ -241,7 +277,7 @@ public class DispatchRateLimiter {
     }
 
 
-    private boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
+    private static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
         return dispatchRate != null && (dispatchRate.dispatchThrottlingRateInMsg > 0
                 || dispatchRate.dispatchThrottlingRateInByte > 0);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 41b8ec7..0586544 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -89,7 +90,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
     private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
     private final ServiceConfiguration serviceConfig;
-    private DispatchRateLimiter dispatchRateLimiter;
+    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
     enum ReadType {
         Normal, Replay
@@ -105,7 +106,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
                 .getMaxUnackedMessagesPerSubscription();
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
-        this.dispatchRateLimiter = null;
+        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
     }
 
     @Override
@@ -237,8 +238,8 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
             // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
             // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
             if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter();
-                if (topicRateLimiter.isDispatchRateLimitingEnabled()) {
+                if (topic.getDispatchRateLimiter().isPresent() && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+                    DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
                     if (!topicRateLimiter.hasMessageDispatchPermit()) {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
@@ -257,14 +258,11 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
                     }
                 }
 
-                if (dispatchRateLimiter == null) {
-                    dispatchRateLimiter = new DispatchRateLimiter(topic, name);
-                }
-                if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
-                    if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
+                if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+                    if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", name,
-                                dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRateLimiter.getDispatchRateOnByte(),
+                                dispatchRateLimiter.get().getDispatchRateOnMsg(), dispatchRateLimiter.get().getDispatchRateOnByte(),
                                 MESSAGE_RATE_BACKOFF_MS);
                         }
                         topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
@@ -272,7 +270,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
                         return;
                     } else {
                         // if dispatch-rate is in msg then read only msg according to available permit
-                        long availablePermitsOnMsg = dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg();
+                        long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
                         if (availablePermitsOnMsg > 0) {
                             messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
                         }
@@ -446,12 +444,13 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
 
         // acquire message-dispatch permits for already delivered messages
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-            topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);
+            if (topic.getDispatchRateLimiter().isPresent()) {
+                topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
+            }
 
-            if (dispatchRateLimiter == null) {
-                dispatchRateLimiter = new DispatchRateLimiter(topic, name);
+            if (dispatchRateLimiter.isPresent()) {
+                dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
             }
-            dispatchRateLimiter.tryDispatchPermit(totalMessagesSent, totalBytesSent);
         }
 
         if (entriesToDispatch > 0) {
@@ -623,23 +622,23 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         return name;
     }
 
-    public DispatchRateLimiter getDispatchRateLimiter() {
-        if ((serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) &&
-            (dispatchRateLimiter == null)) {
-            dispatchRateLimiter = new DispatchRateLimiter(topic, name);
-        }
-        return dispatchRateLimiter;
-    }
-
     @Override
     public RedeliveryTracker getRedeliveryTracker() {
         return redeliveryTracker;
     }
 
     @Override
-    public DispatchRateLimiter getRateLimiter() {
+    public Optional<DispatchRateLimiter> getRateLimiter() {
         return dispatchRateLimiter;
     }
 
+    @Override
+    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+        if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
+                .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), name)) {
+            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, name));
+        }
+    }
+    
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 3823473..5bc6f32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -54,7 +55,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     private final PersistentTopic topic;
     private final ManagedCursor cursor;
     private final String name;
-    private DispatchRateLimiter dispatchRateLimiter;
+    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();;
 
     private volatile boolean havePendingRead = false;
 
@@ -75,8 +76,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         this.cursor = cursor;
         this.readBatchSize = MaxReadBatchSize;
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
-        this.dispatchRateLimiter = null;
         this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
+        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
     }
 
     protected void scheduleReadOnActiveConsumer() {
@@ -211,14 +212,15 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                 if (future.isSuccess()) {
                     // acquire message-dispatch permits for already delivered messages
                     if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                        topic.getDispatchRateLimiter().tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
-                                sentMsgInfo.getTotalSentMessageBytes());
+                        if (topic.getDispatchRateLimiter().isPresent()) {
+                            topic.getDispatchRateLimiter().get().tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
+                                    sentMsgInfo.getTotalSentMessageBytes());
+                        }
 
-                        if (dispatchRateLimiter == null) {
-                            dispatchRateLimiter = new DispatchRateLimiter(topic, name);
+                        if (dispatchRateLimiter.isPresent()) {
+                            dispatchRateLimiter.get().tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
+                                    sentMsgInfo.getTotalSentMessageBytes());
                         }
-                        dispatchRateLimiter.tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
-                                sentMsgInfo.getTotalSentMessageBytes());
                     }
 
                     // Schedule a new read batch operation only after the previous batch has been written to the socket
@@ -341,8 +343,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
             // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
             if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter();
-                if (topicRateLimiter.isDispatchRateLimitingEnabled()) {
+                if (topic.getDispatchRateLimiter().isPresent()
+                        && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+                    DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
                     if (!topicRateLimiter.hasMessageDispatchPermit()) {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
@@ -370,14 +373,11 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                     }
                 }
 
-                if (dispatchRateLimiter == null) {
-                    dispatchRateLimiter = new DispatchRateLimiter(topic, name);
-                }
-                if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
-                    if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
+                if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
+                    if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", name,
-                                dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRateLimiter.getDispatchRateOnByte(),
+                                dispatchRateLimiter.get().getDispatchRateOnMsg(), dispatchRateLimiter.get().getDispatchRateOnByte(),
                                 MESSAGE_RATE_BACKOFF_MS);
                         }
                         topic.getBrokerService().executor().schedule(() -> {
@@ -394,7 +394,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                         return;
                     } else {
                         // if dispatch-rate is in msg then read only msg according to available permit
-                        long subPermitsOnMsg = dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg();
+                        long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
                         if (subPermitsOnMsg > 0) {
                             messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
                         }
@@ -478,14 +478,6 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
 
     }
 
-    public DispatchRateLimiter getDispatchRateLimiter() {
-        if ((serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) &&
-            (dispatchRateLimiter == null)) {
-            dispatchRateLimiter = new DispatchRateLimiter(topic, name);
-        }
-        return dispatchRateLimiter;
-    }
-
     @Override
     public void addUnAckedMessages(int unAckMessages) {
         // No-op
@@ -497,9 +489,17 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     }
 
     @Override
-    public DispatchRateLimiter getRateLimiter() {
+    public Optional<DispatchRateLimiter> getRateLimiter() {
         return dispatchRateLimiter;
     }
 
+    @Override
+    public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+        if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
+                .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), name)) {
+            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, name));
+        }
+    }
+    
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 77bd3ec..b2236dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -167,8 +167,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
     // doesn't support batch-message
     private volatile boolean hasBatchMessagePublished = false;
-    private final DispatchRateLimiter dispatchRateLimiter;
-    private final SubscribeRateLimiter subscribeRateLimiter;
+    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+    private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     private final MessageDeduplication messageDeduplication;
@@ -226,8 +226,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
         USAGE_COUNT_UPDATER.set(this, 0);
 
-        this.dispatchRateLimiter = new DispatchRateLimiter(this);
-        this.subscribeRateLimiter = new SubscribeRateLimiter(this);
+        initializeDispatchRateLimiterIfNeeded(Optional.empty());
 
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
 
@@ -269,6 +268,22 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         }
     }
 
+    private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
+        synchronized (dispatchRateLimiter) {
+            if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
+                    .isDispatchRateNeeded(brokerService, policies, topic, null)) {
+                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this));
+            }
+            if (!subscribeRateLimiter.isPresent() && SubscribeRateLimiter
+                    .isDispatchRateNeeded(brokerService, policies, topic)) {
+                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
+            }
+            subscriptions.forEach((name, subscription) -> {
+                subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies);
+            });
+        }
+    }
+
     private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor) {
         checkNotNull(compactedTopic);
         if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
@@ -499,10 +514,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         if (cnx.getRemoteAddress() != null && cnx.getRemoteAddress().toString().contains(":")) {
             SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
                     cnx.getRemoteAddress().toString().split(":")[0], consumerName, consumerId);
-            if (!subscribeRateLimiter.subscribeAvailable(consumer) || !subscribeRateLimiter.tryAcquire(consumer)) {
+            if (subscribeRateLimiter.isPresent() && !subscribeRateLimiter.get().subscribeAvailable(consumer) || !subscribeRateLimiter.get().tryAcquire(consumer)) {
                 log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
-                        topic, subscriptionName, consumer, subscribeRateLimiter.getSubscribeRate(),
-                        subscribeRateLimiter.getAvailableSubscribeRateLimit(consumer));
+                        topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
+                        subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
                 future.completeExceptionally(new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
                 return future;
             }
@@ -853,8 +868,12 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 }
             }, null);
 
-            dispatchRateLimiter.close();
-            subscribeRateLimiter.close();
+            if (dispatchRateLimiter.isPresent()) {
+                dispatchRateLimiter.get().close();
+            }
+            if (subscribeRateLimiter.isPresent()) {
+                subscribeRateLimiter.get().close();
+            }
 
         }).exceptionally(exception -> {
             log.error("[{}] Error closing topic", topic, exception);
@@ -1582,22 +1601,30 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                 data.schema_auto_update_compatibility_strategy);
 
+        initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
+        
         producers.forEach(producer -> {
             producer.checkPermissions();
             producer.checkEncryption();
         });
         subscriptions.forEach((subName, sub) -> {
             sub.getConsumers().forEach(Consumer::checkPermissions);
-            if (sub.getDispatcher().getRateLimiter() != null) {
-                sub.getDispatcher().getRateLimiter().onPoliciesUpdate(data);
+            if (sub.getDispatcher().getRateLimiter().isPresent()) {
+                sub.getDispatcher().getRateLimiter().get().onPoliciesUpdate(data);
             }
         });
         checkMessageExpiry();
         CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
         CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
         CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
-        dispatchRateLimiter.onPoliciesUpdate(data);
-        subscribeRateLimiter.onPoliciesUpdate(data);
+        // update rate-limiter if policies updated
+        if (this.dispatchRateLimiter.isPresent()) {
+            dispatchRateLimiter.get().onPoliciesUpdate(data);
+        }
+        if (this.subscribeRateLimiter.isPresent()) {
+            subscribeRateLimiter.get().onPoliciesUpdate(data);
+        }
+    
         return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
     }
 
@@ -1741,11 +1768,11 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         this.hasBatchMessagePublished = true;
     }
 
-    public DispatchRateLimiter getDispatchRateLimiter() {
+    public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
         return this.dispatchRateLimiter;
     }
 
-    public SubscribeRateLimiter getSubscribeRateLimiter() {
+    public Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
         return this.subscribeRateLimiter;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index d6c65d6..078abb5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.service.persistent;
 
 
 import com.google.common.base.MoreObjects;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -173,16 +175,34 @@ public class SubscribeRateLimiter {
      * @return
      */
     public SubscribeRate getPoliciesSubscribeRate() {
-        final NamespaceName namespace = TopicName.get(this.topicName).getNamespaceObject();
-        final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
-        final String path = path(POLICIES, namespace.toString());
-        Optional<Policies> policies = Optional.empty();
-        try {
-            policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path)
-                    .get(cacheTimeOutInSec, SECONDS);
-        } catch (Exception e) {
-            log.warn("Failed to get subscribe-rate for {}", this.topicName, e);
+        return getPoliciesSubscribeRate(brokerService, topicName);
+    }
+
+    public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
+            String topicName) {
+        ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
+        policies = policies.isPresent() ? policies : DispatchRateLimiter.getPolicies(brokerService, topicName);
+        return isDispatchRateNeeded(serviceConfig, policies, topicName);
+    }
+
+    private static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
+            final Optional<Policies> policies, final String topicName) {
+        SubscribeRate subscribeRate = getPoliciesSubscribeRate(serviceConfig.getClusterName(), policies, topicName);
+        if (subscribeRate == null) {
+            return serviceConfig.getSubscribeThrottlingRatePerConsumer() > 0
+                    || serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
         }
+        return true;
+    }
+
+    public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) {
+        final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
+        final Optional<Policies> policies = DispatchRateLimiter.getPolicies(brokerService, topicName);
+        return getPoliciesSubscribeRate(cluster, policies, topicName);
+    }
+
+    public static SubscribeRate getPoliciesSubscribeRate(final String cluster, final Optional<Policies> policies,
+            String topicName) {
         // return policy-subscribe rate only if it's enabled in policies
         return policies.map(p -> {
             if (p.clusterSubscribeRate != null) {
@@ -191,7 +211,6 @@ public class SubscribeRateLimiter {
             } else {
                 return null;
             }
-
         }).orElse(null);
     }
 
@@ -204,7 +223,7 @@ public class SubscribeRateLimiter {
         return subscribeRateLimiter.get(consumerIdentifier) != null ? subscribeRateLimiter.get(consumerIdentifier).getRate() : -1;
     }
 
-    private boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
+    private static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
         return subscribeRate != null && (subscribeRate.subscribeThrottlingRatePerConsumer > 0);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 72f5147..cc0f46e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -110,7 +110,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
         // (1) verify message-rate is -1 initially
-        Assert.assertEquals(topic.getDispatchRateLimiter().getDispatchRateOnMsg(), -1);
+        Assert.assertFalse(topic.getDispatchRateLimiter().isPresent());
 
         // (1) change to 100
         int messageRate = 100;
@@ -119,7 +119,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isDispatchRateUpdate = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0) {
+            if (topic.getDispatchRateLimiter().isPresent()) {
                 isDispatchRateUpdate = true;
                 break;
             } else {
@@ -137,7 +137,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         isDispatchRateUpdate = false;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnByte() == messageRate) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() == messageRate) {
                 isDispatchRateUpdate = true;
                 break;
             } else {
@@ -182,8 +182,8 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isMessageRateUpdate = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0
-                    || topic.getDispatchRateLimiter().getDispatchRateOnByte() > 0) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0
+                    || topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) {
                 isMessageRateUpdate = true;
                 break;
             } else {
@@ -318,7 +318,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isMessageRateUpdate = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
                 isMessageRateUpdate = true;
                 break;
             } else {
@@ -389,7 +389,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isMessageRateUpdate = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnByte() > 0) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) {
                 isMessageRateUpdate = true;
                 break;
             } else {
@@ -452,7 +452,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isMessageRateUpdate = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
                 isMessageRateUpdate = true;
                 break;
             } else {
@@ -585,8 +585,8 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isMessageRateUpdate = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0
-                    && topic.getDispatchRateLimiter().getDispatchRateOnByte() > 0) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0
+                    && topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) {
                 isMessageRateUpdate = true;
                 break;
             } else {
@@ -662,8 +662,8 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isMessageRateUpdate = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0
-                    || topic.getDispatchRateLimiter().getDispatchRateOnByte() > 0) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0
+                    || topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) {
                 isMessageRateUpdate = true;
                 break;
             } else {
@@ -731,7 +731,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         boolean isUpdated = false;
         int retry = 5;
         for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() > 0) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
                 isUpdated = true;
                 break;
             } else {
@@ -813,33 +813,33 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get();
 
         // (1) Update dispatch rate on cluster-config update
-        Assert.assertEquals(clusterMessageRate, topic.getDispatchRateLimiter().getDispatchRateOnMsg());
+        Assert.assertEquals(clusterMessageRate, topic.getDispatchRateLimiter().get().getDispatchRateOnMsg());
 
         // (2) Update namespace throttling limit
         int nsMessageRate = 500;
         DispatchRate dispatchRate = new DispatchRate(nsMessageRate, 0, 1);
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         for (int i = 0; i < 5; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() != nsMessageRate) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() != nsMessageRate) {
                 Thread.sleep(50 + (i * 10));
             }
         }
-        Assert.assertEquals(nsMessageRate, topic.getDispatchRateLimiter().getDispatchRateOnMsg());
+        Assert.assertEquals(nsMessageRate, topic.getDispatchRateLimiter().get().getDispatchRateOnMsg());
 
         // (3) Disable namespace throttling limit will force to take cluster-config
         dispatchRate = new DispatchRate(0, 0, 1);
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
         for (int i = 0; i < 5; i++) {
-            if (topic.getDispatchRateLimiter().getDispatchRateOnMsg() == nsMessageRate) {
+            if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() == nsMessageRate) {
                 Thread.sleep(50 + (i * 10));
             }
         }
-        Assert.assertEquals(clusterMessageRate, topic.getDispatchRateLimiter().getDispatchRateOnMsg());
+        Assert.assertEquals(clusterMessageRate, topic.getDispatchRateLimiter().get().getDispatchRateOnMsg());
 
         // (5) Namespace throttling is disabled so, new topic should take cluster throttling limit
         Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
         PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName2).get();
-        Assert.assertEquals(clusterMessageRate, topic2.getDispatchRateLimiter().getDispatchRateOnMsg());
+        Assert.assertEquals(clusterMessageRate, topic2.getDispatchRateLimiter().get().getDispatchRateOnMsg());
 
         producer.close();
         producer2.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 5e3e0d2..cbcef08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -82,9 +82,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         DispatchRateLimiter subRateLimiter = null;
         Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
         if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
-            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getRateLimiter().get();
         } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getRateLimiter().get();
         } else {
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
@@ -165,9 +165,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         DispatchRateLimiter subRateLimiter = null;
         Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
         if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
-            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getRateLimiter().get();
         } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getRateLimiter().get();
         } else {
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
@@ -251,9 +251,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         DispatchRateLimiter subRateLimiter = null;
         Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
         if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
-            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getRateLimiter().get();
         } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getRateLimiter().get();
         } else {
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
@@ -334,9 +334,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         DispatchRateLimiter subRateLimiter = null;
         Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
         if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
-            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getRateLimiter().get();
         } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getRateLimiter().get();
         } else {
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
@@ -488,9 +488,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         DispatchRateLimiter subRateLimiter = null;
         Dispatcher subDispatcher = topic.getSubscription(subName1).getDispatcher();
         if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
-            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getRateLimiter().get();
         } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getRateLimiter().get();
         } else {
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
@@ -504,9 +504,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
 
         if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
-            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getRateLimiter().get();
         } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getRateLimiter().get();
         } else {
             Assert.fail("Should only have PersistentDispatcher in this test");
         }
@@ -536,9 +536,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
 
         subDispatcher = topic2.getSubscription(subName2).getDispatcher();
         if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
-            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getRateLimiter().get();
         } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getRateLimiter().get();
         } else {
             Assert.fail("Should only have PersistentDispatcher in this test");
         }


Mime
View raw message