kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5752; Update index files correctly during async delete
Date Tue, 22 Aug 2017 14:12:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6cf92a253 -> 3615ef730


KAFKA-5752; Update index files correctly during async delete

Previously, the timeIndex and txnIndex file references were not updated when a log directory was renamed for async deletion; only the log and offset index references were.
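
For context, a minimal sketch of the pattern the fix introduces, under stated assumptions: FileRef and SegmentSketch below are illustrative stand-ins, not Kafka classes. When asyncDelete renames a log directory, every file-backed component of each segment must be repointed at the renamed directory, including the time and transaction indexes that the old code missed.

    import java.io.File

    // Hypothetical stand-in for a file-backed segment component (not a Kafka class).
    class FileRef(var file: File)

    // Sketch of the shape of the updateDir method this commit adds (simplified).
    class SegmentSketch(val log: FileRef, val index: FileRef,
                        val timeIndex: FileRef, val txnIndex: FileRef) {
      def updateDir(dir: File): Unit = {
        // Repoint all four files at the renamed directory; before this fix,
        // only the log and offset index references were updated.
        log.file = new File(dir, log.file.getName)
        index.file = new File(dir, index.file.getName)
        timeIndex.file = new File(dir, timeIndex.file.getName)
        txnIndex.file = new File(dir, txnIndex.file.getName)
      }
    }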

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3700 from omkreddy/KAFKA-5752


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3615ef73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3615ef73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3615ef73

Branch: refs/heads/trunk
Commit: 3615ef730caeab3f79874c8e2acb9f7e0e9cff82
Parents: 6cf92a2
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Tue Aug 22 15:08:00 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Aug 22 15:11:04 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogManager.scala  | 15 +++-----
 core/src/main/scala/kafka/log/LogSegment.scala  | 11 ++++++
 .../scala/unit/kafka/log/LogManagerTest.scala   | 39 +++++++++++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala |  4 +-
 4 files changed, 53 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3615ef73/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 88a0e21..4068001 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -74,7 +74,7 @@ class LogManager(logDirs: Array[File],
  private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
 
   def liveLogDirs: Array[File] = {
-    if (_liveLogDirs.size() == logDirs.size)
+    if (_liveLogDirs.size == logDirs.size)
       logDirs
     else
       _liveLogDirs.asScala.toArray
@@ -416,10 +416,9 @@ class LogManager(logDirs: Array[File],
         CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath))
       }
     } catch {
-      case e: ExecutionException => {
+      case e: ExecutionException =>
         error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
         throw e.getCause
-      }
     } finally {
       threadPools.foreach(_.shutdown())
       // regardless of whether the close succeeded, we need to unlock the data directories
@@ -607,8 +606,9 @@ class LogManager(logDirs: Array[File],
    * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
     * add it in the queue for deletion.
     * @param topicPartition TopicPartition that needs to be deleted
+    * @return the removed log
     */
-  def asyncDelete(topicPartition: TopicPartition) = {
+  def asyncDelete(topicPartition: TopicPartition): Log = {
     val removedLog: Log = logCreationOrDeletionLock synchronized {
       logs.remove(topicPartition)
     }
@@ -627,11 +627,7 @@ class LogManager(logDirs: Array[File],
           checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
           removedLog.dir = renamedDir
           // change the file pointers for log and index file
-          for (logSegment <- removedLog.logSegments) {
-            logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName))
-            logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
-          }
-
+          removedLog.logSegments.foreach(_.updateDir(renamedDir))
           logsToBeDeleted.add(removedLog)
           removedLog.removeLogMetrics()
           info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath}
and is scheduled for deletion")
@@ -647,6 +643,7 @@ class LogManager(logDirs: Array[File],
     } else if (offlineLogDirs.nonEmpty) {
      throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
     }
+    removedLog
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3615ef73/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0449a4a..06c4e2d 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -380,6 +380,17 @@ class LogSegment(val log: FileRecords,
   }
 
   /**
+   * Update the directory reference for the log and indices in this segment. This would typically be called after a
+   * directory is renamed.
+   */
+  def updateDir(dir: File): Unit = {
+    log.setFile(new File(dir, log.file.getName))
+    index.file = new File(dir, index.file.getName)
+    timeIndex.file = new File(dir, timeIndex.file.getName)
+    txnIndex.file = new File(dir, txnIndex.file.getName)
+  }
+
+  /**
    * Change the suffix for the index and log file for this log segment
    * IOException from this method should be handled by the caller
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/3615ef73/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 61ac5ed..dd59e60 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -55,7 +55,7 @@ class LogManagerTest {
 
   @After
   def tearDown() {
-    if(logManager != null)
+    if (logManager != null)
       logManager.shutdown()
     Utils.delete(logDir)
     logManager.liveLogDirs.foreach(Utils.delete)
@@ -297,10 +297,8 @@ class LogManagerTest {
     logManager.checkpointLogRecoveryOffsets()
     val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
 
-    topicPartitions.zip(logs).foreach {
-      case(tp, log) => {
-        assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
-      }
+    topicPartitions.zip(logs).foreach { case (tp, log) =>
+      assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
     }
   }
 
@@ -310,4 +308,35 @@ class LogManagerTest {
       logDirs = logDirs,
       time = this.time)
   }
+
+  @Test
+  def testFileReferencesAfterAsyncDelete() {
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
+    val activeSegment = log.activeSegment
+    val logName = activeSegment.log.file.getName
+    val indexName = activeSegment.index.file.getName
+    val timeIndexName = activeSegment.timeIndex.file.getName
+    val txnIndexName = activeSegment.txnIndex.file.getName
+    val indexFilesOnDiskBeforeDelete = activeSegment.log.file.getParentFile.listFiles.filter(_.getName.endsWith("index"))
+
+    val removedLog = logManager.asyncDelete(new TopicPartition(name, 0))
+    val removedSegment = removedLog.activeSegment
+    val indexFilesAfterDelete = Seq(removedSegment.index.file, removedSegment.timeIndex.file,
+      removedSegment.txnIndex.file)
+
+    assertEquals(new File(removedLog.dir, logName), removedSegment.log.file)
+    assertEquals(new File(removedLog.dir, indexName), removedSegment.index.file)
+    assertEquals(new File(removedLog.dir, timeIndexName), removedSegment.timeIndex.file)
+    assertEquals(new File(removedLog.dir, txnIndexName), removedSegment.txnIndex.file)
+
+    // Try to detect the case where a new index type was added and we forgot to update the pointer
+    // This will only catch cases where the index file is created eagerly instead of lazily
+    indexFilesOnDiskBeforeDelete.foreach { fileBeforeDelete =>
+      val fileInIndex = indexFilesAfterDelete.find(_.getName == fileBeforeDelete.getName)
+      assertEquals(s"Could not find index file ${fileBeforeDelete.getName} in indexFilesAfterDelete",
+        Some(fileBeforeDelete.getName), fileInIndex.map(_.getName))
+      assertNotEquals("File reference was not updated in index", fileBeforeDelete.getAbsolutePath,
+        fileInIndex.get.getAbsolutePath)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3615ef73/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d141a26..ab840ee 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1167,10 +1167,10 @@ object TestUtils extends Logging {
       "Replica manager's should have deleted all of this topic's partitions")
    // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
-      servers.forall(server => topicPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty)))
+      servers.forall(server => topicPartitions.forall(tp => server.getLogManager.getLog(tp).isEmpty)))
     // ensure that topic is removed from all cleaner offsets
    TestUtils.waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
-      val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
+      val checkpoints = server.getLogManager.liveLogDirs.map { logDir =>
         new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))

