kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3802; log mtimes reset on broker restart / shutdown
Date Wed, 06 Jul 2016 15:13:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk dca263b4e -> 15e008783


KAFKA-3802; log mtimes reset on broker restart / shutdown

There seems to be a bug in the JDK that on some versions the mtime of
the file is modified on FileChannel.truncate() even if the javadoc states
`If the given size is greater than or equal to the file's current size then
 the file is not modified.`.

This causes problems with log retention, as all the files then look like
they contain recent data to Kafka. Therefore this is only done if the channel size is different
to the target size.

Author: Moritz Siuts <m.siuts@emetriq.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1497 from msiuts/KAFKA-3802-log_mtimes_reset_on_broker_shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/15e00878
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/15e00878
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/15e00878

Branch: refs/heads/trunk
Commit: 15e008783cf73dcaed851fe6cc587767031886e5
Parents: dca263b
Author: Moritz Siuts <m.siuts@emetriq.com>
Authored: Wed Jul 6 14:33:07 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Jul 6 14:33:07 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/log/FileMessageSet.scala   | 20 ++++--
 .../unit/kafka/log/FileMessageSetTest.scala     | 67 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/15e00878/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index a454f2c..c8bce65 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -54,12 +54,12 @@ class FileMessageSet private[kafka](@volatile var file: File,
     if(isSlice)
      new AtomicInteger(end - start) // don't check the file size if this is just a slice view
     else
-      new AtomicInteger(math.min(channel.size().toInt, end) - start)
+      new AtomicInteger(math.min(channel.size.toInt, end) - start)
 
   /* if this is not a slice, update the file pointer to the end of the file */
   if (!isSlice)
     /* set the file position to the last byte in the file */
-    channel.position(math.min(channel.size().toInt, end))
+    channel.position(math.min(channel.size.toInt, end))
 
   /**
    * Create a file message set with no slicing.
@@ -157,7 +157,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
    */
   def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
     // Ensure that the underlying size has not changed.
-    val newSize = math.min(channel.size().toInt, end) - start
+    val newSize = math.min(channel.size.toInt, end) - start
     if (newSize < _size.get()) {
      throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
         .format(file.getAbsolutePath, _size.get(), newSize))
@@ -333,7 +333,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /**
   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
    * given size falls on a valid message boundary.
-   * @param targetSize The size to truncate to.
+   * In some versions of the JDK truncating to the same size as the file message set will cause an
+   * update of the files mtime, so truncate is only performed if the targetSize is smaller than the
+   * size of the underlying FileChannel.
+   * It is expected that no other threads will do writes to the log when this function is called.
+   * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
    * @return The number of bytes truncated off
    */
   def truncateTo(targetSize: Int): Int = {
@@ -341,9 +345,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
     if(targetSize > originalSize || targetSize < 0)
      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
                                " size of this log segment is " + originalSize + " bytes.")
-    channel.truncate(targetSize)
-    channel.position(targetSize)
-    _size.set(targetSize)
+    if (targetSize < channel.size.toInt) {
+      channel.truncate(targetSize)
+      channel.position(targetSize)
+      _size.set(targetSize)
+    }
     originalSize - targetSize
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/15e00878/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 417aa75..a64454d 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -19,12 +19,14 @@ package kafka.log
 
 import java.io._
 import java.nio._
-import java.util.concurrent.atomic._
+import java.nio.channels._
 
 import kafka.common.LongRef
 import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.message._
+import kafka.common.KafkaException
+import org.easymock.EasyMock
 import org.junit.Test
 
 class FileMessageSetTest extends BaseMessageSetTestCases {
@@ -153,6 +155,69 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   }
 
   /**
+    * Test that truncateTo only calls truncate on the FileChannel if the size of the
+    * FileChannel is bigger than the target size. This is important because some JVMs
+    * change the mtime of the file, even if truncate should do nothing.
+    */
+  @Test
+  def testTruncateNotCalledIfSizeIsSameAsTargetSize() {
+    val channelMock = EasyMock.createMock(classOf[FileChannel])
+
+    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
+    EasyMock.expect(channelMock.position(42L)).andReturn(null)
+    EasyMock.replay(channelMock)
+
+    val msgSet = new FileMessageSet(tempFile(), channelMock)
+    msgSet.truncateTo(42)
+
+    EasyMock.verify(channelMock)
+  }
+
+  /**
+    * Expect a KafkaException if targetSize is bigger than the size of
+    * the FileMessageSet.
+    */
+  @Test
+  def testTruncateNotCalledIfSizeIsBiggerThanTargetSize() {
+    val channelMock = EasyMock.createMock(classOf[FileChannel])
+
+    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
+    EasyMock.expect(channelMock.position(42L)).andReturn(null)
+    EasyMock.replay(channelMock)
+
+    val msgSet = new FileMessageSet(tempFile(), channelMock)
+
+    try {
+      msgSet.truncateTo(43)
+      fail("Should throw KafkaException")
+    } catch {
+      case e: KafkaException => // expected
+    }
+
+    EasyMock.verify(channelMock)
+  }
+
+  /**
+    * see #testTruncateNotCalledIfSizeIsSameAsTargetSize
+    */
+  @Test
+  def testTruncateIfSizeIsDifferentToTargetSize() {
+    val channelMock = EasyMock.createMock(classOf[FileChannel])
+
+    EasyMock.expect(channelMock.size).andReturn(42L).atLeastOnce()
+    EasyMock.expect(channelMock.position(42L)).andReturn(null).once()
+    EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once()
+    EasyMock.expect(channelMock.position(23L)).andReturn(null).once()
+    EasyMock.replay(channelMock)
+
+    val msgSet = new FileMessageSet(tempFile(), channelMock)
+    msgSet.truncateTo(23)
+
+    EasyMock.verify(channelMock)
+  }
+
+
+  /**
    * Test the new FileMessageSet with pre allocate as true
    */
   @Test


Mime
View raw message