kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-4024; Override client metadata backoff on topic changes and avoid unnecessary connections
Date Fri, 04 Nov 2016 06:19:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1a2ed8633 -> e795ad96c


KAFKA-4024; Override client metadata backoff on topic changes and avoid unnecessary connections

Fixes a bug that inappropriately applied the refresh backoff as the interval between
metadata updates even when the current metadata was already known to be outdated.

Author: Yuto Kawamura <kawamuray.dadada@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #1707 from kawamuray/KAFKA-4024-metadata-backoff
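As an editorial illustration of the behavior change (not part of the commit), here is a minimal
sketch that uses only the Metadata API visible in the diff below (the two-argument Metadata
constructor, add(), requestUpdate(), update() and timeToNextUpdate()); the class name and the
literal values are invented for the example:

    import org.apache.kafka.clients.Metadata;
    import org.apache.kafka.common.Cluster;

    // Illustrative only: adding a previously unknown topic now permits an immediate
    // metadata update instead of waiting out the refresh backoff.
    public class NewTopicBackoffSketch {
        public static void main(String[] args) {
            long refreshBackoffMs = 100;
            long metadataExpireMs = 60_000;
            long now = 10_000;

            Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
            metadata.update(Cluster.empty(), now);    // a successful refresh at 'now'

            metadata.add("new-topic");                // topic not seen before
            metadata.requestUpdate();                 // what the producer does when it needs the topic
            // Before this patch the refresh backoff still gated the next update even though the
            // current metadata cannot cover "new-topic"; with the patch, add() for a new topic also
            // resets the last-refresh timestamp, so the update is allowed right away.
            System.out.println(metadata.timeToNextUpdate(now));   // prints 0 with this patch, 100 before
        }
    }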


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e795ad96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e795ad96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e795ad96

Branch: refs/heads/trunk
Commit: e795ad96ca9f26cdef01d063b79483a6c9221f27
Parents: 1a2ed86
Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
Authored: Thu Nov 3 23:04:09 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Nov 3 23:19:01 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  |  9 ++
 .../java/org/apache/kafka/clients/Metadata.java | 25 +++--
 .../org/apache/kafka/clients/NetworkClient.java | 82 +++++++++-------
 .../org/apache/kafka/clients/MetadataTest.java  | 98 ++++++++++++++++++++
 4 files changed, 172 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e795ad96/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index ad35e20..6b90ab8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -77,6 +77,15 @@ final class ClusterConnectionStates {
     }
 
     /**
+     * Return true if a specific connection establishment is currently underway
+     * @param id The id of the node to check
+     */
+    public boolean isConnecting(String id) {
+        NodeConnectionState state = nodeState.get(id);
+        return state != null && state.state == ConnectionState.CONNECTING;
+    }
+
+    /**
      * Enter the connecting state for the given connection.
      * @param id The id of the connection
      * @param now The current time.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e795ad96/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index f717001..75d48ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -110,7 +110,9 @@ public final class Metadata {
      * will be reset on the next update.
      */
     public synchronized void add(String topic) {
-        topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
+        if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
+            requestUpdateForNewTopics();
+        }
     }
 
     /**
@@ -166,8 +168,9 @@ public final class Metadata {
      * @param topics
      */
     public synchronized void setTopics(Collection<String> topics) {
-        if (!this.topics.keySet().containsAll(topics))
-            requestUpdate();
+        if (!this.topics.keySet().containsAll(topics)) {
+            requestUpdateForNewTopics();
+        }
         this.topics.clear();
         for (String topic : topics)
             this.topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
@@ -264,17 +267,13 @@ public final class Metadata {
     }
 
     /**
-     * The metadata refresh backoff in ms
-     */
-    public long refreshBackoff() {
-        return refreshBackoffMs;
-    }
-
-    /**
      * Set state to indicate if metadata for all topics in Kafka cluster is required or not.
      * @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.
      */
     public synchronized void needMetadataForAllTopics(boolean needMetadataForAllTopics) {
+        if (needMetadataForAllTopics && !this.needMetadataForAllTopics) {
+            requestUpdateForNewTopics();
+        }
         this.needMetadataForAllTopics = needMetadataForAllTopics;
     }
 
@@ -306,6 +305,12 @@ public final class Metadata {
         void onMetadataUpdate(Cluster cluster);
     }
 
+    private synchronized void requestUpdateForNewTopics() {
+        // Override the timestamp of the last refresh to allow an immediate update.
+        this.lastRefreshMs = 0;
+        requestUpdate();
+    }
+
     private Cluster getClusterForCurrentTopics(Cluster cluster) {
         Set<String> unauthorizedTopics = new HashSet<>();
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();

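The override works because timeToNextUpdate() takes the larger of the metadata expiry time and the
backoff window anchored at lastRefreshMs. That method is not part of this diff, so the following is
only a simplified model of the computation, with an invented signature, to make clear why zeroing
lastRefreshMs in requestUpdateForNewTopics() lifts the backoff:

    final class MetadataTimingModel {
        // Simplified model of Metadata.timeToNextUpdate(now); not the literal method body.
        static long timeToNextUpdate(long now, boolean needUpdate, long lastSuccessfulRefreshMs,
                                     long lastRefreshMs, long metadataExpireMs, long refreshBackoffMs) {
            // Time until the current metadata is considered expired; zero once an update is requested.
            long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - now, 0);
            // Time the refresh backoff, anchored at the last refresh attempt, still blocks a new request.
            long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - now, 0);
            // requestUpdateForNewTopics() forces lastRefreshMs to 0, so the backoff term collapses
            // and a metadata request covering the new topic can be sent immediately.
            return Math.max(timeToExpire, timeToAllowUpdate);
        }
    }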
http://git-wip-us.apache.org/repos/asf/kafka/blob/e795ad96/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8ab634d..29c6d6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -77,6 +77,9 @@ public class NetworkClient implements KafkaClient {
     /* max time in ms for the producer to wait for acknowledgement from server*/
     private final int requestTimeoutMs;
 
+    /* time in ms to wait before retrying to create connection to a server */
+    private final long reconnectBackoffMs;
+
     private final Time time;
 
     public NetworkClient(Selectable selector,
@@ -136,6 +139,7 @@ public class NetworkClient implements KafkaClient {
         this.correlation = 0;
         this.randOffset = new Random();
         this.requestTimeoutMs = requestTimeoutMs;
+        this.reconnectBackoffMs = reconnectBackoffMs;
         this.time = time;
     }
 
@@ -516,13 +520,9 @@ public class NetworkClient implements KafkaClient {
         /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
         private boolean metadataFetchInProgress;
 
-        /* the last timestamp when no broker node is available to connect */
-        private long lastNoNodeAvailableMs;
-
         DefaultMetadataUpdater(Metadata metadata) {
             this.metadata = metadata;
             this.metadataFetchInProgress = false;
-            this.lastNoNodeAvailableMs = 0;
         }
 
         @Override
@@ -539,20 +539,22 @@ public class NetworkClient implements KafkaClient {
         public long maybeUpdate(long now) {
             // should we update our metadata?
             long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
-            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
-            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
-            // if there is no node available to connect, back off refreshing metadata
-            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
-                    waitForMetadataFetch);
-
-            if (metadataTimeout == 0) {
-                // Beware that the behavior of this method and the computation of timeouts for poll() are
-                // highly dependent on the behavior of leastLoadedNode.
-                Node node = leastLoadedNode(now);
-                maybeUpdate(now, node);
+            long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
+
+            long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
+            if (metadataTimeout > 0) {
+                return metadataTimeout;
             }
 
-            return metadataTimeout;
+            // Beware that the behavior of this method and the computation of timeouts for poll() are
+            // highly dependent on the behavior of leastLoadedNode.
+            Node node = leastLoadedNode(now);
+            if (node == null) {
+                log.debug("Give up sending metadata request since no node is available");
+                return reconnectBackoffMs;
+            }
+
+            return maybeUpdate(now, node);
         }
 
         @Override
@@ -618,15 +620,21 @@ public class NetworkClient implements KafkaClient {
         }
 
         /**
-         * Add a metadata request to the list of sends if we can make one
+         * Return true if there is at least one connection establishment currently underway
          */
-        private void maybeUpdate(long now, Node node) {
-            if (node == null) {
-                log.debug("Give up sending metadata request since no node is available");
-                // mark the timestamp for no node available to connect
-                this.lastNoNodeAvailableMs = now;
-                return;
+        private boolean isAnyNodeConnecting() {
+            for (Node node : fetchNodes()) {
+                if (connectionStates.isConnecting(node.idString())) {
+                    return true;
+                }
             }
+            return false;
+        }
+
+        /**
+         * Add a metadata request to the list of sends if we can make one
+         */
+        private long maybeUpdate(long now, Node node) {
             String nodeConnectionId = node.idString();
 
             if (canSendRequest(nodeConnectionId)) {
@@ -639,19 +647,29 @@ public class NetworkClient implements KafkaClient {
                 ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                 doSend(clientRequest, now);
-            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
+                return requestTimeoutMs;
+            }
+
+            // If there's any connection establishment underway, wait until it completes. This prevents
+            // the client from unnecessarily connecting to additional nodes while a previous connection
+            // attempt has not been completed.
+            if (isAnyNodeConnecting()) {
+                // Strictly the timeout we should return here is "connect timeout", but as we don't
+                // have such application level configuration, using reconnect backoff instead.
+                return reconnectBackoffMs;
+            }
+
+            if (connectionStates.canConnect(nodeConnectionId, now)) {
                 // we don't have a connection to this node right now, make one
                 log.debug("Initialize connection to node {} for sending metadata request",
node.id());
                 initiateConnect(node, now);
-                // If initiateConnect failed immediately, this node will be put into blackout and we
-                // should allow immediately retrying in case there is another candidate node. If it
-                // is still connecting, the worst case is that we end up setting a longer timeout
-                // on the next round and then wait for the response.
-            } else { // connected, but can't send more OR connecting
-                // In either case, we just need to wait for a network event to let us know the selected
-                // connection might be usable again.
-                this.lastNoNodeAvailableMs = now;
+                return reconnectBackoffMs;
             }
+
+            // connected, but can't send more OR connecting
+            // In either case, we just need to wait for a network event to let us know the selected
+            // connection might be usable again.
+            return Long.MAX_VALUE;
         }
 
     }

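Taken together, the reworked maybeUpdate() now signals how long poll() may block purely through its
return value, replacing the removed lastNoNodeAvailableMs bookkeeping. The following self-contained
sketch condenses that timeout policy; the class name, method name and parameters are invented for
illustration, and the real code reads this state from NetworkClient fields:

    final class MetadataTimeoutSketch {
        // Condensed model of the timeout policy DefaultMetadataUpdater.maybeUpdate(long now)
        // follows after this patch; not the literal implementation.
        static long metadataTimeout(long timeToNextMetadataUpdate,
                                    boolean metadataFetchInProgress,
                                    boolean nodeAvailable,
                                    boolean canSendRequestToNode,
                                    boolean anyNodeConnecting,
                                    boolean canConnectToNode,
                                    long requestTimeoutMs,
                                    long reconnectBackoffMs) {
            long waitForMetadataFetch = metadataFetchInProgress ? requestTimeoutMs : 0;
            long timeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
            if (timeout > 0)
                return timeout;                // still backing off, or a fetch is already in flight
            if (!nodeAvailable)
                return reconnectBackoffMs;     // no candidate broker at all; retry after the reconnect backoff
            if (canSendRequestToNode)
                return requestTimeoutMs;       // metadata request sent; wait for the response or its timeout
            if (anyNodeConnecting)
                return reconnectBackoffMs;     // a connection attempt is underway; do not open another one
            if (canConnectToNode)
                return reconnectBackoffMs;     // a new connection to the chosen node was just initiated
            return Long.MAX_VALUE;             // connected but unable to send, or mid-connect; wait for a network event
        }
    }

Returning reconnectBackoffMs while some node is still connecting is what stops the client from
opening connections to additional brokers just to fetch metadata, which is the "avoid unnecessary
connections" part of the subject line.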
http://git-wip-us.apache.org/repos/asf/kafka/blob/e795ad96/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 1f4f770..cfd2a94 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -79,6 +79,104 @@ public class MetadataTest {
         assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time)
== 0);
     }
 
+    private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {
+        long now = 10000;
+
+        // Metadata timeToNextUpdate is implicitly relying on the premise that the currentTimeMillis is always
+        // larger than the metadataExpireMs or refreshBackoffMs.
+        // It won't be a problem practically since all usages of Metadata call update() immediately after
+        // its construction.
+        if (metadataExpireMs > now || refreshBackoffMs > now) {
+            throw new IllegalArgumentException(
+                    "metadataExpireMs and refreshBackoffMs must be smaller than 'now'");
+        }
+
+        long largerOfBackoffAndExpire = Math.max(refreshBackoffMs, metadataExpireMs);
+        Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
+
+        assertEquals(0, metadata.timeToNextUpdate(now));
+
+        // lastSuccessfulRefreshMs updated to now.
+        metadata.update(Cluster.empty(), now);
+
+        // The last update was successful so the remaining time to expire the current metadata should be returned.
+        assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
+
+        // Metadata update requested explicitly
+        metadata.requestUpdate();
+        // Update requested so metadataExpireMs should no longer take effect.
+        assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
+
+        // Reset needUpdate to false.
+        metadata.update(Cluster.empty(), now);
+        assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
+
+        // Both metadataExpireMs and refreshBackoffMs elapsed.
+        now += largerOfBackoffAndExpire;
+        assertEquals(0, metadata.timeToNextUpdate(now));
+        assertEquals(0, metadata.timeToNextUpdate(now + 1));
+    }
+
+    @Test
+    public void testTimeToNextUpdate() {
+        checkTimeToNextUpdate(100, 1000);
+        checkTimeToNextUpdate(1000, 100);
+        checkTimeToNextUpdate(0, 0);
+        checkTimeToNextUpdate(0, 100);
+        checkTimeToNextUpdate(100, 0);
+    }
+
+    @Test
+    public void testTimeToNextUpdate_RetryBackoff() {
+        long now = 10000;
+
+        // lastRefreshMs updated to now.
+        metadata.failedUpdate(now);
+
+        // Backing off. Remaining time until next try should be returned.
+        assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
+
+        // Even though metadata update requested explicitly, still respects backoff.
+        metadata.requestUpdate();
+        assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
+
+        // refreshBackoffMs elapsed.
+        now += refreshBackoffMs;
+        // It should return 0 to let next try.
+        assertEquals(0, metadata.timeToNextUpdate(now));
+        assertEquals(0, metadata.timeToNextUpdate(now + 1));
+    }
+
+    @Test
+    public void testTimeToNextUpdate_OverwriteBackoff() {
+        long now = 10000;
+
+        // New topic added to fetch set and update requested. It should allow immediate update.
+        metadata.update(Cluster.empty(), now);
+        metadata.add("new-topic");
+        assertEquals(0, metadata.timeToNextUpdate(now));
+
+        // Even though setTopics is called, an immediate update isn't necessary if the new topic set
+        // doesn't contain a new topic.
+        metadata.update(Cluster.empty(), now);
+        metadata.setTopics(metadata.topics());
+        assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
+
+        // If the new set of topics contains a new topic then it should allow an immediate update.
+        metadata.setTopics(Collections.singletonList("another-new-topic"));
+        assertEquals(0, metadata.timeToNextUpdate(now));
+
+        // If metadata requested for all topics it should allow immediate update.
+        metadata.update(Cluster.empty(), now);
+        metadata.needMetadataForAllTopics(true);
+        assertEquals(0, metadata.timeToNextUpdate(now));
+
+        // However, if the metadata is already capable of serving all topics it shouldn't override the backoff.
+        metadata.update(Cluster.empty(), now);
+        metadata.needMetadataForAllTopics(true);
+        assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
+    }
+
     /**
      * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't
      * wait forever with a max timeout value of 0

