Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F3E51200C67 for ; Mon, 15 May 2017 10:17:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F26B9160BC2; Mon, 15 May 2017 08:17:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EB016160BC1 for ; Mon, 15 May 2017 10:17:43 +0200 (CEST) Received: (qmail 54186 invoked by uid 500); 15 May 2017 08:17:43 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 54177 invoked by uid 99); 15 May 2017 08:17:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 May 2017 08:17:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EEC01DFFAC; Mon, 15 May 2017 08:17:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ijuma@apache.org To: commits@kafka.apache.org Message-Id: <3711bc8199ea4376b4b2529157720ca5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-5232; Fix Log.parseTopicPartitionName to handle deleted topics with a period in the name Date: Mon, 15 May 2017 08:17:42 +0000 (UTC) archived-at: Mon, 15 May 2017 08:17:45 -0000 Repository: kafka Updated Branches: refs/heads/trunk bc55f8523 -> f56bbb651 KAFKA-5232; Fix Log.parseTopicPartitionName to handle deleted topics with a period in the name This issue would only be triggered if a broker was restarted while deletion was still taking place. Included a few minor improvements to that method and its tests. Author: Jaikiran Pai Author: Ismael Juma Reviewers: Ismael Juma Closes #3050 from jaikiran/KAFKA-5232-trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f56bbb65 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f56bbb65 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f56bbb65 Branch: refs/heads/trunk Commit: f56bbb6510df8c12db3ad075e2f6c78dd0092d60 Parents: bc55f85 Author: Jaikiran Pai Authored: Mon May 15 09:14:19 2017 +0100 Committer: Ismael Juma Committed: Mon May 15 09:14:26 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 42 ++++++---- core/src/main/scala/kafka/log/LogManager.scala | 11 +-- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 82 +++++++++++++++++--- 4 files changed, 104 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 7203033..f499aa8 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -44,6 +44,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import java.util.Map.{Entry => JEntry} import java.lang.{Long => JLong} +import java.util.regex.Pattern import org.apache.kafka.common.internals.Topic @@ -256,7 +257,7 @@ class Log(@volatile var dir: File, if (isIndexFile(file)) { // if it is an index file, make sure it has a corresponding .log file val offset = offsetFromFilename(filename) - val logFile = logFilename(dir, offset) + val logFile = Log.logFile(dir, offset) if (!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) Files.deleteIfExists(file.toPath) @@ -1139,7 +1140,7 @@ class Log(@volatile var dir: File, val start = time.nanoseconds lock synchronized { val newOffset = math.max(expectedNextOffset, logEndOffset) - val logFile = logFilename(dir, newOffset) + val logFile = Log.logFile(dir, newOffset) val offsetIdxFile = offsetIndexFile(dir, newOffset) val timeIdxFile = timeIndexFile(dir, newOffset) val txnIdxFile = transactionIndexFile(dir, newOffset) @@ -1494,6 +1495,8 @@ object Log { /** a directory that is scheduled to be deleted */ val DeleteDirSuffix = "-delete" + private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix") + val UnknownLogStartOffset = -1L /** @@ -1517,10 +1520,19 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def logFilename(dir: File, offset: Long) = + def logFile(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) /** + * Return a directory name to rename the log directory to for async deletion. The name will be in the following + * format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables. + */ + def logDeleteDirName(logName: String): String = { + val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") + s"$logName.$uniqueId$DeleteDirSuffix" + } + + /** * Construct an index file name in the given dir using the given base offset * * @param dir The directory in which the log will reside @@ -1557,32 +1569,36 @@ object Log { * Parse the topic and partition out of the directory name of a log */ def parseTopicPartitionName(dir: File): TopicPartition = { + if (dir == null) + throw new KafkaException("dir should not be null") def exception(dir: File): KafkaException = { - new KafkaException("Found directory " + dir.getCanonicalPath + ", " + - "'" + dir.getName + "' is not in the form of topic-partition or " + - "ongoing-deleting directory(topic-partition.uniqueId-delete)\n" + - "If a directory does not contain Kafka topic data it should not exist in Kafka's log " + - "directory") + new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " + + "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" + + "Kafka's log directories (and children) should only contain Kafka topic data.") } val dirName = dir.getName if (dirName == null || dirName.isEmpty || !dirName.contains('-')) throw exception(dir) - if (dirName.endsWith(DeleteDirSuffix) && !dirName.matches("^(\\S+)-(\\S+)\\.(\\S+)" + DeleteDirSuffix)) + if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches) throw exception(dir) val name: String = - if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.')) + if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.')) else dirName val index = name.lastIndexOf('-') val topic = name.substring(0, index) - val partition = name.substring(index + 1) - if (topic.length < 1 || partition.length < 1) + val partitionString = name.substring(index + 1) + if (topic.isEmpty || partitionString.isEmpty) throw exception(dir) - new TopicPartition(topic, partition.toInt) + val partition = + try partitionString.toInt + catch { case _: NumberFormatException => throw exception(dir) } + + new TopicPartition(topic, partition) } private def isIndexFile(file: File): Boolean = { http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index c621680..af771f1 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -461,20 +461,15 @@ class LogManager(val logDirs: Array[File], */ def asyncDelete(topicPartition: TopicPartition) = { val removedLog: Log = logCreationOrDeletionLock synchronized { - logs.remove(topicPartition) - } + logs.remove(topicPartition) + } if (removedLog != null) { //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. if (cleaner != null) { cleaner.abortCleaning(topicPartition) cleaner.updateCheckpoints(removedLog.dir.getParentFile) } - // renaming the directory to topic-partition.uniqueId-delete - val dirName = new StringBuilder(removedLog.name) - .append(".") - .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) - .append(Log.DeleteDirSuffix) - .toString() + val dirName = Log.logDeleteDirName(removedLog.name) removedLog.close() val renamedDir = new File(removedLog.dir.getParent, dirName) val renameSuccessful = removedLog.dir.renameTo(renamedDir) http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index d76b47a..6699143 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -73,7 +73,7 @@ class LogSegment(val log: FileRecords, def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = - this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate), + this(FileRecords.open(Log.logFile(dir, startOffset), fileAlreadyExists, initFileSize, preallocate), new OffsetIndex(Log.offsetIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), new TimeIndex(Log.timeIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), new TransactionIndex(startOffset, Log.transactionIndexFile(dir, startOffset)), http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 2283077..aaef466 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -59,7 +59,7 @@ class LogTest { def createEmptyLogs(dir: File, offsets: Int*) { for(offset <- offsets) { - Log.logFilename(dir, offset).createNewFile() + Log.logFile(dir, offset).createNewFile() Log.offsetIndexFile(dir, offset).createNewFile() } } @@ -68,7 +68,7 @@ class LogTest { def testOffsetFromFilename() { val offset = 23423423L - val logFile = Log.logFilename(tmpDir, offset) + val logFile = Log.logFile(tmpDir, offset) assertEquals(offset, Log.offsetFromFilename(logFile.getName)) val offsetIndexFile = Log.offsetIndexFile(tmpDir, offset) @@ -1634,12 +1634,26 @@ class LogTest { def testParseTopicPartitionName() { val topic = "test_topic" val partition = "143" - val dir = new File(logDir + topicPartitionName(topic, partition)) + val dir = new File(logDir, topicPartitionName(topic, partition)) val topicPartition = Log.parseTopicPartitionName(dir) assertEquals(topic, topicPartition.topic) assertEquals(partition.toInt, topicPartition.partition) } + /** + * Tests that log directories with a period in their name that have been marked for deletion + * are parsed correctly by `Log.parseTopicPartitionName` (see KAFKA-5232 for details). + */ + @Test + def testParseTopicPartitionNameWithPeriodForDeletedTopic() { + val topic = "foo.bar-testtopic" + val partition = "42" + val dir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition))) + val topicPartition = Log.parseTopicPartitionName(dir) + assertEquals("Unexpected topic name parsed", topic, topicPartition.topic) + assertEquals("Unexpected partition number parsed", partition.toInt, topicPartition.partition) + } + @Test def testParseTopicPartitionNameForEmptyName() { try { @@ -1647,7 +1661,7 @@ class LogTest { Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { - case _: Exception => // its GOOD! + case _: KafkaException => // its GOOD! } } @@ -1658,7 +1672,7 @@ class LogTest { Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir) } catch { - case _: Exception => // its GOOD! + case _: KafkaException => // its GOOD! } } @@ -1666,12 +1680,20 @@ class LogTest { def testParseTopicPartitionNameForMissingSeparator() { val topic = "test_topic" val partition = "1999" - val dir = new File(logDir + File.separator + topic + partition) + val dir = new File(logDir, topic + partition) try { Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { - case _: Exception => // its GOOD! + case _: KafkaException => // expected + } + // also test the "-delete" marker case + val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topic + partition)) + try { + Log.parseTopicPartitionName(deleteMarkerDir) + fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) + } catch { + case _: KafkaException => // expected } } @@ -1679,13 +1701,22 @@ class LogTest { def testParseTopicPartitionNameForMissingTopic() { val topic = "" val partition = "1999" - val dir = new File(logDir + topicPartitionName(topic, partition)) + val dir = new File(logDir, topicPartitionName(topic, partition)) try { Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { - case _: Exception => // its GOOD! + case _: KafkaException => // expected } + // also test the "-delete" marker case + val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition))) + try { + Log.parseTopicPartitionName(deleteMarkerDir) + fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) + } catch { + case _: KafkaException => // expected + } + } @Test @@ -1697,7 +1728,36 @@ class LogTest { Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { - case _: Exception => // its GOOD! + case _: KafkaException => // expected + } + // also test the "-delete" marker case + val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition))) + try { + Log.parseTopicPartitionName(deleteMarkerDir) + fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) + } catch { + case _: KafkaException => // expected + } + } + + @Test + def testParseTopicPartitionNameForInvalidPartition() { + val topic = "test_topic" + val partition = "1999a" + val dir = new File(logDir, topicPartitionName(topic, partition)) + try { + Log.parseTopicPartitionName(dir) + fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) + } catch { + case _: KafkaException => // expected + } + // also test the "-delete" marker case + val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topic + partition)) + try { + Log.parseTopicPartitionName(deleteMarkerDir) + fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) + } catch { + case _: KafkaException => // expected } } @@ -1720,7 +1780,7 @@ class LogTest { } def topicPartitionName(topic: String, partition: String): String = - File.separator + topic + "-" + partition + topic + "-" + partition @Test def testDeleteOldSegmentsMethod() {