kafka-commits mailing list archives

From jjko...@apache.org
Subject kafka git commit: KAFKA-2138; Fix producer to honor retry backoff; reviewed by Joel Koshy and Guozhang Wang
Date Fri, 24 Apr 2015 19:42:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 01e94e2b4 -> 2166104af


KAFKA-2138; Fix producer to honor retry backoff; reviewed by Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2166104a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2166104a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2166104a

Branch: refs/heads/trunk
Commit: 2166104aff174e2ceb45638ca7be34f1db37e3e1
Parents: 01e94e2
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Apr 24 12:41:32 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Apr 24 12:41:32 2015 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 26 ++++++++------
 .../internals/RecordAccumulatorTest.java        | 38 ++++++++++++++++++++
 2 files changed, 53 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2166104a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 0e7ab29..49a9883 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -292,17 +292,21 @@ public final class RecordAccumulator {
                     synchronized (deque) {
                         RecordBatch first = deque.peekFirst();
                         if (first != null) {
-                            if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
-                                // there is a rare case that a single batch size is larger than the request size due
-                                // to compression; in this case we will still eventually send this batch in a single
-                                // request
-                                break;
-                            } else {
-                                RecordBatch batch = deque.pollFirst();
-                                batch.records.close();
-                                size += batch.records.sizeInBytes();
-                                ready.add(batch);
-                                batch.drainedMs = now;
+                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
+                            // Only drain the batch if it is not during backoff period.
+                            if (!backoff) {
+                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                                    // there is a rare case that a single batch size is larger than the request size due
+                                    // to compression; in this case we will still eventually send this batch in a single
+                                    // request
+                                    break;
+                                } else {
+                                    RecordBatch batch = deque.pollFirst();
+                                    batch.records.close();
+                                    size += batch.records.sizeInBytes();
+                                    ready.add(batch);
+                                    batch.drainedMs = now;
+                                }
                             }
                         }
                     }
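
For readers skimming the hunk above: the new boolean gates the drain of a partition's head batch. A batch that has already been attempted (first.attempts > 0) and whose last attempt is still within retryBackoffMs of the current time is skipped, so the producer honors the configured retry backoff instead of immediately re-draining a re-enqueued batch. Below is a minimal, self-contained sketch of just that predicate; the RetryState holder class and the sample values are illustrative only and are not part of the commit.

public class BackoffCheckSketch {
    // Hypothetical stand-in for the two RecordBatch fields the predicate reads.
    static final class RetryState {
        int attempts;        // number of send attempts so far
        long lastAttemptMs;  // wall-clock time of the last attempt
    }

    // Mirrors the condition added in the diff: back off only if the batch has
    // been tried before and the backoff window since the last attempt is still open.
    static boolean inBackoff(RetryState first, long retryBackoffMs, long now) {
        return first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
    }

    public static void main(String[] args) {
        RetryState batch = new RetryState();
        batch.attempts = 1;
        batch.lastAttemptMs = 1_000L;
        long retryBackoffMs = 100L;
        System.out.println(inBackoff(batch, retryBackoffMs, 1_050L)); // true: still inside the backoff window
        System.out.println(inBackoff(batch, retryBackoffMs, 1_101L)); // false: window elapsed, batch may be drained
    }
}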

http://git-wip-us.apache.org/repos/asf/kafka/blob/2166104a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 05e2929..baa48e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -203,6 +203,44 @@ public class RecordAccumulatorTest {
         // but have leaders with other sendable data.
         assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
     }
+
+    @Test
+    public void testRetryBackoff() throws Exception {
+        long lingerMs = Long.MAX_VALUE / 4;
+        long retryBackoffMs = Long.MAX_VALUE / 2;
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags);
+
+        long now = time.milliseconds();
+        accum.append(tp1, key, value, null);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
+        assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
+        Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
+        assertEquals("Node1 should be the only ready node.", 1, batches.size());
+        assertEquals("Partition 0 should only have one batch drained.", 1, batches.get(0).size());
+
+        // Reenqueue the batch
+        now = time.milliseconds();
+        accum.reenqueue(batches.get(0).get(0), now);
+
+        // Put message for partition 1 into accumulator
+        accum.append(tp2, key, value, null);
+        result = accum.ready(cluster, now + lingerMs + 1);
+        assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
+
+        // tp1 should backoff while tp2 should not
+        batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
+        assertEquals("Node1 should be the only ready node.", 1, batches.size());
+        assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
+        assertEquals("Node1 should only have one batch for partition 1.", tp2, batches.get(0).get(0).topicPartition);
+
+        // Partition 0 can be drained after retry backoff
+        result = accum.ready(cluster, now + retryBackoffMs + 1);
+        assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
+        batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1);
+        assertEquals("Node1 should be the only ready node.", 1, batches.size());
+        assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
+        assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition);
+    }
     
     @Test
     public void testFlush() throws Exception {

