kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-2805; RecordAccumulator request timeout not enforced when all brokers are gone
Date Thu, 12 Nov 2015 02:42:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 1828b8e52 -> 39ea9d3b7


KAFKA-2805; RecordAccumulator request timeout not enforced when all brokers are gone

Removed the check that expired only those batches whose metadata (the partition leader) is
unavailable. Batches are now expired as soon as they reach the request timeout threshold,
irrespective of whether the leader is available or not.

Author: Mayuresh Gharat <mgharat@mgharat-ld1.linkedin.biz>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #503 from MayureshGharat/kafka-2805

(cherry picked from commit 1cd22ed33f1e1f63d8cb63f68309e5d8f43be1e1)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39ea9d3b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39ea9d3b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39ea9d3b

Branch: refs/heads/0.9.0
Commit: 39ea9d3b70341d58dfc9c9fa837229046d483868
Parents: 1828b8e
Author: Mayuresh Gharat <mgharat@mgharat-ld1.linkedin.biz>
Authored: Wed Nov 11 18:41:45 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Nov 11 18:41:57 2015 -0800

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/39ea9d3b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index db61121..d4a8a23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -216,18 +216,15 @@ public final class RecordAccumulator {
                 Iterator<RecordBatch> batchIterator = dq.iterator();
                 while (batchIterator.hasNext()) {
                     RecordBatch batch = batchIterator.next();
-                    Node leader = cluster.leaderFor(topicAndPartition);
-                    if (leader == null) {
-                        // check if the batch is expired
-                        if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
-                            expiredBatches.add(batch);
-                            count++;
-                            batchIterator.remove();
-                            deallocate(batch);
-                        } else {
-                            if (!batch.inRetry()) {
-                                break;
-                            }
+                    // check if the batch is expired
+                    if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
+                        expiredBatches.add(batch);
+                        count++;
+                        batchIterator.remove();
+                        deallocate(batch);
+                    } else {
+                        if (!batch.inRetry()) {
+                            break;
                         }
                     }
                 }
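
For readers who want to try the post-change loop outside the producer, here is a minimal
standalone sketch. It is not the Kafka implementation: ExpirySketch, Batch, and expireBatches
are hypothetical names, and the expiry rule inside maybeExpire is a simplified assumption
(time since last append, minus linger, exceeding the request timeout). Only the loop shape
(expire, otherwise stop at the first batch that is not in retry) is taken from the diff above.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class ExpirySketch {

    /** Hypothetical stand-in for RecordBatch; holds only what this sketch needs. */
    static final class Batch {
        final String name;
        final long lastAppendMs;
        final boolean inRetry;

        Batch(String name, long lastAppendMs, boolean inRetry) {
            this.name = name;
            this.lastAppendMs = lastAppendMs;
            this.inRetry = inRetry;
        }

        // Simplified assumption, not Kafka's actual rule: the batch is expired once
        // it has waited longer than the request timeout after its linger period.
        boolean maybeExpire(long requestTimeoutMs, long nowMs, long lingerMs) {
            return nowMs - (lastAppendMs + lingerMs) > requestTimeoutMs;
        }
    }

    /**
     * Removes expired batches from the head of the queue and returns them.
     * Note that there is no leader lookup: expiry depends only on elapsed time.
     */
    static List<Batch> expireBatches(Deque<Batch> dq, long requestTimeoutMs,
                                     long nowMs, long lingerMs) {
        List<Batch> expired = new ArrayList<>();
        Iterator<Batch> it = dq.iterator();
        while (it.hasNext()) {
            Batch batch = it.next();
            if (batch.maybeExpire(requestTimeoutMs, nowMs, lingerMs)) {
                expired.add(batch);
                it.remove();
            } else if (!batch.inRetry) {
                // Batches are queued in append order, so later ones are newer.
                break;
            }
        }
        return expired;
    }
}
```

Calling expireBatches on a queue holding one stale batch followed by one fresh batch should
return only the stale one and leave the fresh one queued, whether or not any broker is reachable.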

