Date: Mon, 20 Aug 2018 18:03:16 +0000
From: jgus@apache.org
To: "commits@kafka.apache.org"
Reply-To: dev@kafka.apache.org
Subject: [kafka] branch 2.0 updated: KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list (#5491)
X-Git-Repo: kafka
X-Git-Refname: refs/heads/2.0
X-Git-Oldrev: 9e83675d343721e28d9fc0b73468fad29fdf94c0
X-Git-Newrev: 0b81ef4a2d7dafb699c09687b0d15e9547ea0472

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 0b81ef4  KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list (#5491)
0b81ef4 is described below

commit 0b81ef4a2d7dafb699c09687b0d15e9547ea0472
Author: Dong Lin
AuthorDate: Mon Aug 20 11:00:57 2018 -0700

    KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list (#5491)

    Reviewers: Dhruvil Shah, Jason Gustafson
---
 core/src/main/scala/kafka/log/Log.scala            | 13 ++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 69 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e4be8fc..afe151d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1740,7 +1740,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
    *
    * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
    * it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1791,10 +1793,13 @@ class Log(@volatile var dir: File,
    * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
    */
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
-    val sortedNewSegments = newSegments.sortBy(_.baseOffset)
-    val sortedOldSegments = oldSegments.sortBy(_.baseOffset)
-
     lock synchronized {
+      val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+      // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
+      // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
+      // multiple times for the same segment.
+      val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)
+
       checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
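
The race the patch closes: the log cleaner reads the list of old segments, retention removes some of them from the segments map and schedules their files for async deletion, and only afterwards does replaceSegments() run with the now-stale list, so the same segment could be handed to asyncDeleteSegment() twice. Filtering the old segments against the live segments map, inside the lock, drops the already-removed ones. Below is a minimal standalone Scala sketch of that idea, not Kafka code; Segment, MiniLog, deleteOldSegment, scheduledForDelete and DoubleDeleteSketch are made-up stand-ins for LogSegment, Log, deleteOldSegments() and asyncDeleteSegment().

import java.util.concurrent.ConcurrentSkipListMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a log segment: only the base offset matters here.
case class Segment(baseOffset: Long)

class MiniLog {
  private val lock = new Object
  // Plays the role of Log.segments: the live segments keyed by base offset.
  private val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()
  // Records every segment handed to "async deletion"; the point is that no segment appears twice.
  val scheduledForDelete = ArrayBuffer[Segment]()

  def addSegment(seg: Segment): Unit = segments.put(seg.baseOffset, seg)

  // Retention running concurrently: remove the segment from the live map and schedule its deletion.
  def deleteOldSegment(baseOffset: Long): Unit = lock.synchronized {
    val seg = segments.remove(baseOffset)
    if (seg != null) scheduledForDelete += seg
  }

  // The fixed behaviour: old segments are re-checked against the live map inside the lock,
  // so a segment that retention already removed (and scheduled) is not scheduled again.
  def replaceSegments(newSegments: Seq[Segment], oldSegments: Seq[Segment]): Unit = lock.synchronized {
    val stillPresent = oldSegments.filter(seg => segments.containsKey(seg.baseOffset))
    stillPresent.foreach { seg =>
      segments.remove(seg.baseOffset)
      scheduledForDelete += seg
    }
    newSegments.foreach(addSegment)
  }
}

object DoubleDeleteSketch extends App {
  val log = new MiniLog
  val oldSegs = Seq(Segment(0L), Segment(100L))
  oldSegs.foreach(log.addSegment)
  log.deleteOldSegment(0L)                         // retention wins the race for the first segment
  log.replaceSegments(Seq(Segment(0L)), oldSegs)   // the cleaner swaps in its compacted segment
  assert(log.scheduledForDelete.count(_.baseOffset == 0L) == 1)  // scheduled once, not twice
}
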
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b351311..0240707 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, RandomAccessFile}
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -90,6 +91,74 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+    val deleteStartLatch = new CountDownLatch(1)
+    val deleteCompleteLatch = new CountDownLatch(1)
+
+    // Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+    // it waits for another thread to execute deleteOldSegments()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir)
+    val log = new Log(dir,
+                      config = LogConfig.fromProps(logConfig.originals, logProps),
+                      logStartOffset = 0L,
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
+                      brokerTopicStats = new BrokerTopicStats, time,
+                      maxProducerIdExpirationMs = 60 * 60 * 1000,
+                      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+                      topicPartition = topicPartition,
+                      producerStateManager = producerStateManager,
+                      logDirFailureChannel = new LogDirFailureChannel(10)) {
+      override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+        deleteStartLatch.countDown()
+        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
+          throw new IllegalStateException("Log segment deletion timed out")
+        }
+        super.replaceSegments(newSegments, oldSegments, isRecoveredSwapFile)
+      }
+    }
+
+    // Start a thread which executes log.deleteOldSegments() right before replaceSegments() is executed
+    val t = new Thread() {
+      override def run(): Unit = {
+        deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
+        log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset)
+        log.onHighWatermarkIncremented(log.activeSegment.baseOffset)
+        log.deleteOldSegments()
+        deleteCompleteLatch.countDown()
+      }
+    }
+    t.start()
+
+    // Append records so that the number of segments increases to 3
+    while (log.numberOfSegments < 3) {
+      log.appendAsLeader(record(key = 0, log.logEndOffset.toInt), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(3, log.numberOfSegments)
+
+    // Remember a reference to the first log segment and determine the file name expected after async deletion
+    val firstLogFile = log.logSegments.head.log
+    val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath, "", Log.DeletedFileSuffix)
+
+    // Clean the log. This should trigger replaceSegments() and deleteOldSegments()
+    val offsetMap = new FakeOffsetMap(Int.MaxValue)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
+    val stats = new CleanerStats()
+    cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
+    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats)
+
+    // Validate based on the file name that the log segment file is renamed exactly once for async deletion
+    assertEquals(expectedFileName, firstLogFile.file().getPath)
+    assertEquals(2, log.numberOfSegments)
+  }
+
+  @Test
   def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = {
     val originalMaxFileSize = 1024;
     val cleaner = makeCleaner(2)
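
The new test forces that interleaving with a pair of CountDownLatches: the overridden replaceSegments() signals that the cleaner has reached the segment swap, then blocks until a second thread has run deleteOldSegments(), and fails with an IllegalStateException if that thread does not finish within 5000 ms. Below is a minimal standalone Scala sketch of the same handshake; LatchHandshakeSketch, criticalStep and the latch names are made up for illustration and are not Kafka identifiers.

import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandshakeSketch {
  def main(args: Array[String]): Unit = {
    val reachedCriticalStep = new CountDownLatch(1)
    val concurrentStepDone = new CountDownLatch(1)

    // Stand-in for the overridden replaceSegments(): announce that the critical step has been
    // reached, then block until the concurrent operation has happened.
    def criticalStep(): Unit = {
      reachedCriticalStep.countDown()
      if (!concurrentStepDone.await(5000, TimeUnit.MILLISECONDS))
        throw new IllegalStateException("concurrent step timed out")
      println("swap runs only after the concurrent deletion")
    }

    // Stand-in for the thread that calls log.deleteOldSegments() in the test.
    val concurrentThread = new Thread() {
      override def run(): Unit = {
        reachedCriticalStep.await(5000, TimeUnit.MILLISECONDS)
        println("concurrent deletion runs here")
        concurrentStepDone.countDown()
      }
    }

    concurrentThread.start()
    criticalStep()
    concurrentThread.join()
  }
}

Bounding both await calls keeps either thread from hanging forever if the other never reaches its step, which is why the test's override throws after the timeout instead of waiting indefinitely.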