pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia closed pull request #913: Fail broker start if broker-znode created by other zk-session
Date Thu, 01 Jan 1970 00:00:00 GMT
rdhabalia closed pull request #913: Fail broker start if broker-znode created by other zk-session
URL: https://github.com/apache/incubator-pulsar/pull/913
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 4790de497..4cb985188 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -723,9 +723,15 @@ public void start() throws PulsarServerException {
             final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
             updateLocalBrokerData();
             try {
-                ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath, localData.getJsonBytes(),
+                ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
             } catch (KeeperException.NodeExistsException e) {
+                long ownerZkSessionId = getBrokerZnodeOwner();
+                if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) {
+                    log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
+                            ownerZkSessionId);
+                    throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId);
+                }
                 // Node may already be created by another load manager: in this case update the data.
                 zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
             } catch (Exception e) {
@@ -850,4 +856,15 @@ private void deleteBundleDataFromZookeeper(String bundle) {
             log.warn("Failed to delete bundle-data {} from zookeeper", bundle, e);
         }
     }
+
+    private long getBrokerZnodeOwner() {
+        try {
+            Stat stat = new Stat();
+            zkClient.getData(brokerZnodePath, false, stat);
+            return stat.getEphemeralOwner();
+        } catch (Exception e) {
+            log.warn("Failed to get stat of {}", brokerZnodePath, e);
+        }
+        return 0;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 8c01e9a3a..43aef2c62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -299,6 +299,12 @@ public void start() throws PulsarServerException {
                 ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
                         loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
             } catch (KeeperException.NodeExistsException e) {
+                long ownerZkSessionId = getBrokerZnodeOwner();
+                if (ownerZkSessionId != 0 && ownerZkSessionId != pulsar.getZkClient().getSessionId()) {
+                    log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
+                            ownerZkSessionId);
+                    throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId);
+                }
                 // Node may already be created by another load manager: in this case update the data.
                 if (loadReport != null) {
                     pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
@@ -1470,4 +1476,15 @@ public void stop() throws PulsarServerException {
         availableActiveBrokers.close();
         scheduler.shutdown();
     }
+    
+    private long getBrokerZnodeOwner() {
+        try {
+            Stat stat = new Stat();
+            pulsar.getZkClient().getData(brokerZnodePath, false, stat);
+            return stat.getEphemeralOwner();
+        } catch (Exception e) {
+            log.warn("Failed to get stat of {}", brokerZnodePath, e);
+        }
+        return 0;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 8edc971f0..81c22e07f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -44,15 +44,15 @@
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageMessageData;
-import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
-import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
@@ -64,7 +64,6 @@
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
-import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -552,4 +551,34 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
         assertEquals(brokerCandidateCache.size(), 0);
 
     }
+    
+    /**
+     * It verifies that pulsar-service fails if load-manager tries to create ephemeral znode for broker which is already
+     * created by other zk-session-id.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testOwnBrokerZnodeByMultipleBroker() throws Exception {
+
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config.setClusterName("use");
+        int brokerWebServicePort = PortManager.nextFreePort();
+        int brokerServicePort = PortManager.nextFreePort();
+        config.setWebServicePort(brokerWebServicePort);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(brokerServicePort);
+        PulsarService pulsar = new PulsarService(config);
+        // create znode using different zk-session
+        final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
+                + brokerWebServicePort;
+        ZkUtils.createFullPathOptimistic(pulsar1.getZkClient(), brokerZnode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+        try {
+            pulsar.start();
+        } catch (PulsarServerException e) {
+            //Ok.
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message