kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3282; Change tools to use new consumer if zookeeper is not specified
Date Sun, 25 Sep 2016 08:14:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 36242b846 -> 1d055f755


KAFKA-3282; Change tools to use new consumer if zookeeper is not specified

Author: Arun Mahadevan <aiyer@hortonworks.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1376 from arunmahadevan/cons-consumer-fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d055f75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d055f75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d055f75

Branch: refs/heads/trunk
Commit: 1d055f7551d138324d2540095a1cfc1c8f74d76f
Parents: 36242b8
Author: Arun Mahadevan <aiyer@hortonworks.com>
Authored: Sun Sep 25 08:44:56 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sun Sep 25 09:12:02 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 34 ++++++++--------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 41 ++++++++++++--------
 .../scala/kafka/tools/ConsumerPerformance.scala | 17 +++++---
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 32 +++++++++++----
 4 files changed, 76 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index f0c817f..1cc63b1 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -54,8 +54,11 @@ object ConsumerGroupCommand {
     opts.checkArgs()
 
     val consumerGroupService = {
-      if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts)
-      else new ZkConsumerGroupService(opts)
+      if (opts.useOldConsumer) {
+        new ZkConsumerGroupService(opts)
+      } else {
+        new KafkaConsumerGroupService(opts)
+      }
     }
 
     try {
@@ -376,9 +379,9 @@ object ConsumerGroupCommand {
   }
 
   class ConsumerGroupCommandOptions(args: Array[String]) {
-    val ZkConnectDoc = "REQUIRED (unless new-consumer is used): The connection string for the zookeeper connection in the form host:port. " +
+    val ZkConnectDoc = "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " +
       "Multiple URLS can be given to allow fail-over."
-    val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
+    val BootstrapServerDoc = "REQUIRED (unless old consumer is used): The server to connect to."
     val GroupDoc = "The consumer group we wish to act on."
     val TopicDoc = "The topic whose consumer group information should be deleted."
     val ListDoc = "List all consumer groups."
@@ -391,7 +394,7 @@ object ConsumerGroupCommand {
       "Pass in just a topic to delete the given topic's partition offsets and ownership information " +
       "for every consumer group. For instance --topic t1" + nl +
       "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
-    val NewConsumerDoc = "Use new consumer."
+    val NewConsumerDoc = "Use new consumer. This is the default."
     val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
@@ -420,27 +423,24 @@ object ConsumerGroupCommand {
                                   .ofType(classOf[String])
     val options = parser.parse(args : _*)
 
+    val useOldConsumer = options.has(zkConnectOpt)
+
     val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
 
     def checkArgs() {
       // check required args
-      if (options.has(newConsumerOpt)) {
+      if (useOldConsumer) {
+        if (options.has(bootstrapServerOpt))
+          CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.")
+        else if (options.has(newConsumerOpt))
+          CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
+      } else {
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
-        if (options.has(zkConnectOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt")
-
         if (options.has(deleteOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " +
+          CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " +
             "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " +
             "member leaves")
-
-      } else {
-        CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
-
-        if (options.has(bootstrapServerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt")
-
       }
 
       if (options.has(describeOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 17cf5bd..361bef2 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -58,15 +58,15 @@ object ConsoleConsumer extends Logging {
   def run(conf: ConsumerConfig) {
 
     val consumer =
-      if (conf.useNewConsumer) {
+      if (conf.useOldConsumer) {
+        checkZk(conf)
+        new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
+      } else {
         val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
         if (conf.partitionArg.isDefined)
           new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, getNewConsumerProps(conf), timeoutMs)
         else
           new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
-      } else {
-        checkZk(conf)
-        new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
       }
 
     addShutdownHook(consumer, conf)
@@ -224,7 +224,7 @@ object ConsoleConsumer extends Logging {
       .describedAs("consume offset")
       .ofType(classOf[String])
       .defaultsTo("latest")
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " +
       "Multiple URLS can be given to allow fail-over.")
       .withRequiredArg
       .describedAs("urls")
@@ -265,8 +265,8 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
-    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
-    val bootstrapServerOpt = parser.accepts("bootstrap-server")
+    val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED (unless old consumer is used): The server to connect to.")
       .withRequiredArg
       .describedAs("server to connect to")
       .ofType(classOf[String])
@@ -287,7 +287,7 @@ object ConsoleConsumer extends Logging {
 
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
-    val useNewConsumer = options.has(useNewConsumerOpt)
+    val useOldConsumer = options.has(zkConnectOpt)
     val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
 
     // If using old consumer, exactly one of whitelist/blacklist/topic is required.
@@ -314,21 +314,27 @@ object ConsoleConsumer extends Logging {
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
-    if (useNewConsumer) {
-      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
-      if (topicOrFilterOpt.size != 1)
-        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
-      topicArg = options.valueOf(topicIdOpt)
-      whitelistArg = options.valueOf(whitelistOpt)
-    } else {
+    if (useOldConsumer) {
+      if (options.has(bootstrapServerOpt))
+        CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.")
+      else if (options.has(newConsumerOpt))
+        CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
       val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
       if (topicOrFilterOpt.size != 1)
         CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
       topicArg = options.valueOf(topicOrFilterOpt.head)
       filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+      Console.err.println("Using the ConsoleConsumer with old consumer is deprecated and will be removed " +
+        s"in a future major release. Consider using the new consumer by passing $bootstrapServerOpt instead of ${zkConnectOpt}.")
+    } else {
+      val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
+      if (topicOrFilterOpt.size != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
+      topicArg = options.valueOf(topicIdOpt)
+      whitelistArg = options.valueOf(whitelistOpt)
     }
 
-    if (!useNewConsumer && (partitionArg.isDefined || options.has(offsetOpt)))
+    if (useOldConsumer && (partitionArg.isDefined || options.has(offsetOpt)))
       CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
 
     if (partitionArg.isDefined) {
@@ -361,7 +367,8 @@ object ConsoleConsumer extends Logging {
       else if (fromBeginning) OffsetRequest.EarliestTime
       else OffsetRequest.LatestTime
 
-    CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)
+    if (!useOldConsumer)
+      CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
     if (options.has(csvMetricsReporterEnabledOpt)) {
       val csvReporterProps = new Properties()

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 36376bf..63a04c9 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -58,7 +58,7 @@ object ConsumerPerformance {
     }
 
     var startMs, endMs = 0L
-    if (config.useNewConsumer) {
+    if (!config.useOldConsumer) {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
       consumer.subscribe(List(config.topic))
       startMs = System.currentTimeMillis
@@ -163,12 +163,12 @@ object ConsumerPerformance {
   }
 
   class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val zkConnectOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. " +
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " +
       "Multiple URLS can be given to allow fail-over. This option is only used with the old consumer.")
       .withRequiredArg
       .describedAs("urls")
       .ofType(classOf[String])
-    val bootstrapServersOpt = parser.accepts("broker-list", "A broker list to use for connecting if using the new consumer.")
+    val bootstrapServersOpt = parser.accepts("broker-list", "REQUIRED (unless old consumer is used): A broker list to use for connecting if using the new consumer.")
       .withRequiredArg()
       .describedAs("host")
       .ofType(classOf[String])
@@ -203,7 +203,7 @@ object ConsumerPerformance {
       .describedAs("count")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
-    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
+    val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default.")
     val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
       .withRequiredArg
       .describedAs("config file")
@@ -213,13 +213,14 @@ object ConsumerPerformance {
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)
 
-    val useNewConsumer = options.has(useNewConsumerOpt)
+    val useOldConsumer = options.has(zkConnectOpt)
 
     val props = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
     else
       new Properties
-    if (useNewConsumer) {
+    if (!useOldConsumer) {
+      CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServersOpt)
       import org.apache.kafka.clients.consumer.ConsumerConfig
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
       props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
@@ -230,6 +231,10 @@ object ConsumerPerformance {
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
       props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
     } else {
+      if (options.has(bootstrapServersOpt))
+        CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServersOpt is not valid with $zkConnectOpt.")
+      else if (options.has(newConsumerOpt))
+        CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, numMessagesOpt)
       props.put("group.id", options.valueOf(groupIdOpt))
       props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 63be9c4..013ed3e 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -89,7 +89,7 @@ class ConsoleConsumerTest extends JUnitSuite {
     val config = new ConsoleConsumer.ConsumerConfig(args)
 
     //Then
-    assertFalse(config.useNewConsumer)
+    assertTrue(config.useOldConsumer)
     assertEquals("localhost:2181", config.zkConnectionStr)
     assertEquals("test", config.topicArg)
     assertEquals(true, config.fromBeginning)
@@ -108,14 +108,14 @@ class ConsoleConsumerTest extends JUnitSuite {
     val config = new ConsoleConsumer.ConsumerConfig(args)
 
     //Then
-    assertTrue(config.useNewConsumer)
+    assertFalse(config.useOldConsumer)
     assertEquals("localhost:9092", config.bootstrapServer)
     assertEquals("test", config.topicArg)
     assertEquals(true, config.fromBeginning)
   }
 
   @Test
-  def shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset() {
+  def shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset(): Unit = {
     //Given
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",
@@ -128,12 +128,28 @@ class ConsoleConsumerTest extends JUnitSuite {
     val config = new ConsoleConsumer.ConsumerConfig(args)
 
     //Then
-    assertTrue(config.useNewConsumer)
+    assertFalse(config.useOldConsumer)
     assertEquals("localhost:9092", config.bootstrapServer)
     assertEquals("test", config.topicArg)
     assertEquals(0, config.partitionArg.get)
     assertEquals(3, config.offsetArg)
     assertEquals(false, config.fromBeginning)
+
+  }
+
+  @Test
+  def testDefaultConsumer() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertFalse(config.useOldConsumer)
   }
 
   @Test
@@ -150,7 +166,7 @@ class ConsoleConsumerTest extends JUnitSuite {
     val config = new ConsoleConsumer.ConsumerConfig(args)
 
     //Then
-    assertTrue(config.useNewConsumer)
+    assertFalse(config.useOldConsumer)
     assertEquals("localhost:9092", config.bootstrapServer)
     assertEquals("test", config.topicArg)
     assertEquals(0, config.partitionArg.get)
@@ -162,16 +178,16 @@ class ConsoleConsumerTest extends JUnitSuite {
   def shouldParseConfigsFromFile() {
     val propsFile = TestUtils.tempFile()
     val propsStream = new FileOutputStream(propsFile)
-    propsStream.write("consumer.timeout.ms=1000".getBytes())
+    propsStream.write("request.timeout.ms=1000".getBytes())
     propsStream.close()
     val args: Array[String] = Array(
-      "--zookeeper", "localhost:2181",
+      "--bootstrap-server", "localhost:9092",
       "--topic", "test",
       "--consumer.config", propsFile.getAbsolutePath
     )
 
     val config = new ConsoleConsumer.ConsumerConfig(args)
 
-    assertEquals("1000", config.consumerProps.getProperty("consumer.timeout.ms"))
+    assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms"))
   }
 }


Mime
View raw message