kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-3388; Fix expiration of batches sitting in the accumulator
Date Sat, 26 Mar 2016 16:23:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 206757eeb -> b45fe7779


KAFKA-3388; Fix expiration of batches sitting in the accumulator

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #1056 from becketqin/KAFKA-3388

(cherry picked from commit 1fbe445dde71df0023a978c5e54dd229d3d23e1b)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b45fe777
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b45fe777
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b45fe777

Branch: refs/heads/0.10.0
Commit: b45fe77791560a2171e0bbd7df09f648a01b016f
Parents: 206757e
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Sat Mar 26 09:22:59 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sat Mar 26 09:23:12 2016 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 35 ++++++----
 .../clients/producer/internals/RecordBatch.java | 19 ++++--
 .../internals/RecordAccumulatorTest.java        | 67 +++++++++++++++++---
 3 files changed, 94 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b45fe777/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index beaa832..915c4d3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.util.Iterator;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -217,19 +218,27 @@ public final class RecordAccumulator {
         int count = 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             Deque<RecordBatch> dq = entry.getValue();
-            synchronized (dq) {
-                // iterate over the batches and expire them if they have stayed in the accumulator for more than requestTimeout
-                Iterator<RecordBatch> batchIterator = dq.iterator();
-                while (batchIterator.hasNext()) {
-                    RecordBatch batch = batchIterator.next();
-                    // check if the batch is expired
-                    if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
-                        expiredBatches.add(batch);
-                        count++;
-                        batchIterator.remove();
-                        deallocate(batch);
-                    } else {
-                        if (!batch.inRetry()) {
+            // We only check whether the batch should be expired if the partition does not have a batch in flight.
+            // This is to avoid later batches getting expired while an earlier batch is still in progress.
+            // This protection only takes effect when the user sets max.in.flight.requests.per.connection=1.
+            // Otherwise the expiration order is not guaranteed.
+            TopicPartition tp = entry.getKey();
+            if (!muted.contains(tp)) {
+                synchronized (dq) {
+                    // iterate over the batches and expire them if they have stayed in the accumulator for more than requestTimeout
+                    RecordBatch lastBatch = dq.peekLast();
+                    Iterator<RecordBatch> batchIterator = dq.iterator();
+                    while (batchIterator.hasNext()) {
+                        RecordBatch batch = batchIterator.next();
+                        boolean isFull = batch != lastBatch || batch.records.isFull();
+                        // check if the batch is expired
+                        if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
+                            expiredBatches.add(batch);
+                            count++;
+                            batchIterator.remove();
+                            deallocate(batch);
+                        } else {
+                            // Stop at the first batch that has not expired.
                             break;
                         }
                     }

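For readers skimming the diff: the hunk above changes the expiration scan in two ways. It skips partitions that currently have a batch in flight (muted), and it stops at the first batch that has not expired, so later batches can never expire ahead of earlier ones. Below is a minimal standalone sketch of that scan order; the names (Batch, abortExpired, readyAtMs) are hypothetical stand-ins for illustration, not the real Kafka classes.

----------------------------------------------------------------------
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ExpirationScanSketch {
    // Stand-in for RecordBatch with only the fields this sketch needs.
    static final class Batch {
        final long readyAtMs; // when the batch became eligible to send
        Batch(long readyAtMs) { this.readyAtMs = readyAtMs; }
        boolean expired(long requestTimeoutMs, long nowMs) {
            return nowMs - readyAtMs > requestTimeoutMs;
        }
    }

    static <P> List<Batch> abortExpired(Map<P, Deque<Batch>> batches, Set<P> muted,
                                        long requestTimeoutMs, long nowMs) {
        List<Batch> expired = new ArrayList<>();
        for (Map.Entry<P, Deque<Batch>> entry : batches.entrySet()) {
            if (muted.contains(entry.getKey()))
                continue; // a batch is in flight for this partition; never expire behind it
            Iterator<Batch> it = entry.getValue().iterator();
            while (it.hasNext()) {
                Batch b = it.next();
                if (b.expired(requestTimeoutMs, nowMs)) {
                    expired.add(b);
                    it.remove();
                } else {
                    break; // the deque is in append order, so stop at the first live batch
                }
            }
        }
        return expired;
    }
}
----------------------------------------------------------------------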
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45fe777/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index eb7bbb3..e6cd68f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -134,14 +134,23 @@ public final class RecordBatch {
     }
 
     /**
-     * Expire the batch that is ready but has been sitting in the accumulator for more than requestTimeout due to metadata being unavailable.
-     * We need to explicitly check if the record is full or the linger time has been met because the accumulator's partition may not be ready
-     * if the leader is unavailable.
+     * A batch whose metadata is not available should be expired if one of the following is true:
+     * <ol>
+     *     <li> the batch is not in retry AND the request timeout has elapsed after it became ready (full or linger.ms reached).
+     *     <li> the batch is in retry AND the request timeout has elapsed after the backoff period ended.
+     * </ol>
      */
-    public boolean maybeExpire(int requestTimeout, long now, long lingerMs) {
+    public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
         boolean expire = false;
-        if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) {
+
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
+            expire = true;
+        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
             expire = true;
+        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
+            expire = true;
+
+        if (expire) {
             this.records.close();
             this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing " + recordCount + " record(s) expired due to timeout while requesting metadata from brokers for " + topicPartition));
         }

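The new javadoc above enumerates the expiration rules that maybeExpire now encodes. A minimal standalone sketch of those three conditions, with stand-in fields that mirror RecordBatch (this is an illustration, not the real class):

----------------------------------------------------------------------
final class ExpireRuleSketch {
    long createdMs;       // batch creation time
    long lastAppendTime;  // time of the last record append
    long lastAttemptMs;   // time of the last send attempt (updated on retry)
    boolean inRetry;      // true once the batch has been re-enqueued after a failure

    boolean shouldExpire(long requestTimeoutMs, long retryBackoffMs,
                         long lingerMs, long nowMs, boolean isFull) {
        if (!inRetry && isFull && requestTimeoutMs < nowMs - lastAppendTime)
            return true;  // not in retry: a full batch stuck longer than the request timeout
        if (!inRetry && requestTimeoutMs < nowMs - (createdMs + lingerMs))
            return true;  // not in retry: linger.ms elapsed and then the request timeout too
        // in retry: the request timeout only starts counting after the backoff period ends
        return inRetry && requestTimeoutMs < nowMs - (lastAttemptMs + retryBackoffMs);
    }
}
----------------------------------------------------------------------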
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45fe777/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 3660272..904aa73 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -297,22 +297,71 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testExpiredBatches() throws InterruptedException {
-        long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
+        long retryBackoffMs = 100L;
+        long lingerMs = 3000L;
+        int requestTimeout = 60;
+
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
         int appends = 1024 / msgSize;
+
+        // Test batches not in retry
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
-            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         }
-        time.sleep(2000);
-        accum.ready(cluster, now);
+        // Make the batches ready due to batch full
         accum.append(tp1, 0L, key, value, null, 0);
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
-        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>(), Collections.<String>emptySet());
-        now = time.milliseconds();
-        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
-        assertEquals(1, expiredBatches.size());
+        // Advance the clock to expire the batch.
+        time.sleep(requestTimeout + 1);
+        accum.mutePartition(tp1);
+        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+
+        accum.unmutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should be expired", 1, expiredBatches.size());
+        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+
+        // Advance the clock to make the next batch ready due to linger.ms
+        time.sleep(lingerMs);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+        time.sleep(requestTimeout + 1);
+
+        accum.mutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());
+
+        accum.unmutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
+        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+
+        // Test batches in retry.
+        // Create a retried batch
+        accum.append(tp1, 0L, key, value, null, 0);
+        time.sleep(lingerMs);
+        readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
+        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1);
+        time.sleep(1000L);
+        accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
+
+        // test expiration.
+        time.sleep(requestTimeout + retryBackoffMs);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should not be expired.", 0, expiredBatches.size());
+        time.sleep(1L);
+
+        accum.mutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+
+        accum.unmutePartition(tp1);
+        expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
+        assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
     }
 
     @Test

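Note that the test never waits on the wall clock: time.sleep(...) advances a deterministic mock clock past the requestTimeout, lingerMs, and retryBackoffMs boundaries. A minimal sketch of that pattern, assuming the accumulator reads time only through such an injected clock (the class below is a stand-in for illustration, not Kafka's test MockTime):

----------------------------------------------------------------------
final class MockTimeSketch {
    private long nowMs = 0L;
    long milliseconds() { return nowMs; } // the "current" time the code under test sees
    void sleep(long ms) { nowMs += ms; }  // advances instantly; no real waiting
}
----------------------------------------------------------------------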
