kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Make new consumer default for Mirror Maker
Date Tue, 27 Sep 2016 21:08:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b8ed4a511 -> 3db752a56


MINOR: Make new consumer default for Mirror Maker

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1914 from hachikuji/mm-default-new-consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3db752a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3db752a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3db752a5

Branch: refs/heads/trunk
Commit: 3db752a565071c78e4b11eaafa739844fa785b04
Parents: b8ed4a5
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Sep 27 21:35:29 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Sep 27 21:35:29 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 49 +++++++++++++-------
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 16 +++++--
 tests/kafkatest/services/console_consumer.py    |  2 +-
 tests/kafkatest/services/mirror_maker.py        |  2 -
 .../performance/consumer_performance.py         |  1 -
 5 files changed, 44 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3db752a5/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 1373f51..979203c 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,18 +20,18 @@ package kafka.tools
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.regex.{PatternSyntaxException, Pattern}
+import java.util.regex.{Pattern, PatternSyntaxException}
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig => OldConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
+import kafka.consumer.{BaseConsumer, BaseConsumerRecord, Blacklist, ConsumerIterator, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector, ConsumerConfig => OldConsumerConfig}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
-import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
+import kafka.utils.{CommandLineUtils, CoreUtils, Logging, ZKConfig}
 import org.apache.kafka.clients.consumer
-import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer, CommitFailedException}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
@@ -95,7 +95,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         .ofType(classOf[String])
 
       val useNewConsumerOpt = parser.accepts("new.consumer",
-        "Use new consumer in mirror maker.")
+        "Use new consumer in mirror maker (this is the default).")
 
       val producerConfigOpt = parser.accepts("producer.config",
         "Embedded producer config.")
@@ -170,34 +170,47 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
       if (options.has(helpOpt)) {
         parser.printHelpOn(System.out)
-        System.exit(0)
+        sys.exit(0)
       }
 
       CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
 
       val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
+      val useOldConsumer = consumerProps.containsKey(ZKConfig.ZkConnectProp)
 
-      val useNewConsumer = options.has(useNewConsumerOpt)
-      if (useNewConsumer) {
+      if (useOldConsumer) {
+        if (options.has(useNewConsumerOpt)) {
+          error(s"The consumer configuration parameter `${ZKConfig.ZkConnectProp}` is not valid when using --new.consumer")
+          sys.exit(1)
+        }
+
+        if (consumerProps.containsKey(NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+          error(s"The configuration parameters `${ZKConfig.ZkConnectProp}` (old consumer) and " +
+            s"`${NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}` (new consumer) cannot be used together.")
+          sys.exit(1)
+        }
+
+        if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
+          error("Exactly one of whitelist or blacklist is required.")
+          sys.exit(1)
+        }
+      } else {
         if (options.has(blacklistOpt)) {
           error("blacklist can not be used when using new consumer in mirror maker. Use whitelist instead.")
-          System.exit(1)
+          sys.exit(1)
         }
+
         if (!options.has(whitelistOpt)) {
           error("whitelist must be specified when using new consumer in mirror maker.")
-          System.exit(1)
+          sys.exit(1)
         }
 
-        if (!consumerProps.keySet().contains(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
+        if (!consumerProps.containsKey(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
           System.err.println("WARNING: The default partition assignment strategy of the new-consumer-based mirror maker will " +
             "change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If " +
             "you prefer to make this switch in advance of that release add the following to the corresponding new-consumer " +
             "config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'")
-      } else {
-        if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
-          error("Exactly one of whitelist or blacklist is required.")
-          System.exit(1)
-        }
+
       }
 
       abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
@@ -223,7 +236,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       producer = new MirrorMakerProducer(producerProps)
 
       // Create consumers
-      val mirrorMakerConsumers = if (!useNewConsumer) {
+      val mirrorMakerConsumers = if (useOldConsumer) {
         val customRebalanceListener = {
           val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
           if (customRebalanceListenerClass != null) {
@@ -450,7 +463,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         // if it exits accidentally, stop the entire mirror maker
         if (!isShuttingdown.get()) {
           fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
-          System.exit(-1)
+          sys.exit(-1)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3db752a5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 96779ff..e5b1b6a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -915,19 +915,27 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
   def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
 }
 
+object ZKConfig {
+  val ZkConnectProp = "zookeeper.connect"
+  val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
+  val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
+  val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
+}
 
 class ZKConfig(props: VerifiableProperties) {
+  import ZKConfig._
+
   /** ZK host string */
-  val zkConnect = props.getString("zookeeper.connect")
+  val zkConnect = props.getString(ZkConnectProp)
 
   /** zookeeper session timeout */
-  val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)
+  val zkSessionTimeoutMs = props.getInt(ZkSessionTimeoutMsProp, 6000)
 
   /** the max time that the client waits to establish a connection to zookeeper */
-  val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs)
+  val zkConnectionTimeoutMs = props.getInt(ZkConnectionTimeoutMsProp, zkSessionTimeoutMs)
 
   /** how far a ZK follower can be behind a ZK leader */
-  val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
+  val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
 }
 
 object ZkPath {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3db752a5/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 2bd093c..237d028 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -176,7 +176,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
               "--topic %(topic)s --consumer.config %(config_file)s" % args
 
         if self.new_consumer:
-            cmd += " --new-consumer --bootstrap-server %(broker_list)s" % args
+            cmd += " --bootstrap-server %(broker_list)s" % args
         else:
             cmd += " --zookeeper %(zk_connect)s" % args
         if self.from_beginning:

http://git-wip-us.apache.org/repos/asf/kafka/blob/3db752a5/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index fdaf4c8..7c0601f 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -126,8 +126,6 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
             cmd += " --whitelist=\"%s\"" % self.whitelist
         if self.blacklist is not None:
             cmd += " --blacklist=\"%s\"" % self.blacklist
-        if self.new_consumer:
-            cmd += " --new.consumer"
 
         cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
         return cmd

http://git-wip-us.apache.org/repos/asf/kafka/blob/3db752a5/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index b0f99d7..cb661e3 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -106,7 +106,6 @@ class ConsumerPerformanceService(PerformanceService):
         }
 
         if self.new_consumer:
-            args['new-consumer'] = ""
             args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
         else:
             args['zookeeper'] = self.kafka.zk.connect_setting()


Mime
View raw message