kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637)
Date Tue, 16 Oct 2018 03:14:27 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5681309  KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637)
5681309 is described below

commit 5681309094b114c65018c6951f23eca88c329e03
Author: Suman <sumannewton@gmail.com>
AuthorDate: Tue Oct 16 08:44:17 2018 +0530

    KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637)
---
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  8 ++-
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 63 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 365652a..06705d5 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -65,6 +65,7 @@ object ConsoleConsumer extends Logging {
   def run(conf: ConsumerConfig) {
     val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
     val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)
+
     val consumerWrapper =
       if (conf.partitionArg.isDefined)
         new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs)
@@ -194,7 +195,7 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("topic")
       .ofType(classOf[String])
-    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
+    val whitelistOpt = parser.accepts("whitelist", "Regular expression specifying whitelist of topics to include for consumption.")
       .withRequiredArg
       .describedAs("whitelist")
       .ofType(classOf[String])
@@ -355,7 +356,7 @@ object ConsoleConsumer extends Logging {
     val groupIdsProvided = Set(
       Option(options.valueOf(groupIdOpt)), // via --group
       Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property
-      Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --cosumer.config
+      Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config
     ).flatten
 
     if (groupIdsProvided.size > 1) {
@@ -376,6 +377,9 @@ object ConsoleConsumer extends Logging {
         groupIdPassed = false
     }
 
+    if (groupIdPassed && partitionArg.isDefined)
+      CommandLineUtils.printUsageAndDie(parser, "Options group and partition cannot be specified together.")
+
     def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
       try
         parser.parse(args: _*)
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 47b7fae..cdc146f 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -432,4 +432,67 @@ class ConsoleConsumerTest {
     assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
   }
 
+  @Test
+  def shouldParseGroupIdFromBeginningGivenTogether() {
+    // Start from earliest
+    var args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group",
+      "--from-beginning")
+
+    var config = new ConsoleConsumer.ConsumerConfig(args)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(-2, config.offsetArg)
+    assertEquals(true, config.fromBeginning)
+
+    // Start from latest
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group"
+    )
+
+    config = new ConsoleConsumer.ConsumerConfig(args)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(-1, config.offsetArg)
+    assertEquals(false, config.fromBeginning)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnGroupIdAndPartitionGivenTogether() {
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group",
+      "--partition", "0")
+
+    //When
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnOffsetWithoutPartition() {
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--offset", "10")
+
+    //When
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
 }


Mime
View raw message