kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-6697: Broker should not die if getCanonicalPath fails (#4752)
Date Wed, 20 Jun 2018 08:25:13 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b5d6c55  KAFKA-6697: Broker should not die if getCanonicalPath fails (#4752)
b5d6c55 is described below

commit b5d6c55a2fc3b90747f9393bdfbdb95b8012cd10
Author: Dong Lin <lindong28@gmail.com>
AuthorDate: Wed Jun 20 01:24:26 2018 -0700

    KAFKA-6697: Broker should not die if getCanonicalPath fails (#4752)
    
    A broker with multiple log dirs will die on startup if dir.getCanonicalPath() throws
    an IOException for one of the log dirs. We should instead mark such a log directory as
    offline, and the broker should still start as long as at least one log dir is healthy.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 20 ++++++++++++-------
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 23 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c0ac3b8..3bb5ee6 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -144,10 +144,8 @@ class LogManager(logDirs: Seq[File],
    * </ol>
    */
   private def createAndValidateLogDirs(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File]
= {
-    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
-      throw new KafkaException("Duplicate log directory found: " + dirs.mkString(", "))
-
     val liveLogDirs = new ConcurrentLinkedQueue[File]()
+    val canonicalPaths = mutable.HashSet.empty[String]
 
     for (dir <- dirs) {
       try {
@@ -155,13 +153,21 @@ class LogManager(logDirs: Seq[File],
           throw new IOException(s"Failed to load ${dir.getAbsolutePath} during broker startup")
 
         if (!dir.exists) {
-          info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
+          info(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
           val created = dir.mkdirs()
           if (!created)
-            throw new IOException("Failed to create data directory " + dir.getAbsolutePath)
+            throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
         }
         if (!dir.isDirectory || !dir.canRead)
-          throw new IOException(dir.getAbsolutePath + " is not a readable log directory.")
+          throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
+
+        // getCanonicalPath() throws IOException if a file system query fails or if the path
is invalid (e.g. contains
+        // the Nul character). Since there's no easy way to distinguish between the two cases,
we treat them the same
+        // and mark the log directory as offline.
+        if (!canonicalPaths.add(dir.getCanonicalPath))
+          throw new KafkaException(s"Duplicate log directory found: ${dirs.mkString(", ")}")
+
+
         liveLogDirs.add(dir)
       } catch {
         case e: IOException =>
@@ -169,7 +175,7 @@ class LogManager(logDirs: Seq[File],
       }
     }
     if (liveLogDirs.isEmpty) {
-      fatal(s"Shutdown broker because none of the specified log dirs from " + dirs.mkString(",
") + " can be created or validated")
+      fatal(s"Shutdown broker because none of the specified log dirs from ${dirs.mkString(",
")} can be created or validated")
       Exit.halt(1)
     }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index d9efc23..3fc6c1c 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -67,6 +67,29 @@ class LogManagerTest {
   @Test
   def testCreateLog() {
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
+    assertEquals(1, logManager.liveLogDirs.size)
+
+    val logFile = new File(logDir, name + "-0")
+    assertTrue(logFile.exists)
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
+  }
+
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log and that we can append
to the new log.
+   * The LogManager is configured with one invalid log directory which should be marked as
offline.
+   */
+  @Test
+  def testCreateLogWithInvalidLogDir() {
+    // Configure the log dir with the Nul character as the path, which causes dir.getCanonicalPath()
to throw an
+    // IOException. This simulates the scenario where the disk is not properly mounted (which
is hard to achieve in
+    // a unit test)
+    val dirs = Seq(logDir, new File("\u0000"))
+
+    logManager.shutdown()
+    logManager = createLogManager(dirs)
+    logManager.startup()
+
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig, isNew = true)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)


Mime
View raw message