pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat commented on a change in pull request #930: Issue #929: Perform async DNS resolution each time attempting connect?
Date Thu, 01 Jan 1970 00:00:00 GMT
merlimat commented on a change in pull request #930: Issue #929: Perform async DNS resolution
each time attempting connect?
URL: https://github.com/apache/incubator-pulsar/pull/930#discussion_r155148767
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
 ##########
 @@ -195,14 +203,91 @@ public void initChannel(SocketChannel ch) throws Exception {
                 cnx.ctx().close();
                 return null;
             });
+        }).exceptionally(exception -> {
+            log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getClass().getSimpleName());
+            cnxFuture.completeExceptionally(new PulsarClientException(exception));
+            cleanupConnection(logicalAddress, connectionKey, cnxFuture);
+            return null;
         });
 
         return cnxFuture;
     }
 
+    /**
+     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS
server
+     */
+    private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress)
{
+        String hostname = unresolvedAddress.getHostString();
+        int port = unresolvedAddress.getPort();
+
+        // Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
+        return resolveName(hostname)
+                .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(),
port));
+    }
+
+    /**
+     * Try to connect to a sequence of IP addresses until a successfull connection can be
made, or fail if no address is
+     * working
+     */
+    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress>
unresolvedAddresses, int port) {
+        CompletableFuture<Channel> future = new CompletableFuture<>();
+
+        connectToAddress(unresolvedAddresses.next(), port).thenAccept(channel -> {
+            // Successfully connected to server
+            future.complete(channel);
+        }).exceptionally(exception -> {
+            if (unresolvedAddresses.hasNext()) {
+                // Try next IP address
+                connectToResolvedAddresses(unresolvedAddresses, port).thenAccept(channel
-> {
+                    future.complete(channel);
+                }).exceptionally(ex -> {
+                    // This is already unwinding the recursive call
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+            } else {
+                // Failed to connect to any IP address
+                future.completeExceptionally(exception);
+            }
+            return null;
+        });
+
+        return future;
+    }
+
+    private CompletableFuture<List<InetAddress>> resolveName(String hostname)
{
+        CompletableFuture<List<InetAddress>> future = new CompletableFuture<>();
+        dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>>
resolveFuture) -> {
+            if (resolveFuture.isSuccess()) {
+                future.complete(resolveFuture.get());
 
 Review comment:
   I think that if the dns operation completes successfully, it should always carry at least
1 IP address

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message