kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] 08/09: Changes requeted by reviewer
Date Fri, 02 Mar 2018 01:48:40 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cacd377933ae0e7da0ed08dd33ab69fabde073c5
Author: Sandor Murakozi <smurakozi@gmail.com>
AuthorDate: Wed Feb 28 20:57:57 2018 -0800

    Changes requeted by reviewer
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   3 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 103 +++++++++++----------
 2 files changed, 54 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 851c686..d61b281 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,8 +89,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure:
Boolean
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
     val response = retryRequestUntilConnected(setDataRequest)
-     if (response.resultCode != Code.OK)
-       throw KeeperException.create(response.resultCode)
+    response.maybeThrow()
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath,
brokerInfo.broker.endPoints))
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d6826a6..9590ecd 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   var otherZkClient: KafkaZkClient = _
 
   @Before
-  override def setUp() {
+  override def setUp(): Unit = {
     super.setUp()
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled),
zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
   }
 
   @After
-  override def tearDown() {
+  override def tearDown(): Unit = {
     if (otherZkClient != null)
       otherZkClient.close()
     super.tearDown()
@@ -131,9 +131,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
-    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      topicPartition -> Seq(0, 1),
+      new TopicPartition(topic1, 0) -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -311,8 +310,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testLogDirGetters(): Unit = {
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+    assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+      Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+      Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
 
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -495,7 +496,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeletePath() {
+  def testDeletePath(): Unit = {
     val path = "/a/b/c"
     zkClient.createRecursive(path)
     zkClient.deletePath(path)
@@ -503,7 +504,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicZNode(): Unit ={
+  def testDeleteTopicZNode(): Unit = {
     zkClient.deleteTopicZNode(topic1)
     zkClient.createRecursive(TopicZNode.path(topic1))
     zkClient.deleteTopicZNode(topic1)
@@ -525,20 +526,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
-  private def assertPathExistenceAndData(expectedPath: String, data: String){
+  private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
     assertTrue(zkClient.pathExists(expectedPath))
     assertEquals(Some(data), dataAsString(expectedPath))
    }
 
   @Test
-  def testCreateTokenChangeNotification() {
+  def testCreateTokenChangeNotification(): Unit = {
     intercept[NoNodeException] {
       zkClient.createTokenChangeNotification("delegationToken")
     }
     zkClient.createDelegationTokenPaths()
 
     zkClient.createTokenChangeNotification("delegationToken")
-    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000",
"delegationToken")
+    assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000",
"delegationToken")
   }
 
   @Test
@@ -560,7 +561,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateConfigChangeNotification() {
+  def testCreateConfigChangeNotification(): Unit = {
     intercept[NoNodeException] {
       zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
     }
@@ -569,11 +570,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
 
     assertPathExistenceAndData(
-      s"/config/changes/config_change_0000000000",
+      "/config/changes/config_change_0000000000",
       """{"version":2,"entity_path":"/config/topics/topic1"}""")
   }
 
-  private def createLogProps(bytesProp: Int) = {
+  private def createLogProps(bytesProp: Int): Properties = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
     logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -584,7 +585,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private val logProps = createLogProps(1024)
 
   @Test
-  def testGetLogConfigs() {
+  def testGetLogConfigs(): Unit = {
     val emptyConfig = LogConfig(Collections.emptyMap())
     assertEquals("Non existent config, no defaults",
       (Map(topic1 -> emptyConfig), Map.empty),
@@ -612,13 +613,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
         Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp
-> "128").asJava))
   }
 
-  private def createBrokerInfo(id: Int, host: String, port: Int,
-                               securityProtocol: SecurityProtocol, rack: Option[String] =
None) =
+  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+                               rack: Option[String] = None): BrokerInfo =
     BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
     (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort
= port + 10)
 
   @Test
-  def testRegisterBrokerInfo() {
+  def testRegisterBrokerInfo(): Unit = {
     zkClient.createTopLevelPaths()
 
     val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -640,7 +641,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testGetBrokerMethods() {
+  def testGetBrokerMethods(): Unit = {
     zkClient.createTopLevelPaths()
 
     assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -662,7 +663,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateBrokerInfo() {
+  def testUpdateBrokerInfo(): Unit = {
     zkClient.createTopLevelPaths()
 
     // Updating info of a broker not existing in ZK fails
@@ -682,26 +683,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
   }
 
-  private def statWithVersion(version: Int) = {
+  private def statWithVersion(version: Int): Stat = {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     stat.setVersion(version)
     stat
   }
 
-  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
-    topicPartition10 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion
= zkVersion),
-      controllerEpoch = 4),
-    topicPartition11 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state),
zkVersion = zkVersion),
-      controllerEpoch = 4))
-
-  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
-  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state,
state - 1)
-
-  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
-  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
-  private def leaderIsrs(state: Int, zkVersion: Int) =
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition,
LeaderIsrAndControllerEpoch] =
+    Map(
+      topicPartition10 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion
= zkVersion),
+        controllerEpoch = 4),
+      topicPartition11 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state),
zkVersion = zkVersion),
+        controllerEpoch = 4))
+
+  val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]
=
+    leaderIsrAndControllerEpochs(0, 0)
+  private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch]
=
+    leaderIsrAndControllerEpochs(state, state - 1)
+
+  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
     leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
 
   private def checkUpdateLeaderAndIsrResult(
@@ -720,7 +723,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateLeaderAndIsr() {
+  def testUpdateLeaderAndIsr(): Unit = {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     // Non-existing topicPartitions
@@ -738,14 +741,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       leaderIsrs(state = 1, zkVersion = 1),
       mutable.ArrayBuffer.empty,
       Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch =
4))
 
     // Try to update with wrong ZK version
     checkUpdateLeaderAndIsrResult(
       Map.empty,
       ArrayBuffer(topicPartition10, topicPartition11),
       Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch =
4))
 
     // Trigger successful, to be retried and failed partitions in same call
     val mixedState = Map(
@@ -764,7 +767,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private def checkGetDataResponse(
       leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
       topicPartition: TopicPartition,
-      response: GetDataResponse) = {
+      response: GetDataResponse): Unit = {
     val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
     assertEquals(Code.OK, response.resultCode)
     assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -774,10 +777,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
   }
 
-  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0,
0))
+  private def eraseMetadata(response: CreateResponse): CreateResponse =
+    response.copy(metadata = ResponseMetadata(0, 0))
 
   @Test
-  def testGetTopicsAndPartitions() {
+  def testGetTopicsAndPartitions(): Unit = {
     assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
     assertTrue(zkClient.getAllPartitions.isEmpty)
 
@@ -792,7 +796,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateAndGetTopicPartitionStatesRaw() {
+  def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     assertEquals(
@@ -819,7 +823,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testSetTopicPartitionStatesRaw() {
+  def testSetTopicPartitionStatesRaw(): Unit = {
 
     def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat:
Stat) =
       topicPartitions.map { topicPartition =>
@@ -855,21 +859,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testReassignPartitionsInProgress() {
+  def testReassignPartitionsInProgress(): Unit = {
     assertFalse(zkClient.reassignPartitionsInProgress)
     zkClient.createRecursive(ReassignPartitionsZNode.path)
     assertTrue(zkClient.reassignPartitionsInProgress)
   }
 
   @Test
-  def testGetTopicPartitionStates() {
+  def testGetTopicPartitionStates(): Unit = {
     assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
     assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
 
     zkClient.createRecursive(TopicZNode.path(topic1))
 
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
     assertEquals(
       initialLeaderIsrAndControllerEpochs,
       zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -894,13 +897,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   }
 
-  private def eraseMetadataAndStat(response: SetDataResponse) = {
+  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
     val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else
null
     response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
   }
 
   @Test
-  def testControllerEpochMethods() {
+  def testControllerEpochMethods(): Unit = {
     assertEquals(None, zkClient.getControllerEpoch)
 
     assertEquals("Setting non existing nodes should return NONODE results",
@@ -927,7 +930,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testControllerManagementMethods() {
+  def testControllerManagementMethods(): Unit = {
     // No controller
     assertEquals(None, zkClient.getControllerId)
     // Create controller

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

Mime
View raw message