kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list (#5491)
Date Mon, 20 Aug 2018 18:03:16 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 0b81ef4  KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list (#5491)
0b81ef4 is described below

commit 0b81ef4a2d7dafb699c09687b0d15e9547ea0472
Author: Dong Lin <lindong28@gmail.com>
AuthorDate: Mon Aug 20 11:00:57 2018 -0700

    KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list (#5491)
    
    Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/log/Log.scala            | 13 ++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 69 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e4be8fc..afe151d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1740,7 +1740,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
    *
    * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
    * it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1791,10 +1793,13 @@ class Log(@volatile var dir: File,
    * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
    */
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
-    val sortedNewSegments = newSegments.sortBy(_.baseOffset)
-    val sortedOldSegments = oldSegments.sortBy(_.baseOffset)
-
     lock synchronized {
+      val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+      // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
+      // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
+      // multiple times for the same segment.
+      val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)
+
       checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b351311..0240707 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, RandomAccessFile}
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -90,6 +91,74 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+    val deleteStartLatch = new CountDownLatch(1)
+    val deleteCompleteLatch = new CountDownLatch(1)
+
+    // Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+    // it waits for another thread to execute deleteOldSegments()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir)
+    val log = new Log(dir,
+                      config = LogConfig.fromProps(logConfig.originals, logProps),
+                      logStartOffset = 0L,
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
+                      brokerTopicStats = new BrokerTopicStats, time,
+                      maxProducerIdExpirationMs = 60 * 60 * 1000,
+                      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+                      topicPartition = topicPartition,
+                      producerStateManager = producerStateManager,
+                      logDirFailureChannel = new LogDirFailureChannel(10)) {
+      override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+        deleteStartLatch.countDown()
+        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
+          throw new IllegalStateException("Log segment deletion timed out")
+        }
+        super.replaceSegments(newSegments, oldSegments, isRecoveredSwapFile)
+      }
+    }
+
+    // Start a thread which execute log.deleteOldSegments() right before replaceSegments() is executed
+    val t = new Thread() {
+      override def run(): Unit = {
+        deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
+        log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset)
+        log.onHighWatermarkIncremented(log.activeSegment.baseOffset)
+        log.deleteOldSegments()
+        deleteCompleteLatch.countDown()
+      }
+    }
+    t.start()
+
+    // Append records so that segment number increase to 3
+    while (log.numberOfSegments < 3) {
+      log.appendAsLeader(record(key = 0, log.logEndOffset.toInt), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(3, log.numberOfSegments)
+
+    // Remember reference to the first log and determine its file name expected for async deletion
+    val firstLogFile = log.logSegments.head.log
+    val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath, "", Log.DeletedFileSuffix)
+
+    // Clean the log. This should trigger replaceSegments() and deleteOldSegments();
+    val offsetMap = new FakeOffsetMap(Int.MaxValue)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
+    val stats = new CleanerStats()
+    cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
+    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats)
+
+    // Validate based on the file name that log segment file is renamed exactly once for async deletion
+    assertEquals(expectedFileName, firstLogFile.file().getPath)
+    assertEquals(2, log.numberOfSegments)
+  }
+
+  @Test
   def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = {
     val originalMaxFileSize = 1024;
     val cleaner = makeCleaner(2)


Mime
View raw message