spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-14547] Avoid DNS resolution for reusing connections
Date Tue, 12 Apr 2016 22:28:11 GMT
Repository: spark
Updated Branches:
  refs/heads/master 1ef5f8cfa -> c439d88e9


[SPARK-14547] Avoid DNS resolution for reusing connections

## What changes were proposed in this pull request?
This patch changes the connection creation logic in the network client module to avoid DNS
resolution when reusing connections.

## How was this patch tested?
Testing in production. This is too difficult to test in isolation (for high-fidelity unit
tests, we'd need to change the DNS resolution behavior in the JVM).

Author: Reynold Xin <rxin@databricks.com>

Closes #12315 from rxin/SPARK-14547.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c439d88e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c439d88e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c439d88e

Branch: refs/heads/master
Commit: c439d88e99c35a5f29f071715addfee8cbb215dc
Parents: 1ef5f8c
Author: Reynold Xin <rxin@databricks.com>
Authored: Tue Apr 12 15:28:08 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Tue Apr 12 15:28:08 2016 -0700

----------------------------------------------------------------------
 .../network/client/TransportClientFactory.java  | 31 +++++++++++++-------
 1 file changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c439d88e/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index b5a9d66..a27aaf2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -123,16 +123,15 @@ public class TransportClientFactory implements Closeable {
   public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
-    long preResolveHost = System.nanoTime();
-    final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-    long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
-    logger.info("Spent {} ms to resolve {}", hostResolveTimeMs, address);
+    // Use unresolved address here to avoid DNS resolution each time we creates a client.
+    final InetSocketAddress unresolvedAddress =
+      InetSocketAddress.createUnresolved(remoteHost, remotePort);
 
     // Create the ClientPool if we don't have it yet.
-    ClientPool clientPool = connectionPool.get(address);
+    ClientPool clientPool = connectionPool.get(unresolvedAddress);
     if (clientPool == null) {
-      connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
-      clientPool = connectionPool.get(address);
+      connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
+      clientPool = connectionPool.get(unresolvedAddress);
     }
 
     int clientIndex = rand.nextInt(numConnectionsPerPeer);
@@ -149,25 +148,35 @@ public class TransportClientFactory implements Closeable {
       }
 
       if (cachedClient.isActive()) {
-        logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+        logger.trace("Returning cached connection to {}: {}",
+          cachedClient.getSocketAddress(), cachedClient);
         return cachedClient;
       }
     }
 
     // If we reach here, we don't have an existing connection open. Let's create a new one.
     // Multiple threads might race here to create new connections. Keep only one of them active.
+    final long preResolveHost = System.nanoTime();
+    final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
+    final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
+    if (hostResolveTimeMs > 2000) {
+      logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+    } else {
+      logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+    }
+
     synchronized (clientPool.locks[clientIndex]) {
       cachedClient = clientPool.clients[clientIndex];
 
       if (cachedClient != null) {
         if (cachedClient.isActive()) {
-          logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+          logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
           return cachedClient;
         } else {
-          logger.info("Found inactive connection to {}, creating a new one.", address);
+          logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
         }
       }
-      clientPool.clients[clientIndex] = createClient(address);
+      clientPool.clients[clientIndex] = createClient(resolvedAddress);
       return clientPool.clients[clientIndex];
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message