kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3091: Broker persists generated ID even when the ID can't be used due to duplicates
Date Tue, 19 Jan 2016 02:55:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5c337d759 -> b1d325b3c


KAFKA-3091: Broker persists generated ID even when the ID can't be used due to duplicates

…updated to a new valid one

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Gwen Shapira

Closes #763 from granthenke/id-start-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1d325b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1d325b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1d325b3

Branch: refs/heads/trunk
Commit: b1d325b3c09cd95d69a66fac4a3760f57d3062c9
Parents: 5c337d7
Author: Grant Henke <granthenke@gmail.com>
Authored: Mon Jan 18 18:55:50 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon Jan 18 18:55:50 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaServer.scala   | 25 ++++++++-----
 .../server/ServerGenerateBrokerIdTest.scala     | 38 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d325b3/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 454633e..901ba2e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -239,6 +239,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
         kafkaHealthcheck.startup()
 
+        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint
it
+        checkpointBrokerId(config.brokerId)
+
         /* register broker metrics */
         registerStats()
 
@@ -620,16 +623,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime,
threadNamePr
     */
   private def getBrokerId: Int =  {
     var brokerId = config.brokerId
-    var logDirsWithoutMetaProps: List[String] = List()
     val brokerIdSet = mutable.HashSet[Int]()
 
     for (logDir <- config.logDirs) {
       val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
-      brokerMetadataOpt match {
-        case Some(brokerMetadata: BrokerMetadata) =>
-          brokerIdSet.add(brokerMetadata.brokerId)
-        case None =>
-          logDirsWithoutMetaProps ++= List(logDir)
+      brokerMetadataOpt.foreach { brokerMetadata =>
+        brokerIdSet.add(brokerMetadata.brokerId)
       }
     }
 
@@ -642,12 +641,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime,
threadNamePr
     else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
       brokerId = brokerIdSet.last
 
+    brokerId
+  }
+
+  private def checkpointBrokerId(brokerId: Int) {
+    var logDirsWithoutMetaProps: List[String] = List()
+
+    for (logDir <- config.logDirs) {
+      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
+      if(brokerMetadataOpt.isEmpty)
+          logDirsWithoutMetaProps ++= List(logDir)
+    }
+
     for(logDir <- logDirsWithoutMetaProps) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
       checkpoint.write(new BrokerMetadata(brokerId))
     }
-
-    brokerId
   }
 
   private def generateBrokerId: Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d325b3/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 60ec561..c26ff13 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -137,6 +137,44 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
+  @Test
+  def testBrokerMetadataOnIdCollision() {
+    // Start a good server
+    val propsA = TestUtils.createBrokerConfig(1, zkConnect)
+    val configA = KafkaConfig.fromProps(propsA)
+    val serverA = new KafkaServer(configA)
+    serverA.startup()
+
+    // Start a server that collides on the broker id
+    val propsB = TestUtils.createBrokerConfig(1, zkConnect)
+    val configB = KafkaConfig.fromProps(propsB)
+    val serverB = new KafkaServer(configB)
+    intercept[RuntimeException] {
+      serverB.startup()
+    }
+
+    // verify no broker metadata was written
+    serverB.config.logDirs.foreach { logDir =>
+      val brokerMetaFile = new File(logDir + File.separator + brokerMetaPropsFile)
+      assertFalse(brokerMetaFile.exists())
+    }
+
+    // adjust the broker config and start again
+    propsB.setProperty(KafkaConfig.BrokerIdProp, "2")
+    val newConfigB = KafkaConfig.fromProps(propsB)
+    val newServerB = new KafkaServer(newConfigB)
+    newServerB.startup()
+
+    serverA.shutdown()
+    newServerB.shutdown()
+    // verify correct broker metadata was written
+    assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1))
+    assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2))
+    CoreUtils.rm(serverA.config.logDirs)
+    CoreUtils.rm(newServerB.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
   def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
     for(logDir <- logDirs) {
       val brokerMetadataOpt = (new BrokerMetadataCheckpoint(


Mime
View raw message