kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6680) Fix config initialization in DynamicBrokerConfig
Date Mon, 19 Mar 2018 22:23:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405544#comment-16405544
] 

ASF GitHub Bot commented on KAFKA-6680:
---------------------------------------

rajinisivaram closed pull request #4731:  KAFKA-6680: Fix issues related to Dynamic Broker
configs
URL: https://github.com/apache/kafka/pull/4731
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 3236af01bbb..92fd5d73136 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -137,6 +137,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
   private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+    currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
     val adminZkClient = new AdminZkClient(zkClient)
     updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
@@ -719,8 +720,8 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable
wi
     val oldListeners = listenersToMap(oldConfig.listeners)
     if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
       throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be
a subset of listeners '$newListeners'")
-    if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
-      throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}'
don't match")
+    if (!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
+      throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map
'${newConfig.listenerSecurityProtocolMap}'")
     newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
       val prefix = listenerName.configPrefix
       val newListenerProps = immutableListenerConfigs(newConfig, prefix)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e3e2742dbc..bca98d2cee0 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,10 +24,12 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import scala.collection.Set
 
 class DynamicBrokerConfigTest {
 
@@ -248,4 +250,56 @@ class DynamicBrokerConfigTest {
     newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
     assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
   }
+
+  @Test
+  def testDynamicListenerConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
+    val oldConfig =  KafkaConfig.fromProps(props)
+    val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+    EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
+    EasyMock.replay(kafkaServer)
+
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
+    val newConfig = KafkaConfig(props)
+
+    val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
+    dynamicListenerConfig.validateReconfiguration(newConfig)
+  }
+
+  @Test
+  def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
+    val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+    EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new
java.util.Properties()).anyTimes()
+    EasyMock.replay(zkClient)
+
+    val oldConfig =  KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 9092))
+    val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
+    dynamicBrokerConfig.initialize(zkClient)
+    dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
+
+    val newprops = new Properties()
+    newprops.put(KafkaConfig.NumIoThreadsProp, "10")
+    newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
+    dynamicBrokerConfig.updateBrokerConfig(0, newprops)
+  }
 }
+
+class TestDynamicThreadPool() extends BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicThreadPool.ReconfigurableConfigs
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    assertEquals(Defaults.NumIoThreads, oldConfig.numIoThreads)
+    assertEquals(Defaults.BackgroundThreads, oldConfig.backgroundThreads)
+
+    assertEquals(10, newConfig.numIoThreads)
+    assertEquals(100, newConfig.backgroundThreads)
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    assertEquals(10, newConfig.numIoThreads)
+    assertEquals(100, newConfig.backgroundThreads)
+  }
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Fix config initialization in DynamicBrokerConfig
> ------------------------------------------------
>
>                 Key: KAFKA-6680
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6680
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Manikumar
>            Assignee: Manikumar
>            Priority: Major
>             Fix For: 1.2.0
>
>
> Below issues observed while testing dynamic config update feature
> 1. {{kafkaConfig}} doesn't get updated during {{initialize}} if there are no dynamic
configs defined in ZK.
> 2.  update DynamicListenerConfig.validateReconfiguration() to check new Listeners must
be subset of listener map



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message