kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1642; (followup patch) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; patched by Ewen Cheslack-Postava; reviewed by Jun Rao
Date Tue, 06 Jan 2015 18:56:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 10c6dec34 -> 4471dc08b


kafka-1642; (followup patch) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when
network connection is lost; patched by Ewen Cheslack-Postava; patched by Ewen Cheslack-Postava;
reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4471dc08
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4471dc08
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4471dc08

Branch: refs/heads/trunk
Commit: 4471dc08b64809f3cdcbff6b8c830e13df87b117
Parents: 10c6dec
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Jan 6 10:56:32 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jan 6 10:56:32 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java    | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4471dc08/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 525b95e..6746275 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -180,9 +180,10 @@ public class NetworkClient implements KafkaClient {
 
         // should we update our metadata?
         long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
-        long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now;
+        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
+        long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
         // if there is no node available to connect, back off refreshing metadata
-        long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt);
+        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
         if (!this.metadataFetchInProgress && metadataTimeout == 0)
             maybeUpdateMetadata(sends, now);
 
@@ -371,6 +372,8 @@ public class NetworkClient implements KafkaClient {
      * Add a metadata request to the list of sends if we can make one
      */
     private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
+        // Beware that the behavior of this method and the computation of timeouts for poll() are
+        // highly dependent on the behavior of leastLoadedNode.
         Node node = this.leastLoadedNode(now);
         if (node == null) {
             log.debug("Give up sending metadata request since no node is available");
@@ -391,6 +394,14 @@ public class NetworkClient implements KafkaClient {
             // we don't have a connection to this node right now, make one
             log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
             initiateConnect(node, now);
+            // If initiateConnect failed immediately, this node will be put into blackout and we
+            // should allow immediately retrying in case there is another candidate node. If it
+            // is still connecting, the worst case is that we end up setting a longer timeout
+            // on the next round and then wait for the response.
+        } else { // connected, but can't send more OR connecting
+            // In either case, we just need to wait for a network event to let us know the selected
+            // connection might be usable again.
+            this.lastNoNodeAvailableMs = now;
         }
     }
 
@@ -400,8 +411,8 @@ public class NetworkClient implements KafkaClient {
     private void initiateConnect(Node node, long now) {
         try {
             log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
-            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
             this.connectionStates.connecting(node.id(), now);
+            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(node.id());


Mime
View raw message