kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1237 Follow up review suggestions on new mirror maker; reviewed by Guozhang Wang
Date Wed, 12 Feb 2014 03:29:48 GMT
Updated Branches:
  refs/heads/trunk d6303ec79 -> cdd03d199


KAFKA-1237 Follow up review suggestions on new mirror maker; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cdd03d19
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cdd03d19
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cdd03d19

Branch: refs/heads/trunk
Commit: cdd03d19943d42b6401b255196c83ee32ef6c792
Parents: d6303ec
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Tue Feb 11 19:29:35 2014 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Feb 11 19:29:35 2014 -0800

----------------------------------------------------------------------
 .../scala/kafka/tools/newproducer/MirrorMaker.scala     | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cdd03d19/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
index faa07e9..0d6d702 100644
--- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
@@ -37,14 +37,13 @@ object MirrorMaker extends Logging {
     val parser = new OptionParser
 
     val consumerConfigOpt = parser.accepts("consumer.config",
-      "Consumer config to consume from a source cluster. " +
-      "You may specify multiple of these.")
+      "Consumer config file to consume from a source cluster.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(classOf[String])
 
     val producerConfigOpt = parser.accepts("producer.config",
-      "Embedded producer config.")
+      "Embedded producer config file for target cluster.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(classOf[String])
@@ -158,7 +157,7 @@ object MirrorMaker extends Logging {
 
   class ProducerDataChannel extends Logging {
     val producers = new ListBuffer[KafkaProducer]
-    var producerIndex = 0
+    var producerIndex = new AtomicInteger(0)
 
     def addProducer(producer: KafkaProducer) {
       producers += producer
@@ -171,8 +170,9 @@ object MirrorMaker extends Logging {
         val producer = producers(producerId)
         producer.send(producerRecord)
       } else {
-        producers(producerIndex).send(producerRecord)
-        producerIndex = (producerIndex + 1) % producers.size
+        val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
+        producers(producerId).send(producerRecord)
+        trace("Sent message to producer " + producerId)
       }
     }
 


Mime
View raw message