kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5232; Fix Log.parseTopicPartitionName to handle deleted topics with a period in the name
Date Mon, 15 May 2017 08:17:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bc55f8523 -> f56bbb651


KAFKA-5232; Fix Log.parseTopicPartitionName to handle deleted topics with a period in the name

This issue would only be triggered if a broker was restarted while
deletion was still taking place.

Included a few minor improvements to that method and its tests.

Author: Jaikiran Pai <jaikiran.pai@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3050 from jaikiran/KAFKA-5232-trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f56bbb65
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f56bbb65
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f56bbb65

Branch: refs/heads/trunk
Commit: f56bbb6510df8c12db3ad075e2f6c78dd0092d60
Parents: bc55f85
Author: Jaikiran Pai <jaikiran.pai@gmail.com>
Authored: Mon May 15 09:14:19 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon May 15 09:14:26 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 42 ++++++----
 core/src/main/scala/kafka/log/LogManager.scala  | 11 +--
 core/src/main/scala/kafka/log/LogSegment.scala  |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 82 +++++++++++++++++---
 4 files changed, 104 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7203033..f499aa8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import java.util.Map.{Entry => JEntry}
 import java.lang.{Long => JLong}
+import java.util.regex.Pattern
 
 import org.apache.kafka.common.internals.Topic
 
@@ -256,7 +257,7 @@ class Log(@volatile var dir: File,
       if (isIndexFile(file)) {
         // if it is an index file, make sure it has a corresponding .log file
         val offset = offsetFromFilename(filename)
-        val logFile = logFilename(dir, offset)
+        val logFile = Log.logFile(dir, offset)
         if (!logFile.exists) {
           warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
           Files.deleteIfExists(file.toPath)
@@ -1139,7 +1140,7 @@ class Log(@volatile var dir: File,
     val start = time.nanoseconds
     lock synchronized {
       val newOffset = math.max(expectedNextOffset, logEndOffset)
-      val logFile = logFilename(dir, newOffset)
+      val logFile = Log.logFile(dir, newOffset)
       val offsetIdxFile = offsetIndexFile(dir, newOffset)
       val timeIdxFile = timeIndexFile(dir, newOffset)
       val txnIdxFile = transactionIndexFile(dir, newOffset)
@@ -1494,6 +1495,8 @@ object Log {
   /** a directory that is scheduled to be deleted */
   val DeleteDirSuffix = "-delete"
 
+  private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
+
   val UnknownLogStartOffset = -1L
 
   /**
@@ -1517,10 +1520,19 @@ object Log {
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
-  def logFilename(dir: File, offset: Long) =
+  def logFile(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
 
   /**
+    * Return a directory name to rename the log directory to for async deletion. The name will be in the following
+    * format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables.
+    */
+  def logDeleteDirName(logName: String): String = {
+    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
+    s"$logName.$uniqueId$DeleteDirSuffix"
+  }
+
+  /**
    * Construct an index file name in the given dir using the given base offset
    *
    * @param dir The directory in which the log will reside
@@ -1557,32 +1569,36 @@ object Log {
    * Parse the topic and partition out of the directory name of a log
    */
   def parseTopicPartitionName(dir: File): TopicPartition = {
+    if (dir == null)
+      throw new KafkaException("dir should not be null")
 
     def exception(dir: File): KafkaException = {
-      new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
-        "'" + dir.getName + "' is not in the form of topic-partition or " +
-        "ongoing-deleting directory(topic-partition.uniqueId-delete)\n" +
-        "If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
-        "directory")
+      new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " +
+        "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
+        "Kafka's log directories (and children) should only contain Kafka topic data.")
     }
 
     val dirName = dir.getName
     if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
       throw exception(dir)
-    if (dirName.endsWith(DeleteDirSuffix) && !dirName.matches("^(\\S+)-(\\S+)\\.(\\S+)" + DeleteDirSuffix))
+    if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches)
       throw exception(dir)
 
     val name: String =
-      if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.'))
+      if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
       else dirName
 
     val index = name.lastIndexOf('-')
     val topic = name.substring(0, index)
-    val partition = name.substring(index + 1)
-    if (topic.length < 1 || partition.length < 1)
+    val partitionString = name.substring(index + 1)
+    if (topic.isEmpty || partitionString.isEmpty)
       throw exception(dir)
 
-    new TopicPartition(topic, partition.toInt)
+    val partition =
+      try partitionString.toInt
+      catch { case _: NumberFormatException => throw exception(dir) }
+
+    new TopicPartition(topic, partition)
   }
 
   private def isIndexFile(file: File): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c621680..af771f1 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -461,20 +461,15 @@ class LogManager(val logDirs: Array[File],
     */
   def asyncDelete(topicPartition: TopicPartition) = {
     val removedLog: Log = logCreationOrDeletionLock synchronized {
-                            logs.remove(topicPartition)
-                          }
+      logs.remove(topicPartition)
+    }
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null) {
         cleaner.abortCleaning(topicPartition)
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
-      // renaming the directory to topic-partition.uniqueId-delete
-      val dirName = new StringBuilder(removedLog.name)
-                        .append(".")
-                        .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
-                        .append(Log.DeleteDirSuffix)
-                        .toString()
+      val dirName = Log.logDeleteDirName(removedLog.name)
       removedLog.close()
       val renamedDir = new File(removedLog.dir.getParent, dirName)
       val renameSuccessful = removedLog.dir.renameTo(renamedDir)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d76b47a..6699143 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -73,7 +73,7 @@ class LogSegment(val log: FileRecords,
 
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time,
            fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
-    this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
+    this(FileRecords.open(Log.logFile(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
          new OffsetIndex(Log.offsetIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          new TimeIndex(Log.timeIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          new TransactionIndex(startOffset, Log.transactionIndexFile(dir, startOffset)),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f56bbb65/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2283077..aaef466 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -59,7 +59,7 @@ class LogTest {
 
   def createEmptyLogs(dir: File, offsets: Int*) {
     for(offset <- offsets) {
-      Log.logFilename(dir, offset).createNewFile()
+      Log.logFile(dir, offset).createNewFile()
       Log.offsetIndexFile(dir, offset).createNewFile()
     }
   }
@@ -68,7 +68,7 @@ class LogTest {
   def testOffsetFromFilename() {
     val offset = 23423423L
 
-    val logFile = Log.logFilename(tmpDir, offset)
+    val logFile = Log.logFile(tmpDir, offset)
     assertEquals(offset, Log.offsetFromFilename(logFile.getName))
 
     val offsetIndexFile = Log.offsetIndexFile(tmpDir, offset)
@@ -1634,12 +1634,26 @@ class LogTest {
   def testParseTopicPartitionName() {
     val topic = "test_topic"
     val partition = "143"
-    val dir = new File(logDir + topicPartitionName(topic, partition))
+    val dir = new File(logDir, topicPartitionName(topic, partition))
     val topicPartition = Log.parseTopicPartitionName(dir)
     assertEquals(topic, topicPartition.topic)
     assertEquals(partition.toInt, topicPartition.partition)
   }
 
+  /**
+   * Tests that log directories with a period in their name that have been marked for deletion
+   * are parsed correctly by `Log.parseTopicPartitionName` (see KAFKA-5232 for details).
+   */
+  @Test
+  def testParseTopicPartitionNameWithPeriodForDeletedTopic() {
+    val topic = "foo.bar-testtopic"
+    val partition = "42"
+    val dir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition)))
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    assertEquals("Unexpected topic name parsed", topic, topicPartition.topic)
+    assertEquals("Unexpected partition number parsed", partition.toInt, topicPartition.partition)
+  }
+
   @Test
   def testParseTopicPartitionNameForEmptyName() {
     try {
@@ -1647,7 +1661,7 @@ class LogTest {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case _: Exception => // its GOOD!
+      case _: KafkaException => // its GOOD!
     }
   }
 
@@ -1658,7 +1672,7 @@ class LogTest {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir)
     } catch {
-      case _: Exception => // its GOOD!
+      case _: KafkaException => // its GOOD!
     }
   }
 
@@ -1666,12 +1680,20 @@ class LogTest {
   def testParseTopicPartitionNameForMissingSeparator() {
     val topic = "test_topic"
     val partition = "1999"
-    val dir = new File(logDir + File.separator + topic + partition)
+    val dir = new File(logDir, topic + partition)
     try {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case _: Exception => // its GOOD!
+      case _: KafkaException => // expected
+    }
+    // also test the "-delete" marker case
+    val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topic + partition))
+    try {
+      Log.parseTopicPartitionName(deleteMarkerDir)
+      fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // expected
     }
   }
 
@@ -1679,13 +1701,22 @@ class LogTest {
   def testParseTopicPartitionNameForMissingTopic() {
     val topic = ""
     val partition = "1999"
-    val dir = new File(logDir + topicPartitionName(topic, partition))
+    val dir = new File(logDir, topicPartitionName(topic, partition))
     try {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case _: Exception => // its GOOD!
+      case _: KafkaException => // expected
     }
+    // also test the "-delete" marker case
+    val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition)))
+    try {
+      Log.parseTopicPartitionName(deleteMarkerDir)
+      fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // expected
+    }
+
   }
 
   @Test
@@ -1697,7 +1728,36 @@ class LogTest {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case _: Exception => // its GOOD!
+      case _: KafkaException => // expected
+    }
+    // also test the "-delete" marker case
+    val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topicPartitionName(topic, partition)))
+    try {
+      Log.parseTopicPartitionName(deleteMarkerDir)
+      fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // expected
+    }
+  }
+
+  @Test
+  def testParseTopicPartitionNameForInvalidPartition() {
+    val topic = "test_topic"
+    val partition = "1999a"
+    val dir = new File(logDir, topicPartitionName(topic, partition))
+    try {
+      Log.parseTopicPartitionName(dir)
+      fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // expected
+    }
+    // also test the "-delete" marker case
+    val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(topic + partition))
+    try {
+      Log.parseTopicPartitionName(deleteMarkerDir)
+      fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // expected
     }
   }
 
@@ -1720,7 +1780,7 @@ class LogTest {
   }
 
   def topicPartitionName(topic: String, partition: String): String =
-    File.separator + topic + "-" + partition
+    topic + "-" + partition
 
   @Test
   def testDeleteOldSegmentsMethod() {


Mime
View raw message