kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4557; Handle Producer.send correctly in expiry callbacks
Date Fri, 27 Jan 2017 23:27:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 b81f311bc -> 05b1ebd9e


KAFKA-4557; Handle Producer.send correctly in expiry callbacks

When iterating deque for expiring record batches, delay the
invocation of the callback and deallocation until iteration is
complete since callbacks invoked during expiry may send more
records, modifying the deque, resulting in a
ConcurrentModificationException in the iterator.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2449 from rajinisivaram/KAFKA-4557

(cherry picked from commit aef6927a42cde193dd6acc36cd4d9f32167da622)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05b1ebd9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05b1ebd9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05b1ebd9

Branch: refs/heads/0.10.2
Commit: 05b1ebd9ea6ca36a77269c71d598a7f613b277ba
Parents: b81f311
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Fri Jan 27 23:26:09 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jan 27 23:27:42 2017 +0000

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   | 13 ++++--
 .../clients/producer/internals/RecordBatch.java | 48 ++++++++++++--------
 .../internals/RecordAccumulatorTest.java        | 45 ++++++++++++++++++
 3 files changed, 85 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05b1ebd9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 06d39ec..d3ae89e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -243,12 +243,14 @@ public final class RecordAccumulator {
                     while (batchIterator.hasNext()) {
                         RecordBatch batch = batchIterator.next();
                         boolean isFull = batch != lastBatch || batch.isFull();
-                        // check if the batch is expired
+                        // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks
+                        // are invoked after completing the iterations, since sends invoked from callbacks
+                        // may append more batches to the deque being iterated. The batch is deallocated after
+                        // callbacks are invoked.
                         if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                             expiredBatches.add(batch);
                             count++;
                             batchIterator.remove();
-                            deallocate(batch);
                         } else {
                             // Stop at the first batch that has not expired.
                             break;
@@ -257,8 +259,13 @@ public final class RecordAccumulator {
                 }
             }
         }
-        if (!expiredBatches.isEmpty())
+        if (!expiredBatches.isEmpty()) {
             log.trace("Expired {} batches in accumulator", count);
+            for (RecordBatch batch : expiredBatches) {
+                batch.expirationDone();
+                deallocate(batch);
+            }
+        }
 
         return expiredBatches;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/05b1ebd9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index c8eddd5..6346fe9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A batch of records that is or will be sent.
@@ -48,6 +49,8 @@ public final class RecordBatch {
     long drainedMs;
     long lastAttemptMs;
     long lastAppendTime;
+    private String expiryErrorMessage;
+    private AtomicBoolean completed;
     private boolean retry;
 
     public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
@@ -57,6 +60,7 @@ public final class RecordBatch {
         this.topicPartition = tp;
         this.lastAppendTime = createdMs;
         this.produceFuture = new ProduceRequestResult(topicPartition);
+        this.completed = new AtomicBoolean();
     }
 
     /**
@@ -93,6 +97,9 @@ public final class RecordBatch {
         log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                   topicPartition, baseOffset, exception);
 
+        if (completed.getAndSet(true))
+            throw new IllegalStateException("Batch has already been completed");
+
         // Set the future before invoking the callbacks as we rely on its state for the `onCompletion`
call
         produceFuture.set(baseOffset, logAppendTime, exception);
 
@@ -137,29 +144,34 @@ public final class RecordBatch {
      *     <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached).
      *     <li> the batch is in retry AND request timeout has elapsed after the backoff period ended.
      * </ol>
+     * This method closes this batch and sets {@code expiryErrorMessage} if the batch has timed out.
+     * {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks.
      */
     public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
-        boolean expire = false;
-        String errorMessage = null;
-
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) {
-            expire = true;
-            errorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
-        } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) {
-            expire = true;
-            errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
-        } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) {
-            expire = true;
-            errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
-        }

-        if (expire) {
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
+            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
+        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
+            expiryErrorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
+        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
+            expiryErrorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
+
+        boolean expired = expiryErrorMessage != null;
+        if (expired)
             close();
-            this.done(-1L, Record.NO_TIMESTAMP,
-                      new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage));
-        }
+        return expired;
+    }
 
-        return expire;
+    /**
+     * Completes the produce future with timeout exception and invokes callbacks.
+     * This method should be invoked only if {@link #maybeExpire(int, long, long, long, boolean)}
+     * returned true.
+     */
+    void expirationDone() {
+        if (expiryErrorMessage == null)
+            throw new IllegalStateException("Batch has not expired");
+        this.done(-1L, Record.NO_TIMESTAMP,
+                  new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/05b1ebd9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 04e1411..f8bb1e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -18,6 +18,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.LogEntry;
@@ -38,10 +39,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -410,6 +413,48 @@ public class RecordAccumulatorTest {
     }
 
     @Test
+    public void testAppendInExpiryCallback() throws InterruptedException {
+        long retryBackoffMs = 100L;
+        long lingerMs = 3000L;
+        int requestTimeout = 60;
+        int messagesPerBatch = 1024 / msgSize;
+
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs,
+                retryBackoffMs, metrics, time);
+        final AtomicInteger expiryCallbackCount = new AtomicInteger();
+        final AtomicReference<Exception> unexpectedException = new AtomicReference<Exception>();
+        Callback callback = new Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception instanceof TimeoutException) {
+                    expiryCallbackCount.incrementAndGet();
+                    try {
+                        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Unexpected interruption", e);
+                    }
+                } else if (exception != null)
+                    unexpectedException.compareAndSet(null, exception);
+            }
+        };
+
+        for (int i = 0; i < messagesPerBatch + 1; i++)
+            accum.append(tp1, 0L, key, value, callback, maxBlockTimeMs);
+
+        assertEquals(2, accum.batches().get(tp1).size());
+        assertTrue("First batch not full", accum.batches().get(tp1).peekFirst().isFull());
+
+        // Advance the clock to expire the first batch.
+        time.sleep(requestTimeout + 1);
+        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
+        assertEquals("The batch was not expired", 1, expiredBatches.size());
+        assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
+        assertNull("Unexpected exception", unexpectedException.get());
+        assertEquals("Some messages not appended from expiry callbacks", 2, accum.batches().get(tp1).size());
+        assertTrue("First batch not full after expiry callbacks with appends", accum.batches().get(tp1).peekFirst().isFull());
+    }
+
+    @Test
     public void testMutedPartitions() throws InterruptedException {
         long now = time.milliseconds();
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);


Mime
View raw message