kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4290: Fix timeout overflow in WorkerCoordinator.poll
Date Tue, 11 Oct 2016 06:23:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 d981dd25b -> a1afb7394


KAFKA-4290: Fix timeout overflow in WorkerCoordinator.poll

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2009 from hachikuji/KAFKA-4290

(cherry picked from commit c1e840050c11d2f1d20118273c572e4c8491866d)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1afb739
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1afb739
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1afb739

Branch: refs/heads/0.10.1
Commit: a1afb73941a578cc337ed44321780d3fa69c8df8
Parents: d981dd2
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Oct 10 23:02:19 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Oct 10 23:23:06 2016 -0700

----------------------------------------------------------------------
 .../runtime/distributed/WorkerCoordinator.java    | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1afb739/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 9114555..8a065f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -102,10 +102,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
 
     public void poll(long timeout) {
         // poll for io until the timeout expires
-        long now = time.milliseconds();
-        long deadline = now + timeout;
+        final long start = time.milliseconds();
+        long now = start;
+        long remaining;
 
-        while (now <= deadline) {
+        do {
             if (coordinatorUnknown()) {
                 ensureCoordinatorReady();
                 now = time.milliseconds();
@@ -118,12 +119,17 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
 
             pollHeartbeat(now);
 
+            long elapsed = now - start;
+            remaining = timeout - elapsed;
+
             // Note that because the network client is shared with the background heartbeat thread,
             // we do not want to block in poll longer than the time to the next heartbeat.
-            long remaining = Math.max(0, deadline - now);
-            client.poll(Math.min(remaining, timeToNextHeartbeat(now)));
+            client.poll(Math.min(Math.max(0, remaining), timeToNextHeartbeat(now)));
+
             now = time.milliseconds();
-        }
+            elapsed = now - start;
+            remaining = timeout - elapsed;
+        } while (remaining > 0);
     }
 
     @Override


Mime
View raw message