kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: HOTFIX: Introduce max wait time for retry-and-backoff while creating tasks
Date Wed, 14 Jun 2017 16:20:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 18c6ca92e -> e60d547ef


HOTFIX: Introduce max wait time for retry-and-backoff while creating tasks

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3327 from mjsax/hotfix-backoff-retry

(cherry picked from commit 65e36895f5cf91afcf785857ebff4c0fce8d1d9c)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e60d547e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e60d547e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e60d547e

Branch: refs/heads/0.11.0
Commit: e60d547eff4e918b50998a42123ab3a942ba76e9
Parents: 18c6ca9
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Jun 14 09:20:48 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jun 14 09:20:57 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamThread.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e60d547e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 39369c0..372f314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -238,6 +238,7 @@ public class StreamThread extends Thread {
     }
 
     abstract class AbstractTaskCreator {
+        final static long MAX_BACKOFF_TIME_MS = 1000L;
         void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
             long backoffTimeMs = 50L;
             final Set<TaskId> retryingTasks = new HashSet<>();
@@ -272,6 +273,7 @@ public class StreamThread extends Thread {
                 try {
                     Thread.sleep(backoffTimeMs);
                     backoffTimeMs <<= 1;
+                    backoffTimeMs = Math.min(backoffTimeMs, MAX_BACKOFF_TIME_MS);
                 } catch (final InterruptedException e) {
                     // ignore
                 }
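
The effect of the two added lines can be illustrated in isolation. The following is a minimal standalone sketch, not the StreamThread code itself: the class name CappedBackoff, the helpers nextBackoff and schedule, and the constant INITIAL_BACKOFF_MS are hypothetical names introduced for illustration. Only the 50 ms starting value, the doubling step, and the 1000 ms cap are taken from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

public class CappedBackoff {
    // Starting backoff, matching "long backoffTimeMs = 50L" in the diff.
    static final long INITIAL_BACKOFF_MS = 50L;
    // Upper bound on a single wait, matching the constant added by the commit.
    static final long MAX_BACKOFF_TIME_MS = 1000L;

    // Double the current backoff, then clamp it to the maximum
    // (the same two steps as "<<= 1" followed by Math.min in the diff).
    static long nextBackoff(final long currentMs) {
        return Math.min(currentMs << 1, MAX_BACKOFF_TIME_MS);
    }

    // Hypothetical helper: list the wait used before each of the first
    // `attempts` retries, without actually sleeping.
    static List<Long> schedule(final int attempts) {
        final List<Long> waits = new ArrayList<>();
        long backoffTimeMs = INITIAL_BACKOFF_MS;
        for (int i = 0; i < attempts; i++) {
            waits.add(backoffTimeMs);
            backoffTimeMs = nextBackoff(backoffTimeMs);
        }
        return waits;
    }

    public static void main(final String[] args) {
        // prints [50, 100, 200, 400, 800, 1000, 1000, 1000]
        System.out.println(schedule(8));
    }
}
```

Without the clamp, the wait would keep doubling on every retry (1600 ms, 3200 ms, and so on); with it, each individual sleep in this sketch is bounded at one second.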

