kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-3915; Don't convert messages from v0 to v1 during log compaction The conversion is unsafe as the converted message size may be greater than the message size limit. Updated `LogCleanerIntegrationTest` to test the max message size case for both V0 and the current version [Forced Update!]
Date Wed, 20 Jul 2016 20:19:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 a5d2ff3d3 -> 039e89a6e (forced update)


KAFKA-3915; Don't convert messages from v0 to v1 during log compaction

The conversion is unsafe as the converted message size may be greater
than the message size limit. Updated `LogCleanerIntegrationTest` to test the max message size
case for both V0 and the current version.
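
As a rough illustration of the size issue: message format v1 carries an 8-byte
timestamp that v0 does not, so a message that fits under the configured max
message size in v0 can exceed it once up-converted. The standalone Scala sketch
below uses simplified, assumed overhead constants (not the exact values from
`kafka.message.Message`) purely to show the arithmetic:

object FormatConversionSizeSketch {
  // Assumed, simplified per-message overheads for illustration only:
  // v0 header ~= crc(4) + magic(1) + attributes(1) + key-length(4) + value-length(4)
  val V0Overhead = 14
  // v1 additionally stores an 8-byte timestamp in every message
  val V1Overhead = V0Overhead + 8

  def messageSize(keyBytes: Int, valueBytes: Int, overhead: Int): Int =
    overhead + keyBytes + valueBytes

  def main(args: Array[String]): Unit = {
    val maxMessageSize = 1000  // hypothetical max message size limit
    val v0Size = messageSize(keyBytes = 16, valueBytes = 965, overhead = V0Overhead)
    val v1Size = messageSize(keyBytes = 16, valueBytes = 965, overhead = V1Overhead)
    println(s"v0 size = $v0Size, fits = ${v0Size <= maxMessageSize}")  // 995, true
    println(s"v1 size = $v1Size, fits = ${v1Size <= maxMessageSize}")  // 1003, false
    // A message accepted under the limit in v0 would overflow it after an
    // up-conversion to v1, which is why the cleaner now keeps the original format.
  }
}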

Also include a few minor clean-ups:
* Remove unused code branch in `LogCleaner.compressMessages`
* Avoid unintentional usage of `scala.collection.immutable.Stream` (`toSeq` on an `Iterator`); a sketch of the pitfall follows this list
* Add explicit result type in `FileMessageSet.iterator`
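
On the `Stream` clean-up: in the Scala versions Kafka built against at the time
(2.10/2.11), calling `toSeq` on an `Iterator` goes through `toStream` and returns
a lazy `scala.collection.immutable.Stream`, so most elements are only materialised
on demand, while `toBuffer` evaluates them eagerly. A small standalone sketch
(assumed example, not Kafka code):

object IteratorToSeqSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for log.logSegments(start, end): an Iterator whose
    // elements are side-effecting/expensive to produce.
    def segments(): Iterator[Int] = Iterator.tabulate(3) { i =>
      println(s"materialising segment $i")
      i
    }

    val lazySeq = segments().toSeq        // Stream: only the head (segment 0) is forced here
    println(s"toSeq gave a Stream: ${lazySeq.isInstanceOf[scala.collection.immutable.Stream[_]]}")
    println(s"size = ${lazySeq.size}")    // forces the remaining elements now

    val strictSeq = segments().toBuffer   // ArrayBuffer: all three elements forced immediately
    println(s"toBuffer gave ${strictSeq.getClass.getSimpleName} of size ${strictSeq.size}")
  }
}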

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ewen Cheslack-Postava, Guozhang Wang

Closes #1643 from ijuma/kafka-3915-log-cleaner-io-buffers-message-conversion

(cherry picked from commit 0d19f58850282d16c14fd4abd04663eae817d012)


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/039e89a6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/039e89a6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/039e89a6

Branch: refs/heads/0.10.0
Commit: 039e89a6ebcf914f06d83b7277bb7fc209547cc1
Parents: a7f9396
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Jul 20 12:49:29 2016 -0700
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Jul 20 21:18:52 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/log/FileMessageSet.scala   |   2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  31 +---
 .../kafka/log/LogCleanerIntegrationTest.scala   | 184 +++++++++++++------
 3 files changed, 142 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/039e89a6/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index d5aa5c5..2ee2cc2 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -239,7 +239,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /**
    * Get a shallow iterator over the messages in the set.
    */
-  override def iterator = iterator(Int.MaxValue)
+  override def iterator: Iterator[MessageAndOffset] = iterator(Int.MaxValue)
 
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.

http://git-wip-us.apache.org/repos/asf/kafka/blob/039e89a6/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c6636be..c5e4ee8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -431,8 +431,7 @@ private[log] class Cleaner(val id: Int,
         stats.readMessage(size)
         if (entry.message.compressionCodec == NoCompressionCodec) {
           if (shouldRetainMessage(source, map, retainDeletes, entry)) {
-            val convertedMessage = entry.message.toFormatVersion(messageFormatVersion)
-            ByteBufferMessageSet.writeMessage(writeBuffer, convertedMessage, entry.offset)
+            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
             stats.recopyMessage(size)
           }
           messagesRead += 1
@@ -444,22 +443,15 @@ private[log] class Cleaner(val id: Int,
           val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
           messages.foreach { messageAndOffset =>
             messagesRead += 1
-            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) {
-              retainedMessages += {
-                if (messageAndOffset.message.magic != messageFormatVersion) {
-                  writeOriginalMessageSet = false
-                  new MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion), messageAndOffset.offset)
-                }
-                else messageAndOffset
-              }
-            }
+            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset))
+              retainedMessages += messageAndOffset
             else writeOriginalMessageSet = false
           }
 
-          // There are no messages compacted out and no message format conversion, write the original message set back
+          // There are no messages compacted out, write the original message set back
           if (writeOriginalMessageSet)
             ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
-          else if (retainedMessages.nonEmpty)
+          else
             compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, retainedMessages)
         }
       }
@@ -484,14 +476,9 @@ private[log] class Cleaner(val id: Int,
                                compressionCodec: CompressionCodec,
                                messageFormatVersion: Byte,
                                messageAndOffsets: Seq[MessageAndOffset]) {
-    val messages = messageAndOffsets.map(_.message)
-    if (messageAndOffsets.isEmpty) {
-      MessageSet.Empty.sizeInBytes
-    } else if (compressionCodec == NoCompressionCodec) {
-      for (messageOffset <- messageAndOffsets)
-        ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset)
-      MessageSet.messageSetSize(messages)
-    } else {
+    require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
+    if (messageAndOffsets.nonEmpty) {
+      val messages = messageAndOffsets.map(_.message)
       val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
       val firstMessageOffset = messageAndOffsets.head
       val firstAbsoluteOffset = firstMessageOffset.offset
@@ -608,7 +595,7 @@ private[log] class Cleaner(val id: Int,
    */
   private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
     map.clear()
-    val dirty = log.logSegments(start, end).toSeq
+    val dirty = log.logSegments(start, end).toBuffer
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name,
dirty.size, start, end))
     
     // Add all the dirty segments. We must take at least map.slots * load_factor,

http://git-wip-us.apache.org/repos/asf/kafka/blob/039e89a6/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index cc9873c..825a55b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,6 +20,7 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
+import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
 import kafka.common.TopicAndPartition
 import kafka.message._
 import kafka.server.OffsetCheckpoint
@@ -33,6 +34,7 @@ import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
 import scala.collection._
+import scala.util.Random
 
 /**
  * This is an integration test that tests the fully integrated log cleaner
@@ -40,117 +42,195 @@ import scala.collection._
 @RunWith(value = classOf[Parameterized])
 class LogCleanerIntegrationTest(compressionCodec: String) {
 
+  val codec = CompressionCodec.getCompressionCodec(compressionCodec)
   val time = new MockTime()
-  val segmentSize = 100
+  val segmentSize = 256
   val deleteDelay = 1000
   val logName = "log"
   val logDir = TestUtils.tempDir()
   var counter = 0
+  var cleaner: LogCleaner = _
   val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
 
   @Test
   def cleanerTest() {
-    val cleaner = makeCleaner(parts = 3)
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V1)
+    val maxMessageSize = largeMessageSet.sizeInBytes
+
+    cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
     val log = cleaner.logs.get(topics(0))
 
-    val appends = writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec))
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
     val startSize = log.size
     cleaner.startup()
 
     val firstDirty = log.activeSegment.baseOffset
-    // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than LogConfig.MinCleanableDirtyRatioProp
-    cleaner.awaitCleaned("log", 0, firstDirty)
+    checkLastCleaned("log", 0, firstDirty)
     val compactedSize = log.logSegments.map(_.size).sum
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
-    assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned", lastCleaned >= firstDirty)
-    assertTrue(s"log should have been compacted:  startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-    
-    val read = readFromLog(log)
-    assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
-    assertTrue(startSize > log.size)
+    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
 
-    // write some more stuff and validate again
-    val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec))
-    val firstDirty2 = log.activeSegment.baseOffset
-    cleaner.awaitCleaned("log", 0, firstDirty2)
+    checkLogAfterAppendingDups(log, startSize, appends)
 
-    val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
-    assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2);
+    log.append(largeMessageSet, assignOffsets = true)
+    val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
+    val appends2 = appends ++ Seq(largeMessageKey -> largeMessageValue) ++ dups
+    val firstDirty2 = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty2)
 
-    val read2 = readFromLog(log)
-    assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
+    checkLogAfterAppendingDups(log, startSize, appends2)
 
     // simulate deleting a partition, by removing it from logs
     // force a checkpoint
     // and make sure its gone from checkpoint file
-
     cleaner.logs.remove(topics(0))
-
     cleaner.updateCheckpoints(logDir)
     val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
-
     // we expect partition 0 to be gone
-    assert(!checkpoints.contains(topics(0)))
-    cleaner.shutdown()
+    assertFalse(checkpoints.contains(topics(0)))
+  }
+
+  // returns (value, ByteBufferMessageSet)
+  private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, ByteBufferMessageSet) = {
+    def messageValue(length: Int): String = {
+      val random = new Random(0)
+      new String(random.alphanumeric.take(length).toArray)
+    }
+    val value = messageValue(128)
+    val messageSet = TestUtils.singleMessageSet(payload = value.getBytes, codec = codec, key = key.toString.getBytes,
+      magicValue = messageFormatVersion)
+    (value, messageSet)
   }
 
-  def readFromLog(log: Log): Iterable[(Int, Int)] = {
-    for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
+  @Test
+  def testCleanerWithMessageFormatV0() {
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V0)
+    val maxMessageSize = codec match {
+      case NoCompressionCodec => largeMessageSet.sizeInBytes
+      case _ =>
+        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
+        // increase because the broker offsets are larger than the ones assigned by the client
+        // adding `5` to the message set size is good enough for this test: it covers the increased message size while
+        // still being less than the overhead introduced by the conversion from message format version 0 to 1
+        largeMessageSet.sizeInBytes + 5
+    }
+
+    cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
+
+    val log = cleaner.logs.get(topics(0))
+    val props = logConfigProperties(maxMessageSize)
+    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
+    log.config = new LogConfig(props)
+
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+
+    checkLogAfterAppendingDups(log, startSize, appends)
+
+    val appends2: Seq[(Int, String)] = {
+      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+      log.append(largeMessageSet, assignOffsets = true)
+
+      // also add some messages with version 1 to check that we handle mixed format versions correctly
+      props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
+      log.config = new LogConfig(props)
+      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+      appends ++ dupsV0 ++ Seq(largeMessageKey -> largeMessageValue) ++ dupsV1
+    }
+    val firstDirty2 = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty2)
+
+    checkLogAfterAppendingDups(log, startSize, appends2)
+  }
+
+  private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
+    // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
+    // LogConfig.MinCleanableDirtyRatioProp
+    cleaner.awaitCleaned(topic, partitionId, firstDirty)
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition(topic, partitionId)).get
+    assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
+      lastCleaned >= firstDirty)
+  }
+
+  private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String)]) {
+    val read = readFromLog(log)
+    assertEquals("Contents of the map shouldn't change", appends.toMap, read.toMap)
+    assertTrue(startSize > log.size)
+  }
+
+  private def readFromLog(log: Log): Iterable[(Int, String)] = {
+
+    def messageIterator(entry: MessageAndOffset): Iterator[MessageAndOffset] =
       // create single message iterator or deep iterator depending on compression codec
-      if (entry.message.compressionCodec == NoCompressionCodec)
-        Stream.cons(entry, Stream.empty).iterator
-      else
-        ByteBufferMessageSet.deepIterator(entry)
-    }) yield {
+      if (entry.message.compressionCodec == NoCompressionCodec) Iterator(entry)
+      else ByteBufferMessageSet.deepIterator(entry)
+
+    for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- messageIterator(entry)) yield {
       val key = TestUtils.readString(messageAndOffset.message.key).toInt
-      val value = TestUtils.readString(messageAndOffset.message.payload).toInt
+      val value = TestUtils.readString(messageAndOffset.message.payload)
       key -> value
     }
   }
 
-  def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec): Seq[(Int, Int)] = {
-    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
-      val count = counter
-      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes), assignOffsets = true)
+  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
+                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String)] = {
+    for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
+      val payload = counter.toString
+      log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec,
+        key = key.toString.getBytes, magicValue = magicValue), assignOffsets = true)
       counter += 1
-      (key, count)
+      (key, payload)
     }
   }
     
   @After
-  def teardown() {
+  def tearDown() {
+    cleaner.shutdown()
     time.scheduler.shutdown()
     Utils.delete(logDir)
   }
+
+  private def logConfigProperties(maxMessageSize: Int, minCleanableDirtyRatio: Float = 0.0F): Properties = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
+    props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    props.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
+    props.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
+    props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+    props
+  }
   
   /* create a cleaner instance and logs with the given parameters */
-  def makeCleaner(parts: Int, 
-                  minCleanableDirtyRatio: Float = 0.0F,
-                  numThreads: Int = 1,
-                  defaultPolicy: String = "compact",
-                  policyOverrides: Map[String, String] = Map()): LogCleaner = {
+  private def makeCleaner(parts: Int,
+                          minCleanableDirtyRatio: Float = 0.0F,
+                          numThreads: Int = 1,
+                          maxMessageSize: Int = 128,
+                          defaultPolicy: String = "compact",
+                          policyOverrides: Map[String, String] = Map()): LogCleaner = {
     
     // create partitions and add them to the pool
     val logs = new Pool[TopicAndPartition, Log]()
     for(i <- 0 until parts) {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
-      val logProps = new Properties()
-      logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-      logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
-      logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
-      logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-      logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
 
       val log = new Log(dir = dir,
-                        LogConfig(logProps),
+                        LogConfig(logConfigProperties(maxMessageSize, minCleanableDirtyRatio)),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)
       logs.put(TopicAndPartition("log", i), log)      
     }
   
-    new LogCleaner(CleanerConfig(numThreads = numThreads),
+    new LogCleaner(CleanerConfig(numThreads = numThreads, ioBufferSize = maxMessageSize / 2, maxMessageSize = maxMessageSize),
                    logDirs = Array(logDir),
                    logs = logs,
                    time = time)

