kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6627: Prevent config default values overriding ones specified through --producer-property on command line. (#6084)
Date Fri, 11 Jan 2019 18:29:21 GMT
This is an automated email from the ASF dual-hosted git repository.

sriharsha pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 694da1a  KAFKA-6627: Prevent config default values overriding ones specified through --producer-property on command line. (#6084)
694da1a is described below

commit 694da1ac1e77c5b9aafe990def2238e521c0fa1f
Author: Kan Li <likan999@users.noreply.github.com>
AuthorDate: Fri Jan 11 10:29:10 2019 -0800

    KAFKA-6627: Prevent config default values overriding ones specified through --producer-property on command line. (#6084)
    
    * KAFKA-6627: Prevent config default values overriding ones specified through --producer-property on command line.
    
    In Console{Producer,Consumer}, extraProducerProps (options specified in
    --producer-property) is applied first, then overridden unconditionally,
    even if the value is not specified explicitly (and the default value is
    used). This patch fixes it so that it doesn't override the existing
    value set by --producer-property if it is not explicitly specified.
    
    The contribution is my original work and I license the work to the
    project under the project's open source license.
    
    Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>
---
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |   4 +-
 .../main/scala/kafka/tools/ConsoleProducer.scala   |  41 +++---
 .../main/scala/kafka/utils/CommandLineUtils.scala  |  17 +++
 .../unit/kafka/utils/CommandLineUtilsTest.scala    | 147 +++++++++++++++++++++
 4 files changed, 187 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 9a8c648..d246e7b 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -152,7 +152,8 @@ object ConsoleConsumer extends Logging {
     props ++= config.extraConsumerProps
     setAutoOffsetResetValue(config, props)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
+    CommandLineUtils.maybeMergeOptions(
+      props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt)
     props
   }
 
@@ -300,7 +301,6 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
-    val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.getDeclaredConstructor().newInstance().asInstanceOf[MessageFormatter]
 
     if (keyDeserializer != null && !keyDeserializer.isEmpty) {
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index da7d120..829e271 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -91,20 +91,31 @@ object ConsoleProducer {
 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-    props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
-    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
-    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
-    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.maxBlockMs.toString)
-    props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks)
-    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.requestTimeoutMs.toString)
-    props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
-    props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
-    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
-    props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
     props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
 
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.LINGER_MS_CONFIG, config.options, config.sendTimeoutOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.ACKS_CONFIG, config.options, config.requestRequiredAcksOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.options, config.requestTimeoutMsOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.RETRIES_CONFIG, config.options, config.messageSendMaxRetriesOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.options, config.retryBackoffMsOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.SEND_BUFFER_CONFIG, config.options, config.socketBufferSizeOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.options, config.maxMemoryBytesOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.BATCH_SIZE_CONFIG, config.options, config.maxPartitionMemoryBytesOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.METADATA_MAX_AGE_CONFIG, config.options, config.metadataExpiryMsOpt)
+    CommandLineUtils.maybeMergeOptions(
+      props, ProducerConfig.MAX_BLOCK_MS_CONFIG, config.options, config.maxBlockMsOpt)
+
     props
   }
 
@@ -218,19 +229,9 @@ object ConsoleProducer {
                              else compressionCodecOptionValue
                            else NoCompressionCodec.name
     val batchSize = options.valueOf(batchSizeOpt)
-    val sendTimeout = options.valueOf(sendTimeoutOpt)
-    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
-    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
-    val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
-    val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
     val readerClass = options.valueOf(messageReaderOpt)
-    val socketBuffer = options.valueOf(socketBufferSizeOpt)
     val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
     val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala)
-    val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
-    val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
-    val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
-    val maxBlockMs = options.valueOf(maxBlockMsOpt)
   }
 
   class LineMessageReader extends MessageReader {
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 728f033..b3a4a8f 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -96,4 +96,21 @@ object CommandLineUtils extends Logging {
     }
     props
   }
+
+  /**
+    * Merge the options into {@code props} for key {@code key}, with the following precedence, from high to low:
+    * 1) if {@code spec} is specified on {@code options} explicitly, use the value;
+    * 2) if {@code props} already has {@code key} set, keep it;
+    * 3) otherwise, use the default value of {@code spec}.
+    * A {@code null} value means to remove {@code key} from the {@code props}.
+    */
+  def maybeMergeOptions[V](props: Properties, key: String, options: OptionSet, spec: OptionSpec[V]) {
+    if (options.has(spec) || !props.containsKey(key)) {
+      val value = options.valueOf(spec)
+      if (value == null)
+        props.remove(key)
+      else
+        props.put(key, value.toString)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
index 25c6729..f49b69b 100644
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
@@ -17,6 +17,9 @@
 
 package kafka.utils
 
+import java.util.Properties
+
+import joptsimple.{OptionParser, OptionSpec}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -73,4 +76,148 @@ class CommandLineUtilsTest {
     assertEquals("Value of second property should be 'thi=rd'", props.getProperty("third.property"), "thi=rd")
   }
 
+  val props = new Properties()
+  val parser = new OptionParser(false)
+  var stringOpt : OptionSpec[String] = _
+  var intOpt : OptionSpec[java.lang.Integer] = _
+  var stringOptOptionalArg : OptionSpec[String] = _
+  var intOptOptionalArg : OptionSpec[java.lang.Integer] = _
+  var stringOptOptionalArgNoDefault : OptionSpec[String] = _
+  var intOptOptionalArgNoDefault : OptionSpec[java.lang.Integer] = _
+
+  def setUpOptions(): Unit = {
+    stringOpt = parser.accepts("str")
+      .withRequiredArg
+      .ofType(classOf[String])
+      .defaultsTo("default-string")
+    intOpt = parser.accepts("int")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
+    stringOptOptionalArg = parser.accepts("str-opt")
+      .withOptionalArg
+      .ofType(classOf[String])
+      .defaultsTo("default-string-2")
+    intOptOptionalArg = parser.accepts("int-opt")
+      .withOptionalArg
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
+      .withOptionalArg
+      .ofType(classOf[String])
+    intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
+      .withOptionalArg
+      .ofType(classOf[java.lang.Integer])
+  }
+
+  @Test
+  def testMaybeMergeOptionsOverwriteExisting(): Unit = {
+    setUpOptions()
+
+    props.put("skey", "existing-string")
+    props.put("ikey", "300")
+    props.put("sokey", "existing-string-2")
+    props.put("iokey", "400")
+    props.put("sondkey", "existing-string-3")
+    props.put("iondkey", "500")
+
+    val options = parser.parse(
+      "--str", "some-string",
+      "--int", "600",
+      "--str-opt", "some-string-2",
+      "--int-opt", "700",
+      "--str-opt-nodef", "some-string-3",
+      "--int-opt-nodef", "800",
+    )
+
+    CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
+    CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
+    CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
+    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
+
+    assertEquals("some-string", props.get("skey"))
+    assertEquals("600", props.get("ikey"))
+    assertEquals("some-string-2", props.get("sokey"))
+    assertEquals("700", props.get("iokey"))
+    assertEquals("some-string-3", props.get("sondkey"))
+    assertEquals("800", props.get("iondkey"))
+  }
+
+  @Test
+  def testMaybeMergeOptionsDefaultOverwriteExisting(): Unit = {
+    setUpOptions()
+
+    props.put("sokey", "existing-string")
+    props.put("iokey", "300")
+    props.put("sondkey", "existing-string-2")
+    props.put("iondkey", "400")
+
+    val options = parser.parse(
+      "--str-opt",
+      "--int-opt",
+      "--str-opt-nodef",
+      "--int-opt-nodef",
+    )
+
+    CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
+    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
+
+    assertEquals("default-string-2", props.get("sokey"))
+    assertEquals("200", props.get("iokey"))
+    assertNull(props.get("sondkey"))
+    assertNull(props.get("iondkey"))
+  }
+
+  @Test
+  def testMaybeMergeOptionsDefaultValueIfNotExist(): Unit = {
+    setUpOptions()
+
+    val options = parser.parse()
+
+    CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
+    CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
+    CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
+    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
+
+    assertEquals("default-string", props.get("skey"))
+    assertEquals("100", props.get("ikey"))
+    assertEquals("default-string-2", props.get("sokey"))
+    assertEquals("200", props.get("iokey"))
+    assertNull(props.get("sondkey"))
+    assertNull(props.get("iondkey"))
+  }
+
+  @Test
+  def testMaybeMergeOptionsNotOverwriteExisting(): Unit = {
+    setUpOptions()
+
+    props.put("skey", "existing-string")
+    props.put("ikey", "300")
+    props.put("sokey", "existing-string-2")
+    props.put("iokey", "400")
+    props.put("sondkey", "existing-string-3")
+    props.put("iondkey", "500")
+
+    val options = parser.parse()
+
+    CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
+    CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
+    CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
+    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
+    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
+
+    assertEquals("existing-string", props.get("skey"))
+    assertEquals("300", props.get("ikey"))
+    assertEquals("existing-string-2", props.get("sokey"))
+    assertEquals("400", props.get("iokey"))
+    assertEquals("existing-string-3", props.get("sondkey"))
+    assertEquals("500", props.get("iondkey"))
+  }
 }


Mime
View raw message