pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Fixed intermittent test failures with "bind error" (#2725)
Date Fri, 05 Oct 2018 03:28:51 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79dd772  Fixed intermittent test failures with "bind error" (#2725)
79dd772 is described below

commit 79dd772f61bdb78816247bc3ad24a59cdc8f7ac6
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu Oct 4 20:28:44 2018 -0700

    Fixed intermittent test failures with "bind error" (#2725)
---
 .../apache/pulsar/broker/SLAMonitoringTest.java    |  2 +-
 .../AntiAffinityNamespaceGroupTest.java            |  2 +-
 .../broker/loadbalance/LoadBalancerTest.java       |  2 +-
 .../loadbalance/ModularLoadManagerImplTest.java    |  2 +-
 .../loadbalance/SimpleLoadManagerImplTest.java     |  2 +-
 .../broker/service/AdvertisedAddressTest.java      |  3 +-
 .../broker/service/BacklogQuotaManagerTest.java    |  9 ++---
 .../broker/service/BrokerBkEnsemblesTests.java     | 10 +++---
 .../pulsar/broker/service/ReplicatorTestBase.java  |  6 ++--
 .../broker/service/v1/V1_ReplicatorTestBase.java   |  6 ++--
 .../pulsar/client/api/NonPersistentTopicTest.java  |  6 ++--
 .../worker/PulsarWorkerAssignmentTest.java         |  2 +-
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |  8 ++---
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  2 +-
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |  2 +-
 pulsar-zookeeper-utils/pom.xml                     |  8 +++++
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  | 41 ++++++++++++++++++----
 .../zookeeper/LocalBookkeeperEnsembleTest.java     | 16 ++++-----
 18 files changed, 83 insertions(+), 46 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index cda002b..233d6b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -72,7 +72,7 @@ public class SLAMonitoringTest {
     void setup() throws Exception {
         log.info("---- Initializing SLAMonitoringTest -----");
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         // start brokers
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 6c20c77..8088637 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -112,7 +112,7 @@ public class AntiAffinityNamespaceGroupTest {
     void setup() throws Exception {
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         // Start broker 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index d9e5b78..b90fa6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -120,7 +120,7 @@ public class LoadBalancerTest {
     @BeforeMethod
     void setup() throws Exception {
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
         ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
                 SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 863833a..0303fbf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -146,7 +146,7 @@ public class ModularLoadManagerImplTest {
     void setup() throws Exception {
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         // Start broker 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 052c97e..54c51fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -127,7 +127,7 @@ public class SimpleLoadManagerImplTest {
     void setup() throws Exception {
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         // Start broker 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index dda76b1..3990168 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -44,7 +45,7 @@ public class AdvertisedAddressTest {
 
     @BeforeMethod
     public void setup() throws Exception {
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
         ServiceConfiguration config = new ServiceConfiguration();
         config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 0026559..97155ee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -53,7 +54,7 @@ import com.google.common.collect.Sets;
 /**
  */
 public class BacklogQuotaManagerTest {
-    protected static int BROKER_SERVICE_PORT = 16650;
+    protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
     PulsarService pulsar;
     ServiceConfiguration config;
 
@@ -62,15 +63,15 @@ public class BacklogQuotaManagerTest {
 
     LocalBookkeeperEnsemble bkEnsemble;
 
-    private final int ZOOKEEPER_PORT = 12759;
-    protected final int BROKER_WEBSERVICE_PORT = 15782;
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
     private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
 
     @BeforeMethod
     void setup() throws Exception {
         try {
             // start local bookie and zookeeper
-            bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001);
+            bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
             bkEnsemble.start();
 
             // start pulsar service
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 94823ad..cc82ff4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -60,7 +61,7 @@ import org.testng.annotations.Test;
 /**
  */
 public class BrokerBkEnsemblesTests {
-    protected static int BROKER_SERVICE_PORT = 16650;
+    protected static int BROKER_SERVICE_PORT = PortManager.nextFreePort();
     protected PulsarService pulsar;
     ServiceConfiguration config;
 
@@ -69,10 +70,9 @@ public class BrokerBkEnsemblesTests {
 
     LocalBookkeeperEnsemble bkEnsemble;
 
-    private final int ZOOKEEPER_PORT = 12759;
-    protected final int BROKER_WEBSERVICE_PORT = 15782;
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    protected final int BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
 
-    protected final int bkBasePort = 5001;
     private final int numberOfBookies;
 
     public BrokerBkEnsemblesTests() {
@@ -87,7 +87,7 @@ public class BrokerBkEnsemblesTests {
     protected void setup() throws Exception {
         try {
             // start local bookie and zookeeper
-            bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, 5001);
+            bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
             bkEnsemble.start();
 
             // start pulsar service
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 2f599ee..10754b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -105,7 +105,7 @@ public class ReplicatorTestBase {
 
         // Start region 1
         int zkPort1 = PortManager.nextFreePort();
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
         bkEnsemble1.start();
 
         int webServicePort1 = PortManager.nextFreePort();
@@ -143,7 +143,7 @@ public class ReplicatorTestBase {
 
         // Start zk & bks
         int zkPort2 = PortManager.nextFreePort();
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
         bkEnsemble2.start();
 
         int webServicePort2 = PortManager.nextFreePort();
@@ -177,7 +177,7 @@ public class ReplicatorTestBase {
 
         // Start zk & bks
         int zkPort3 = PortManager.nextFreePort();
-        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
+        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
         bkEnsemble3.start();
 
         int webServicePort3 = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
index fc5001b..cdc7c06 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
@@ -104,7 +104,7 @@ public class V1_ReplicatorTestBase {
 
         // Start region 1
         int zkPort1 = PortManager.nextFreePort();
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
         bkEnsemble1.start();
 
         int webServicePort1 = PortManager.nextFreePort();
@@ -142,7 +142,7 @@ public class V1_ReplicatorTestBase {
 
         // Start zk & bks
         int zkPort2 = PortManager.nextFreePort();
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
         bkEnsemble2.start();
 
         int webServicePort2 = PortManager.nextFreePort();
@@ -176,7 +176,7 @@ public class V1_ReplicatorTestBase {
 
         // Start zk & bks
         int zkPort3 = PortManager.nextFreePort();
-        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
+        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
         bkEnsemble3.start();
 
         int webServicePort3 = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 29901b5..2e7a444 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -871,7 +871,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
             // Start region 1
             int zkPort1 = PortManager.nextFreePort();
-            bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, PortManager.nextFreePort());
+            bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
             bkEnsemble1.start();
 
             int webServicePort1 = PortManager.nextFreePort();
@@ -901,7 +901,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
             // Start zk & bks
             int zkPort2 = PortManager.nextFreePort();
-            bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, PortManager.nextFreePort());
+            bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
             bkEnsemble2.start();
 
             int webServicePort2 = PortManager.nextFreePort();
@@ -927,7 +927,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
             // Start zk & bks
             int zkPort3 = PortManager.nextFreePort();
-            bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, PortManager.nextFreePort());
+            bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
             bkEnsemble3.start();
 
             int webServicePort3 = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 8f18259..0f9219d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -98,7 +98,7 @@ public class PulsarWorkerAssignmentTest {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index a758c87..996d931 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -101,7 +101,7 @@ public class PulsarFunctionAdminTest {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
@@ -126,7 +126,7 @@ public class PulsarFunctionAdminTest {
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config.setTlsAllowInsecureConnection(true);
-    
+
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
@@ -160,11 +160,11 @@ public class PulsarFunctionAdminTest {
                     workerConfig.getClientAuthenticationParameters());
         }
         pulsarClient = clientBuilder.build();
-       
+
         TenantInfo propAdmin = new TenantInfo();
         propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
-       
+
         Thread.sleep(100);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 97de0b8..2b32fd0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -92,7 +92,7 @@ public class PulsarFunctionTlsTest {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         config = spy(new ServiceConfiguration());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index dd39222..907cf86 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -133,7 +133,7 @@ public class PulsarSinkE2ETest {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
         bkEnsemble.start();
 
         String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 900dcf3..e30e4b1 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -66,6 +66,14 @@
       <scope>test</scope>
     </dependency>
 
+     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <version>${project.parent.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-common</artifactId>
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index f7923e4..1ead796 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -39,6 +39,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
@@ -81,8 +82,25 @@ public class LocalBookkeeperEnsemble {
     int numberOfBookies;
     private boolean clearOldData = false;
 
-    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort) {
-        this(numberOfBookies, zkPort, bkBasePort, null, null, true);
+    private static class BasePortManager implements Supplier<Integer> {
+
+        private int port;
+
+        public BasePortManager(int basePort) {
+            this.port = basePort;
+        }
+
+        @Override
+        public synchronized Integer get() {
+            return port++;
+        }
+    }
+
+    private final Supplier<Integer> portManager;
+
+
+    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) {
+        this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager);
     }
 
     public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
@@ -103,10 +121,22 @@ public class LocalBookkeeperEnsemble {
                                    String bkDataDirName,
                                    boolean clearOldData,
                                    String advertisedAddress) {
+        this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress,
+                new BasePortManager(bkBasePort));
+    }
+
+    public LocalBookkeeperEnsemble(int numberOfBookies,
+            int zkPort,
+            int streamStoragePort,
+            String zkDataDirName,
+            String bkDataDirName,
+            boolean clearOldData,
+            String advertisedAddress,
+            Supplier<Integer> portManager) {
         this.numberOfBookies = numberOfBookies;
         this.HOSTPORT = "127.0.0.1:" + zkPort;
         this.ZooKeeperDefaultPort = zkPort;
-        this.initialPort = bkBasePort;
+        this.portManager = portManager;
         this.streamStoragePort = streamStoragePort;
         this.zkDataDirName = zkDataDirName;
         this.bkDataDirName = bkDataDirName;
@@ -128,7 +158,6 @@ public class LocalBookkeeperEnsemble {
     String bkDataDirName;
     BookieServer bs[];
     ServerConfiguration bsConfs[];
-    Integer initialPort = 5000;
 
     // Stream/Table Storage
     StreamStorageLifecycleComponent streamStorage;
@@ -221,7 +250,7 @@ public class LocalBookkeeperEnsemble {
                 cleanDirectory(bkDataDir);
             }
 
-            int bookiePort = initialPort + i;
+            int bookiePort = portManager.get();
 
             // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
             String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort);
@@ -257,7 +286,7 @@ public class LocalBookkeeperEnsemble {
                 bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE);
             }
             bs[i].start();
-            LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, initialPort + i,
+            LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, bookiePort,
                     bkDataDir.getAbsolutePath());
         }
     }
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
index 95735af..a6dedaf 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
@@ -18,18 +18,18 @@
  */
 package org.apache.pulsar.zookeeper;
 
-import java.io.File;
-
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.commons.io.FileUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 
 @Test
 public class LocalBookkeeperEnsembleTest {
@@ -62,16 +62,14 @@ public class LocalBookkeeperEnsembleTest {
 
         final int numBk = 1;
         final int zkPort = PortManager.nextFreePort();
-        final int bkPort = PortManager.nextFreePort();
 
         // Start local Bookies/ZooKeepers and confirm that they are running at specified ports
-        LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, bkPort);
+        LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, zkPort, () -> PortManager.nextFreePort());
         ensemble.start();
         assertTrue(ensemble.getZkServer().isRunning());
         assertEquals(ensemble.getZkServer().getClientPort(), zkPort);
         assertTrue(ensemble.getZkClient().getState().isConnected());
         assertTrue(ensemble.getBookies()[0].isRunning());
-        assertEquals(ensemble.getBookies()[0].getLocalAddress().getPort(), bkPort);
 
         // Stop local Bookies/ZooKeepers and confirm that they are correctly closed
         ensemble.stop();


Mime
View raw message