[ https://issues.apache.org/jira/browse/KAFKA-6680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405544#comment-16405544 ]
ASF GitHub Bot commented on KAFKA-6680:
---------------------------------------
rajinisivaram closed pull request #4731: KAFKA-6680: Fix issues related to Dynamic Broker configs
URL: https://github.com/apache/kafka/pull/4731
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 3236af01bbb..92fd5d73136 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -137,6 +137,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+ currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
@@ -719,8 +720,8 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable
wi
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be
a subset of listeners '$newListeners'")
- if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
- throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}'
don't match")
+ if (!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
+ throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map
'${newConfig.listenerSecurityProtocolMap}'")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
val prefix = listenerName.configPrefix
val newListenerProps = immutableListenerConfigs(newConfig, prefix)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e3e2742dbc..bca98d2cee0 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,10 +24,12 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
+import scala.collection.Set
class DynamicBrokerConfigTest {
@@ -248,4 +250,56 @@ class DynamicBrokerConfigTest {
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
}
+
+ @Test
+ def testDynamicListenerConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
+ val oldConfig = KafkaConfig.fromProps(props)
+ val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+ EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
+ EasyMock.replay(kafkaServer)
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
+ val newConfig = KafkaConfig(props)
+
+ val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
+ dynamicListenerConfig.validateReconfiguration(newConfig)
+ }
+
+ @Test
+ def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
+ val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+ EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new
java.util.Properties()).anyTimes()
+ EasyMock.replay(zkClient)
+
+ val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 9092))
+ val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
+ dynamicBrokerConfig.initialize(zkClient)
+ dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
+
+ val newprops = new Properties()
+ newprops.put(KafkaConfig.NumIoThreadsProp, "10")
+ newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
+ dynamicBrokerConfig.updateBrokerConfig(0, newprops)
+ }
}
+
+class TestDynamicThreadPool() extends BrokerReconfigurable {
+
+ override def reconfigurableConfigs: Set[String] = {
+ DynamicThreadPool.ReconfigurableConfigs
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+ assertEquals(Defaults.NumIoThreads, oldConfig.numIoThreads)
+ assertEquals(Defaults.BackgroundThreads, oldConfig.backgroundThreads)
+
+ assertEquals(10, newConfig.numIoThreads)
+ assertEquals(100, newConfig.backgroundThreads)
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ assertEquals(10, newConfig.numIoThreads)
+ assertEquals(100, newConfig.backgroundThreads)
+ }
+}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Fix config initialization in DynamicBrokerConfig
> ------------------------------------------------
>
> Key: KAFKA-6680
> URL: https://issues.apache.org/jira/browse/KAFKA-6680
> Project: Kafka
> Issue Type: Bug
> Reporter: Manikumar
> Assignee: Manikumar
> Priority: Major
> Fix For: 1.2.0
>
>
> The following issues were observed while testing the dynamic config update feature:
> 1. {{kafkaConfig}} doesn't get updated during {{initialize}} if there are no dynamic configs defined in ZK.
> 2. DynamicListenerConfig.validateReconfiguration() should check that the new listeners are a subset of the listener map, rather than requiring an exact match.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
|