kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Add registerController method to KafkaZkClient (#4598)
Date Sat, 21 Jul 2018 18:40:06 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 591954e  MINOR: Add registerController method to KafkaZkClient (#4598)
591954e is described below

commit 591954e2e55e0c262bf029f4b8f17566dcae6818
Author: Sandor Murakozi <smurakozi@gmail.com>
AuthorDate: Sat Jul 21 20:40:00 2018 +0200

    MINOR: Add registerController method to KafkaZkClient (#4598)
    
    And change KafkaController to use the newly introduced method.
    Also remove redundant `InZk` postfixes from `registerBrokerInZk` and
    `updateBrokerInfoInZk`.
    
    As `checkedEphemeralCreate` is not used outside of `KafkaZkClient`
    any longer, reduce its visibility.
    
    ControllerIntegrationTest already covers this functionality well, so it validates the
    refactor.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../scala/kafka/controller/KafkaController.scala     |  6 +++---
 core/src/main/scala/kafka/server/KafkaServer.scala   |  2 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala     | 17 ++++++++++++++---
 .../scala/unit/kafka/admin/ConfigCommandTest.scala   |  2 +-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 .../test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 20 ++++++++++----------
 6 files changed, 30 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 11d22fd..645080f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -195,7 +195,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = {
     this.brokerInfo = newBrokerInfo
-    zkClient.updateBrokerInfoInZk(newBrokerInfo)
+    zkClient.updateBrokerInfo(newBrokerInfo)
   }
 
   private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
@@ -1208,7 +1208,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
 
     try {
-      zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
+      zkClient.registerController(config.brokerId, timestamp)
       info(s"${config.brokerId} successfully elected as the controller")
       activeControllerId = config.brokerId
       onControllerFailover()
@@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     override def state: ControllerState = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      zkClient.registerBrokerInZk(brokerInfo)
+      zkClient.registerBroker(brokerInfo)
       Reelect.process()
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f73ede6..6c1bb83 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -254,7 +254,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         replicaManager.startup()
 
         val brokerInfo = createBrokerInfo
-        zkClient.registerBrokerInZk(brokerInfo)
+        zkClient.registerBroker(brokerInfo)
 
         // Now that the broker id is successfully registered, checkpoint it
         checkpointBrokerId(config.brokerId)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index ec4932a..c45a90f 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -79,13 +79,24 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
     createResponse.name
   }
 
-  def registerBrokerInZk(brokerInfo: BrokerInfo): Unit = {
+  def registerBroker(brokerInfo: BrokerInfo): Unit = {
     val path = brokerInfo.path
     checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
     info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}")
   }
 
-  def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
+  /**
+   * Registers a given broker in zookeeper as the controller.
+   * @param controllerId the id of the broker that is to be registered as the controller.
+   * @param timestamp the timestamp of the controller election.
+   * @throws KeeperException if an error is returned by ZooKeeper.
+   */
+  def registerController(controllerId: Int, timestamp: Long): Unit = {
+    val path = ControllerZNode.path
+    checkedEphemeralCreate(path, ControllerZNode.encode(controllerId, timestamp))
+  }
+
+  def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
     val response = retryRequestUntilConnected(setDataRequest)
@@ -1509,7 +1520,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
     responses
   }
 
-  def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
+  private def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
     val checkedEphemeral = new CheckedEphemeral(path, data)
     info(s"Creating $path (is it secure? $isSecure)")
     val code = checkedEphemeral.create()
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 2e8179c..cb261f6 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -687,7 +687,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
     val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), ApiVersion.latestVersion, jmxPort = 9192)
-    zkClient.registerBrokerInZk(brokerInfo)
+    zkClient.registerBroker(brokerInfo)
   }
 
   class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f89abb9..cf60c78 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -674,7 +674,7 @@ object TestUtils extends Logging {
       val listenerName = ListenerName.forSecurityProtocol(protocol)
       Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), b.rack)
     }
-    brokers.foreach(b => zkClient.registerBrokerInZk(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack),
+    brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack),
       ApiVersion.latestVersion, jmxPort = -1)))
     brokers
   }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index cc67a01..df009e8 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -630,17 +630,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
     val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
 
-    zkClient.registerBrokerInZk(brokerInfo)
+    zkClient.registerBroker(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
     assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
 
     // Node exists, owned by current session - no error, no update
-    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    zkClient.registerBroker(differentBrokerInfoWithSameId)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
 
     // Other client tries to register broker with same id causes failure, info is not changed in ZK
     intercept[NodeExistsException] {
-      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+      otherZkClient.registerBroker(differentBrokerInfoWithSameId)
     }
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
   }
@@ -656,8 +656,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
     val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
 
-    zkClient.registerBrokerInZk(brokerInfo1)
-    otherZkClient.registerBrokerInZk(brokerInfo0)
+    zkClient.registerBroker(brokerInfo1)
+    otherZkClient.registerBroker(brokerInfo0)
 
     assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
     assertEquals(
@@ -674,17 +674,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // Updating info of a broker not existing in ZK fails
     val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
     intercept[NoNodeException]{
-      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+      zkClient.updateBrokerInfo(originalBrokerInfo)
     }
 
-    zkClient.registerBrokerInZk(originalBrokerInfo)
+    zkClient.registerBroker(originalBrokerInfo)
 
     val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
-    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+    zkClient.updateBrokerInfo(updatedBrokerInfo)
     assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
 
     // Other ZK clients can update info
-    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    otherZkClient.updateBrokerInfo(originalBrokerInfo)
     assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
   }
 
@@ -937,7 +937,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // No controller
     assertEquals(None, zkClient.getControllerId)
     // Create controller
-    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+    zkClient.registerController(controllerId = 1, timestamp = 123456)
     assertEquals(Some(1), zkClient.getControllerId)
     zkClient.deleteController()
     assertEquals(None, zkClient.getControllerId)


Mime
View raw message