pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Simplify ServiceUrlProvider related APIs (#2671)
Date Fri, 28 Sep 2018 19:19:13 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 464e7d1  Simplify ServiceUrlProvider related APIs (#2671)
464e7d1 is described below

commit 464e7d136062aacd5db0e062b09dd0528d14a045
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Fri Sep 28 12:19:05 2018 -0700

    Simplify ServiceUrlProvider related APIs (#2671)
---
 .../pulsar/client/api/ServiceUrlProviderTest.java  |  8 ++---
 .../org/apache/pulsar/client/api/PulsarClient.java | 35 ++++++++--------------
 .../pulsar/client/api/ServiceUrlProvider.java      | 25 ++++++++++------
 .../client/impl/BinaryProtoLookupService.java      |  8 +++--
 .../pulsar/client/impl/ClientBuilderImpl.java      |  2 +-
 .../apache/pulsar/client/impl/ConnectionPool.java  | 20 ++++++++++++-
 .../org/apache/pulsar/client/impl/HttpClient.java  | 18 ++++++-----
 .../pulsar/client/impl/HttpLookupService.java      |  5 ++++
 .../apache/pulsar/client/impl/LookupService.java   |  6 ++++
 .../pulsar/client/impl/PulsarClientImpl.java       | 22 ++++----------
 10 files changed, 86 insertions(+), 63 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index ab86f12..4b73b9e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -64,7 +64,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
         for (int i = 0; i < 100; i++) {
             producer.send("Hello Pulsar[" + i + "]");
         }
-        client.forceCloseConnection();
+        client.updateServiceUrl(pulsar.getBrokerServiceUrl());
         for (int i = 100; i < 200; i++) {
             producer.send("Hello Pulsar[" + i + "]");
         }
@@ -132,7 +132,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
         }
 
         @Override
-        public void setClient(PulsarClient client) {
+        public void initialize(PulsarClient client) {
             this.pulsarClient = client;
         }
 
@@ -148,9 +148,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
         }
 
         public void onServiceUrlChanged(String newServiceUrl) throws PulsarClientException {
-            this.getPulsarClient().getConf().setServiceUrl(newServiceUrl);
-            this.getPulsarClient().reloadLookUp();
-            this.getPulsarClient().forceCloseConnection();
+            this.getPulsarClient().updateServiceUrl(newServiceUrl);
         }
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index cba451b..ea80bc1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -339,6 +339,19 @@ public interface PulsarClient extends Closeable {
     CompletableFuture<Reader<byte[]>> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf);
 
     /**
+     * Update the service URL this client is using.
+     *
+     * This will force the client close all existing connections and to restart service discovery to the new service
+     * endpoint.
+     *
+     * @param serviceUrl
+     *            the new service URL this client should connect to
+     * @throws PulsarClientException
+     *             in case the serviceUrl is not valid
+     */
+    void updateServiceUrl(String serviceUrl) throws PulsarClientException;
+
+    /**
      * Close the PulsarClient and release all the resources.
      *
      * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted.
@@ -368,26 +381,4 @@ public interface PulsarClient extends Closeable {
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
-
-    /**
-     * Force close connection of pulsar client.
-     *
-     * close all producer connection and close all consumer producer.
-     *
-     */
-    void forceCloseConnection();
-
-    /**
-     * Reload lookup service in pulsar client.
-     *
-     * @throws PulsarClientException
-     */
-    void reloadLookUp() throws PulsarClientException;
-
-    /**
-     * Get client config data.
-     *
-     * @return
-     */
-    ClientConfigurationData getConf();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
index 9b6e963..a668987 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -19,24 +19,31 @@
 package org.apache.pulsar.client.api;
 
 /**
- * The provider to provide the service url
- * It used by {@link ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}
+ * The provider to provide the service url.
+ *
+ * This allows applications to retrieve the service URL from an external configuration provider and, more importantly,
+ * to force the Pulsar client to reconnect if the service URL has been changed.
+ *
+ * It can be passed with {@link ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}
  */
 public interface ServiceUrlProvider {
 
     /**
-     * Get pulsar service url from ServiceUrlProvider.
+     * Initialize the service url provider with Pulsar client instance.
+     *
+     * This can be used by the provider to force the Pulsar client to reconnect whenever the service url might have
+     * changed. See {@link PulsarClient#updateServiceUrl(String)}.
      *
-     * @return pulsar service url.
+     * @param client
+     *            created pulsar client.
      */
-    String getServiceUrl();
+    void initialize(PulsarClient client);
 
     /**
-     * Set pulsar client to the provider for provider can control the pulsar client,
-     * such as {@link PulsarClient#forceCloseConnection()} or {@link PulsarClient#close()}.
+     * Get the current service URL the Pulsar client should connect to.
      *
-     * @param client created pulsar client.
+     * @return the pulsar service url.
      */
-    void setClient(PulsarClient client);
+    String getServiceUrl();
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e4cb7bd..e373ed8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
 public class BinaryProtoLookupService implements LookupService {
 
     private final PulsarClientImpl client;
-    protected final InetSocketAddress serviceAddress;
+    protected volatile InetSocketAddress serviceAddress;
     private final boolean useTls;
     private final ExecutorService executor;
 
@@ -61,6 +60,11 @@ public class BinaryProtoLookupService implements LookupService {
         this.client = client;
         this.useTls = useTls;
         this.executor = executor;
+        updateServiceUrl(serviceUrl);
+    }
+
+    @Override
+    public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
         URI uri;
         try {
             uri = new URI(serviceUrl);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 6a59520..3e87bc9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -53,7 +53,7 @@ public class ClientBuilderImpl implements ClientBuilder {
         }
         PulsarClient client = new PulsarClientImpl(conf);
         if (conf.getServiceUrlProvider() != null) {
-            conf.getServiceUrlProvider().setClient(client);
+            conf.getServiceUrlProvider().initialize(client);
         }
         return client;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 3188b35..0c7751b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -72,7 +72,7 @@ public class ConnectionPool implements Closeable {
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
     }
-    
+
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> clientCnxSupplier) {
         this.eventLoopGroup = eventLoopGroup;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -119,6 +119,24 @@ public class ConnectionPool implements Closeable {
         return getConnection(address, address);
     }
 
+    void closeAllConnections() {
+        pool.values().forEach(map -> {
+            map.values().forEach(future -> {
+                if (future.isDone()) {
+                    if (!future.isCompletedExceptionally()) {
+                        // Connection was already created successfully, the join will not throw any exception
+                        future.join().close();
+                    } else {
+                        // If the future already failed, there's nothing we have to do
+                    }
+                } else {
+                    // The future is still pending: just register to make sure it gets closed if the operation will succeed
+                    future.thenAccept(ClientCnx::close);
+                }
+            });
+        });
+    }
+
     /**
      * Get a connection from the pool.
      * <p>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index e5ef4c5..0e94e6b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -60,7 +60,7 @@ public class HttpClient implements Closeable {
     protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
 
     protected final AsyncHttpClient httpClient;
-    protected final URL url;
+    protected volatile URL url;
     protected final Authentication authentication;
 
     protected HttpClient(String serviceUrl, Authentication authentication,
@@ -74,12 +74,7 @@ public class HttpClient implements Closeable {
             EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath,
             int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException {
         this.authentication = authentication;
-        try {
-            // Ensure trailing "/" on url
-            url = new URL(serviceUrl);
-        } catch (MalformedURLException e) {
-            throw new PulsarClientException.InvalidServiceURL(e);
-        }
+        setServiceUrl(serviceUrl);
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
@@ -120,6 +115,15 @@ public class HttpClient implements Closeable {
         log.debug("Using HTTP url: {}", this.url);
     }
 
+    void setServiceUrl(String serviceUrl) throws PulsarClientException {
+        try {
+            // Ensure trailing "/" on url
+            url = new URL(serviceUrl);
+        } catch (MalformedURLException e) {
+            throw new PulsarClientException.InvalidServiceURL(e);
+        }
+    }
+
     @Override
     public void close() throws IOException {
         httpClient.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 3abdf25..97ef0e1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -59,6 +59,11 @@ class HttpLookupService implements LookupService {
         this.useTls = conf.isUseTls();
     }
 
+    @Override
+    public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
+        httpClient.setServiceUrl(serviceUrl);
+    }
+
     /**
      * Calls http-lookup api to find broker-service address which can serve a given topic.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index a2af17c..5d526a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -44,6 +45,11 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 public interface LookupService extends AutoCloseable {
 
     /**
+     * Instruct the LookupService to switch to a new service URL for all subsequent requests
+     */
+    void updateServiceUrl(String serviceUrl) throws PulsarClientException;
+
+    /**
      * Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given
      * topic.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 5752486..9640c09 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -34,7 +34,6 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -712,16 +711,12 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     @Override
-    public void forceCloseConnection() {
-        for (ConcurrentMap<Integer, CompletableFuture<ClientCnx>> cnxMap : cnxPool.pool.values()) {
-            for (CompletableFuture<ClientCnx> clientCnxCompletableFuture : cnxMap.values()) {
-                try {
-                    clientCnxCompletableFuture.get().close();
-                } catch (Exception e) {
-                    log.error("Force close connection exception ", e);
-                }
-            }
-        }
+    public synchronized void updateServiceUrl(String serviceUrl) throws PulsarClientException {
+        log.info("Updating service URL to {}", serviceUrl);
+
+        conf.setServiceUrl(serviceUrl);
+        lookup.updateServiceUrl(serviceUrl);
+        cnxPool.closeAllConnections();
     }
 
     protected CompletableFuture<ClientCnx> getConnection(final String topic) {
@@ -771,11 +766,6 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
-    @Override
-    public ClientConfigurationData getConf() {
-        return conf;
-    }
-
     public CompletableFuture<Integer> getNumberOfPartitions(String topic) {
         return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions);
     }


Mime
View raw message