kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
Date Thu, 03 May 2018 20:09:46 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 962eef0  KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
962eef0 is described below

commit 962eef014bc56e30a308933ac220afeffd52918b
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Thu May 3 21:05:36 2018 +0100

    KAFKA-6854; Handle batches deleted during log cleaning of logs with txns (#4962)
    
    Log cleaner grows buffers when result.messagesRead is zero. This contains the number of
    filtered messages read from source which can be zero when transactions are used because batches
    may be discarded. Log cleaner incorrectly assumes that messages were not read because the
    buffer was too small and attempts to double the buffer size unnecessarily, failing with an
    exception if the buffer is already max.message.bytes. Additional check for discarded batches
    has been added to avoid growing [...]
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../apache/kafka/common/record/MemoryRecords.java  |  4 ++-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  5 ++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 33 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 932d4b6..e37fabe 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -145,7 +145,7 @@ public class MemoryRecords extends AbstractRecords {
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
         int messagesRead = 0;
-        int bytesRead = 0;
+        int bytesRead = 0; // bytes processed from `batches`
         int messagesRetained = 0;
         int bytesRetained = 0;
 
@@ -359,6 +359,8 @@ public class MemoryRecords extends AbstractRecords {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
+        // Note that `bytesRead` should contain only bytes from batches that have been processed,
+        // i.e. bytes from `messagesRead` and any discarded batches.
         public FilterResult(ByteBuffer output,
                             int messagesRead,
                             int bytesRead,
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 658b853..f5a070d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -622,8 +622,9 @@ private[log] class Cleaner(val id: Int,
         throttler.maybeThrottle(outputBuffer.limit())
       }
 
-      // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
-      if (readBuffer.limit() > 0 && result.messagesRead == 0)
+      // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
+      // `result.bytesRead` contains bytes from the `messagesRead` and any discarded batches.
+      if (readBuffer.limit() > 0 && result.bytesRead == 0)
         growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c12f617..ae949bf 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -300,6 +300,39 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
   }
 
+  /**
+   * Tests log cleaning with batches that are deleted where no additional messages
+   * are available to read in the buffer. Cleaning should continue from the next offset.
+   */
+  @Test
+  def testDeletedBatchesWithNoMessagesRead(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MaxMessageBytesProp, 100: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+    appendProducer(Seq(1))
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    appendProducer(Seq(2))
+    appendProducer(Seq(2))
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(1, 3, 4), offsetsInLog(log))
+
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(2), keysInLog(log))
+    assertEquals(List(3, 4), offsetsInLog(log))
+  }
+
   @Test
   def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
     val tp = new TopicPartition("test", 0)

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message