pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rdhaba...@apache.org
Subject [pulsar] 01/01: Revert "Remove broker mode to handle persistent/non-persistent topics separately (#3348)"
Date Thu, 28 Feb 2019 14:57:42 GMT
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch revert-3348-np_remove
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b92fc922ef2b6fd78aa857b6f65e1fd18fabdec
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Thu Feb 28 06:57:27 2019 -0800

    Revert "Remove broker mode to handle persistent/non-persistent topics separately (#3348)"
    
    This reverts commit 77d4c28d59604cb6ec589d3620ab1770e0523c87.
---
 conf/broker.conf                                   |   6 +
 conf/standalone.conf                               |   6 +
 deployment/terraform-ansible/templates/broker.conf |   6 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  12 ++
 .../broker/loadbalance/impl/LoadManagerShared.java |  34 ++++-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  43 ++++--
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  27 +++-
 .../pulsar/broker/service/BrokerService.java       |  16 +++
 .../loadbalance/ModularLoadManagerImplTest.java    |  24 +++-
 .../pulsar/client/api/NonPersistentTopicTest.java  | 151 +++++++++++++++++++++
 .../data/loadbalancer/LoadManagerReport.java       |   4 +
 .../policies/data/loadbalancer/LoadReport.java     |  18 +++
 .../data/loadbalancer/LocalBrokerData.java         |  20 +++
 site/_data/config/broker.yaml                      |   6 +
 .../explanations/ja/non-persistent-topics.md       |  15 +-
 .../latest/cookbooks/non-persistent-messaging.md   |   8 ++
 .../getting-started/ConceptsAndArchitecture.md     |   2 +-
 site2/docs/concepts-messaging.md                   |   2 +-
 site2/docs/cookbooks-non-persistent.md             |   7 +
 site2/docs/reference-configuration.md              |   2 +
 20 files changed, 384 insertions(+), 25 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 9d7c341..262c44c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -198,6 +198,12 @@ maxConcurrentNonPersistentMessagePerConnection=1000
 # Number of worker threads to serve non-persistent topic
 numWorkerThreadsForNonPersistentTopic=8
 
+# Enable broker to load persistent topics
+enablePersistentTopics=true
+
+# Enable broker to load non-persistent topics
+enableNonPersistentTopics=true
+
 # Enable to run bookie along with broker
 enableRunBookieTogether=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 4f634e9..a8e7181 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -158,6 +158,12 @@ maxConcurrentNonPersistentMessagePerConnection=1000
 # Number of worker threads to serve non-persistent topic
 numWorkerThreadsForNonPersistentTopic=8
 
+# Enable broker to load persistent topics
+enablePersistentTopics=true
+
+# Enable broker to load non-persistent topics
+enableNonPersistentTopics=true
+
 # Max number of producers allowed to connect to topic. Once this limit reaches, Broker will
reject new producers
 # until the number of connected producers decrease.
 # Using a value of 0, is disabling maxProducersPerTopic-limit check.
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 1db877c..22f74be 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -163,6 +163,12 @@ maxConcurrentNonPersistentMessagePerConnection=1000
 # Number of worker threads to serve non-persistent topic
 numWorkerThreadsForNonPersistentTopic=8
 
+# Enable broker to load persistent topics
+enablePersistentTopics=true
+
+# Enable broker to load non-persistent topics
+enableNonPersistentTopics=true
+
 # Enable to run bookie along with broker
 enableRunBookieTogether=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2be470b..cb36df4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -397,6 +397,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
 
     @FieldContext(
         category = CATEGORY_SERVER,
+        doc = "Enable broker to load persistent topics"
+    )
+    private boolean enablePersistentTopics = true;
+
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Enable broker to load non-persistent topics"
+    )
+    private boolean enableNonPersistentTopics = true;
+
+    @FieldContext(
+        category = CATEGORY_SERVER,
         doc = "Enable to run bookie along with broker"
     )
     private boolean enableRunBookieTogether = false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 04449cc..6a5575c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -96,7 +96,7 @@ public class LoadManagerShared {
     // The brokers are put into brokerCandidateCache.
     public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
-            final Set<String> availableBrokers) {
+            final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate)
{
         Set<String> primariesCache = localPrimariesCache.get();
         primariesCache.clear();
 
@@ -145,11 +145,27 @@ public class LoadManagerShared {
                     }
 
                 }
-            } else if (policies.isSharedBroker(brokerUrl.getHost())) {
-                secondaryCache.add(broker);
-                if (log.isDebugEnabled()) {
-                    log.debug("Added Shared Broker - [{}] as possible Candidates for namespace
- [{}]",
-                            brokerUrl.getHost(), namespace.toString());
+            } else {
+                // non-persistent topic can be assigned to only those brokers that enabled
for non-persistent topic
+                if (isNonPersistentTopic
+                        && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString))
{
+                    if (log.isDebugEnabled()) {
+                        log.debug("Filter broker- [{}] because it doesn't support non-persistent
namespace - [{}]",
+                                brokerUrl.getHost(), namespace.toString());
+                    }
+                } else if (!isNonPersistentTopic
+                        && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString))
{
+                    // persistent topic can be assigned to only brokers that enabled for
persistent-topic
+                    if (log.isDebugEnabled()) {
+                        log.debug("Filter broker- [{}] because broker only supports non-persistent
namespace - [{}]",
+                                brokerUrl.getHost(), namespace.toString());
+                    }
+                } else if (policies.isSharedBroker(brokerUrl.getHost())) {
+                    secondaryCache.add(broker);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Added Shared Broker - [{}] as possible Candidates for
namespace - [{}]",
+                                brokerUrl.getHost(), namespace.toString());
+                    }
                 }
             }
         }
@@ -500,6 +516,12 @@ public class LoadManagerShared {
         return true;
     }
 
+    public interface BrokerTopicLoadingPredicate {
+        boolean isEnablePersistentTopics(String brokerUrl);
+
+        boolean isEnableNonPersistentTopics(String brokerUrl);
+    }
+
     /**
      * It filters out brokers which owns topic higher than configured threshold at
      * {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index b426d1e..997d9bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -175,6 +176,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
     // ZooKeeper belonging to the pulsar service.
     private ZooKeeper zkClient;
 
+    // check if given broker can load persistent/non-persistent topic
+    private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
+
     private Map<String, String> brokerToFailureDomainMap;
 
     private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key,
content) -> jsonMapper()
@@ -194,6 +198,22 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = Maps.newHashMap();
+
+        this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
+            @Override
+            public boolean isEnablePersistentTopics(String brokerUrl) {
+                final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://",
""));
+                return brokerData != null && brokerData.getLocalData() != null
+                        && brokerData.getLocalData().isPersistentTopicsEnabled();
+            }
+
+            @Override
+            public boolean isEnableNonPersistentTopics(String brokerUrl) {
+                final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://",
""));
+                return brokerData != null && brokerData.getLocalData() != null
+                        && brokerData.getLocalData().isNonPersistentTopicsEnabled();
+            }
+        };
     }
 
     /**
@@ -248,6 +268,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         localData.setBrokerVersionString(pulsar.getBrokerVersion());
+        // configure broker-topic mode
+        lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
+        lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
+        localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
+        localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
+
 
         placementStrategy = ModularLoadManagerStrategy.create(conf);
         policies = new SimpleResourceAllocationPolicies(pulsar);
@@ -599,7 +625,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                 ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory()
                         .getBundle(namespace, bundle);
                 LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                        getAvailableBrokers());
+                        getAvailableBrokers(), brokerTopicLoadingPredicate);
                 return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
currentBroker, pulsar,
                         brokerToNamespaceToBundleRange, brokerCandidateCache);
             }
@@ -680,7 +706,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
                     key -> getBundleDataOrDefault(bundle));
             brokerCandidateCache.clear();
-            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
+            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
+                    brokerTopicLoadingPredicate);
 
             // filter brokers which owns topic higher than threshold
             LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
@@ -702,14 +729,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                 }
             } catch ( BrokerFilterException x ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                        getAvailableBrokers());
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
             }
 
             if ( brokerCandidateCache.isEmpty() ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                        getAvailableBrokers());
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
             }
 
             // Choose a broker among the potentially smaller filtered list, when possible
@@ -727,8 +754,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
             if (maxUsage > overloadThreshold) {
                 // All brokers that were in the filtered list were overloaded, so check if
there is a better broker
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                        getAvailableBrokers());
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
+                        brokerTopicLoadingPredicate);
                 broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData,
conf);
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 2ad67a1..9f9b949 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -181,6 +182,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
     private boolean forceLoadReportUpdate = false;
     private static final Deserializer<LoadReport> loadReportDeserializer = (key, content)
-> jsonMapper()
             .readValue(content, LoadReport.class);
+    // check if given broker can load persistent/non-persistent topic
+    private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
 
     // Perform initializations which may be done without a PulsarService.
     public SimpleLoadManagerImpl() {
@@ -197,6 +200,21 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         brokerCandidateCache = new HashSet<>();
         availableBrokersCache = new HashSet<>();
         brokerToNamespaceToBundleRange = new HashMap<>();
+        this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
+            @Override
+            public boolean isEnablePersistentTopics(String brokerUrl) {
+                ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
+                LoadReport loadReport = currentLoadReports.get(ru);
+                return loadReport != null && loadReport.isPersistentTopicsEnabled();
+            }
+
+            @Override
+            public boolean isEnableNonPersistentTopics(String brokerUrl) {
+                ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
+                LoadReport loadReport = currentLoadReports.get(ru);
+                return loadReport != null && loadReport.isNonPersistentTopicsEnabled();
+            }
+        };
     }
 
     @Override
@@ -209,6 +227,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         this.policies = new SimpleResourceAllocationPolicies(pulsar);
         lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+        lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
+        lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
+
         loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache())
{
             @Override
             public LoadReport deserialize(String key, byte[] content) throws Exception {
@@ -898,8 +919,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
             }
             brokerCandidateCache.clear();
             try {
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
-                        availableBrokersCache);
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
availableBrokersCache,
+                        brokerTopicLoadingPredicate);
             } catch (Exception e) {
                 log.warn("Error when trying to apply policies: {}", e);
                 for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet())
{
@@ -1091,6 +1112,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
             try {
                 LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                         pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+                loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
+                loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
                 loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(),
                         pulsar.getConfiguration().getWebServicePort().get()));
                 loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0f78c72..f48877d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -501,6 +501,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String
topic) {
         CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
 
+        if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Broker is unable to load non-persistent topic {}", topic);
+            }
+            topicFuture.completeExceptionally(
+                    new NotAllowedException("Broker is not unable to load non-persistent
topic"));
+            return topicFuture;
+        }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
         CompletableFuture<Void> replicationFuture = nonPersistentTopic.checkReplication();
@@ -583,6 +591,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         checkTopicNsOwnership(topic);
 
         final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
+        if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Broker is unable to load persistent topic {}", topic);
+            }
+            topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable
to load persistent topic"));
+            return topicFuture;
+        }
+
         final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
 
         if (topicLoadSemaphore.tryAcquire()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index d23c05e..0303fbf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageMessageData;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
@@ -514,6 +515,17 @@ public class ModularLoadManagerImplTest {
         SimpleResourceAllocationPolicies simpleResourceAllocationPolicies = new SimpleResourceAllocationPolicies(
                 pulsar1);
         ServiceUnitId serviceUnit = LoadBalancerTestingUtils.makeBundles(nsFactory, tenant,
cluster, namespace, 1)[0];
+        BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate()
{
+            @Override
+            public boolean isEnablePersistentTopics(String brokerUrl) {
+                return true;
+            }
+
+            @Override
+            public boolean isEnableNonPersistentTopics(String brokerUrl) {
+                return true;
+            }
+        };
 
         // (1) now we have isolation policy : primary=broker1, secondary=broker2, minLimit=1
 
@@ -521,7 +533,7 @@ public class ModularLoadManagerImplTest {
         Set<String> brokerCandidateCache = Sets.newHashSet();
         Set<String> availableBrokers = Sets.newHashSet(sharedBroker, broker1Address,
broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies,
brokerCandidateCache,
-                availableBrokers);
+                availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker1Address));
 
@@ -529,7 +541,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies,
brokerCandidateCache,
-                availableBrokers);
+                availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
 
@@ -537,7 +549,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies,
brokerCandidateCache,
-                availableBrokers);
+                availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 0);
 
         // (2) now we will have isolation policy : primary=broker1, secondary=broker2, minLimit=2
@@ -551,7 +563,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies,
brokerCandidateCache,
-                availableBrokers);
+                availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 2);
         assertTrue(brokerCandidateCache.contains(broker1Address));
         assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -560,7 +572,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies,
brokerCandidateCache,
-                availableBrokers);
+                availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
 
@@ -568,7 +580,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies,
brokerCandidateCache,
-                availableBrokers);
+                availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 0);
 
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index c436491..2e7a444 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -615,6 +615,157 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
     }
 
     /**
+     * verifies load manager assigns topic only if broker started in non-persistent mode
+     *
+     * <pre>
+     * 1. Start broker with disable non-persistent topic mode
+     * 2. Create namespace with non-persistency set
+     * 3. Create non-persistent topic
+     * 4. Load-manager should not be able to find broker
+     * 5. Create producer on that topic should fail
+     * </pre>
+     */
+    @Test(dataProvider = "loadManager")
+    public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadManagerName)
throws Exception {
+
+        final String namespace = "my-property/my-ns";
+        final String topicName = "non-persistent://" + namespace + "/loadManager";
+        final String defaultLoadManagerName = conf.getLoadManagerClassName();
+        final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
+        try {
+            // start broker to not own non-persistent namespace and create non-persistent
namespace
+            stopBroker();
+            conf.setEnableNonPersistentTopics(false);
+            conf.setLoadManagerClassName(loadManagerName);
+            startBroker();
+
+            Field field = PulsarService.class.getDeclaredField("loadManager");
+            field.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>)
field.get(pulsar);
+            LoadManager manager = LoadManager.create(pulsar);
+            manager.start();
+            loadManagerRef.set(manager);
+
+            NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
+            LoadManager loadManager = pulsar.getLoadManager().get();
+            ResourceUnit broker = null;
+            try {
+                broker = loadManager.getLeastLoaded(fdqn).get();
+            } catch (Exception e) {
+                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find
broker)
+            }
+            assertNull(broker);
+
+            try {
+                Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1,
+                        TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+            assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        } finally {
+            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+            conf.setLoadManagerClassName(defaultLoadManagerName);
+        }
+
+    }
+
+    /**
+     * verifies: broker should reject non-persistent topic loading if broker is not enable
for non-persistent topic
+     *
+     * @param loadManagerName
+     * @throws Exception
+     */
+    @Test
+    public void testNonPersistentTopicUnderPersistentNamespace() throws Exception {
+
+        final String namespace = "my-property/my-ns";
+        final String topicName = "non-persistent://" + namespace + "/persitentNamespace";
+
+        final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
+        try {
+            conf.setEnableNonPersistentTopics(false);
+            stopBroker();
+            startBroker();
+            try {
+                Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1,
+                        TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+
+            assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+        } finally {
+            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+        }
+    }
+
+    /**
+     * verifies that broker started with onlyNonPersistent mode doesn't own persistent-topic
+     *
+     * @param loadManagerName
+     * @throws Exception
+     */
+    @Test(dataProvider = "loadManager")
+    public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerName)
throws Exception {
+
+        final String namespace = "my-property/my-ns";
+        final String topicName = "persistent://" + namespace + "/loadManager";
+        final String defaultLoadManagerName = conf.getLoadManagerClassName();
+        final boolean defaultEnablePersistentTopic = conf.isEnablePersistentTopics();
+        final boolean defaultEnableNonPersistentTopic = conf.isEnableNonPersistentTopics();
+        try {
+            // start broker to not own non-persistent namespace and create non-persistent
namespace
+            stopBroker();
+            conf.setEnableNonPersistentTopics(true);
+            conf.setEnablePersistentTopics(false);
+            conf.setLoadManagerClassName(loadManagerName);
+            startBroker();
+
+            Field field = PulsarService.class.getDeclaredField("loadManager");
+            field.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>)
field.get(pulsar);
+            LoadManager manager = LoadManager.create(pulsar);
+            manager.start();
+            loadManagerRef.set(manager);
+
+            NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
+            LoadManager loadManager = pulsar.getLoadManager().get();
+            ResourceUnit broker = null;
+            try {
+                broker = loadManager.getLeastLoaded(fdqn).get();
+            } catch (Exception e) {
+                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find
broker)
+            }
+            assertNull(broker);
+
+            try {
+                Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1,
+                        TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+
+            assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        } finally {
+            conf.setEnablePersistentTopics(defaultEnablePersistentTopic);
+            conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic);
+            conf.setLoadManagerClassName(defaultLoadManagerName);
+        }
+
+    }
+
+    /**
      * Verifies msg-drop stats
      *
      * @throws Exception
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
index 4027800..7fb173b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
@@ -61,4 +61,8 @@ public interface LoadManagerReport extends ServiceLookupData {
 
     public String getBrokerVersionString();
 
+    public boolean isPersistentTopicsEnabled();
+
+    public boolean isNonPersistentTopicsEnabled();
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
index c98e0ce..6ca7744 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
@@ -44,6 +44,8 @@ public class LoadReport implements LoadManagerReport {
     private final String webServiceUrlTls;
     private final String pulsarServiceUrl;
     private final String pulsarServiceUrlTls;
+    private boolean persistentTopicsEnabled = true;
+    private boolean nonPersistentTopicsEnabled = true;
 
     private boolean isUnderLoaded;
     private boolean isOverLoaded;
@@ -402,6 +404,22 @@ public class LoadReport implements LoadManagerReport {
         return pulsarServiceUrlTls;
     }
 
+    public boolean isPersistentTopicsEnabled() {
+        return persistentTopicsEnabled;
+    }
+
+    public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) {
+        this.persistentTopicsEnabled = persistentTopicsEnabled;
+    }
+
+    public boolean isNonPersistentTopicsEnabled() {
+        return nonPersistentTopicsEnabled;
+    }
+
+    public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) {
+        this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled;
+    }
+
     @Override
     public ResourceUsage getCpu() {
         return systemResourceUsage != null ? systemResourceUsage.cpu : null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index de8f7ff..129cf5d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -38,6 +38,8 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport
{
     private final String webServiceUrlTls;
     private final String pulsarServiceUrl;
     private final String pulsarServiceUrlTls;
+    private boolean persistentTopicsEnabled=true;
+    private boolean nonPersistentTopicsEnabled=true;
 
     // Most recently available system resource usage.
     private ResourceUsage cpu;
@@ -402,6 +404,24 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport
{
     }
 
     @Override
+    public boolean isPersistentTopicsEnabled() {
+        return persistentTopicsEnabled;
+    }
+
+    public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) {
+        this.persistentTopicsEnabled = persistentTopicsEnabled;
+    }
+
+    @Override
+    public boolean isNonPersistentTopicsEnabled() {
+        return nonPersistentTopicsEnabled;
+    }
+
+    public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) {
+        this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled;
+    }
+
+    @Override
     public Map<String, NamespaceBundleStats> getBundleStats() {
         return getLastStats();
     }
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index c919c32..b006e3e 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -18,6 +18,12 @@
 #
 
 configs:
+- name: enablePersistentTopics
+  default: 'true'
+  description: Whether persistent topics are enabled on the broker
+- name: enableNonPersistentTopics
+  default: 'true'
+  description: Whether non-persistent topics are enabled on the broker
 - name: functionsWorkerEnabled
   description: Whether the Pulsar Functions worker service is enabled in the broker
   default: 'false'
diff --git a/site/_includes/explanations/ja/non-persistent-topics.md b/site/_includes/explanations/ja/non-persistent-topics.md
index 1564755..768413f 100644
--- a/site/_includes/explanations/ja/non-persistent-topics.md
+++ b/site/_includes/explanations/ja/non-persistent-topics.md
@@ -63,4 +63,17 @@ PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
 
 Producer producer = client.createProducer(
             "non-persistent://sample/standalone/ns1/my-topic");
-```
\ No newline at end of file
+```
+
+### Brokerの設定
+
+多くの場合、ノンパーシステントトピック提供するためだけにクラスタ内に専用のBrokerを設定する必要はほとんどないでしょう。
+
+Brokerが特定のタイプのトピックのみを所有できるようにするための設定は次の通りです:
+
+```
+# Brokerによるパーシステントトピックのロードを無効化
+enablePersistentTopics=false
+# Brokerによるノンパーシステントトピックのロードを有効化
+enableNonPersistentTopics=true
+```
diff --git a/site/docs/latest/cookbooks/non-persistent-messaging.md b/site/docs/latest/cookbooks/non-persistent-messaging.md
index a44c949..b1e6ed4 100644
--- a/site/docs/latest/cookbooks/non-persistent-messaging.md
+++ b/site/docs/latest/cookbooks/non-persistent-messaging.md
@@ -49,6 +49,14 @@ $ bin/pulsar-client produce non-persistent://public/default/example-np-topic
\
 
 {% include admonition.html type="success" content="For a more thorough guide to non-persistent
topics from an administrative perspective, see the [Non-persistent topics](../../admin-api/non-persistent-topics)
guide." %}
 
+## Enabling non-persistent topics {#enabling}
+
+In order to enable non-persistent topics in a Pulsar {% popover broker %}, the [`enableNonPersistentTopics`](../../reference/Configuration#broker-enableNonPersistentTopics)
must be set to `true`. This is the default, and so you won't need to take any action to enable
non-persistent messaging.
+
+{% include admonition.html type="info" title="Configuration for standalone mode" content="If
you're running Pulsar in standalone mode, the same configurable parameters are available but
in the [`standalone.conf`](../../reference/Configuration#standalone) configuration file."
%}
+
+If you'd like to enable *only* non-persistent topics in a broker, you can set the [`enablePersistentTopics`](../../reference/Configuration#broker-enablePersistentTopics)
parameter to `false` and the `enableNonPersistentTopics` parameter to `true`.
+
 ## Managing non-persistent topics via the CLI {#cli}
 
 Non-persistent topics can be managed using the [`pulsar-admin non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent)
command-line interface. With that interface you can perform actions like [create a partitioned
non-persistent topic](../../reference/CliTools#pulsar-admin-non-persistent-create-partitioned-topic),
get [stats](../../reference/CliTools#pulsar-admin-non-persistent-stats) for a non-persistent
topic, [list](../../) non-persistent topics under a namespace, and more.
diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 607ae95..961141a 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -216,7 +216,7 @@ In non-persistent topics, {% popover brokers %} immediately deliver messages
to
 
 {% include admonition.html type="danger" content="With non-persistent topics, message data
lives only in memory. If a message broker fails or message data can otherwise not be retrieved
from memory, your message data may be lost. Use non-persistent topics only if you're *certain*
that your use case requires it and can sustain it." %}
 
-You can manage non-persistent topics using the [`pulsar-admin non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent)
interface.
+By default, non-persistent topics are enabled on Pulsar {% popover brokers %}. You can disable
them in the broker's [configuration](../../reference/Configuration#broker-enableNonPersistentTopics).
You can manage non-persistent topics using the [`pulsar-admin non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent)
interface.
 
 #### Performance
 
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index a0fee5c..b5ed064 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -248,7 +248,7 @@ In non-persistent topics, brokers immediately deliver messages to all
connected
 
 > With non-persistent topics, message data lives only in memory. If a message broker fails
or message data can otherwise not be retrieved from memory, your message data may be lost.
Use non-persistent topics only if you're *certain* that your use case requires it and can
sustain it.
 
-You can manage non-persistent topics using the [`pulsar-admin topics`](referencereference--pulsar-admin/#topics-1)
interface.
+By default, non-persistent topics are enabled on Pulsar brokers. You can disable them in
the broker's [configuration](reference-configuration.md#broker-enableNonPersistentTopics).
You can manage non-persistent topics using the [`pulsar-admin topics`](referencereference--pulsar-admin/#topics-1)
interface.
 
 ### Performance
 
diff --git a/site2/docs/cookbooks-non-persistent.md b/site2/docs/cookbooks-non-persistent.md
index 2c6047a..481f002 100644
--- a/site2/docs/cookbooks-non-persistent.md
+++ b/site2/docs/cookbooks-non-persistent.md
@@ -38,9 +38,16 @@ $ bin/pulsar-client produce non-persistent://public/default/example-np-topic
\
 
 > For a more thorough guide to non-persistent topics from an administrative perspective,
see the [Non-persistent topics](admin-api-non-persistent-topics.md) guide.
 
+## Enabling
+
+In order to enable non-persistent topics in a Pulsar broker, the [`enableNonPersistentTopics`](reference-configuration.md#broker-enableNonPersistentTopics)
must be set to `true`. This is the default, and so you won't need to take any action to enable
non-persistent messaging.
+
+
 > #### Configuration for standalone mode
 > If you're running Pulsar in standalone mode, the same configurable parameters are available
but in the [`standalone.conf`](reference-configuration.md#standalone) configuration file.

 
+If you'd like to enable *only* non-persistent topics in a broker, you can set the [`enablePersistentTopics`](reference-configuration.md#broker-enablePersistentTopics)
parameter to `false` and the `enableNonPersistentTopics` parameter to `true`.
+
 ## Managing with cli
 
 Non-persistent topics can be managed using the [`pulsar-admin non-persistent`](reference-pulsar-admin.md#non-persistent)
command-line interface. With that interface you can perform actions like [create a partitioned
non-persistent topic](reference-pulsar-admin.md#non-persistent-create-partitioned-topic),
get [stats](reference-pulsar-admin.md#non-persistent-stats) for a non-persistent topic, [list](reference-pulsar-admin.md)
non-persistent topics under a namespace, and more.
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 5f92d6f..08354bd 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -104,6 +104,8 @@ Pulsar brokers are responsible for handling incoming messages from producers,
di
 
 |Name|Description|Default|
 |---|---|---|
+|enablePersistentTopics|  Whether persistent topics are enabled on the broker |true|
+|enableNonPersistentTopics| Whether non-persistent topics are enabled on the broker |true|
 |functionsWorkerEnabled|  Whether the Pulsar Functions worker service is enabled in the broker
 |false|
 |zookeeperServers|  Zookeeper quorum connection string  ||
 |configurationStoreServers| Configuration store connection string (as a comma-separated list)
||


Mime
View raw message