kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3157: Mirror maker doesn't commit offset with low traffic
Date Fri, 05 Feb 2016 17:11:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 64d512959 -> 8f23db4c6


KAFKA-3157: Mirror maker doesn't commit offset with low traffic

Mirror maker doesn't commit offset with new consumer enabled when data volume is low. This
is caused by infinite loop in ```receive()``` which would never jump out of loop if no data
coming

Author: Tao Xiao <xiaotao183@gmail.com>

Reviewers: Ismael Juma, Jason Gustafson

Closes #821 from xiaotao183/KAFKA-3157

(cherry picked from commit d6e36df8737aa2b898b4fd0a81c2d94f3c349b68)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f23db4c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f23db4c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f23db4c

Branch: refs/heads/0.9.0
Commit: 8f23db4c6ab24ae6d0c7ecabe42cd6f2d4edb089
Parents: 64d5129
Author: Tao Xiao <xiaotao183@gmail.com>
Authored: Fri Feb 5 09:11:24 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Feb 5 09:11:32 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f23db4c/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b093cb2..a285ac5 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -519,12 +519,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener)
     }
 
-    // New consumer always hasNext
     override def hasData = true
 
     override def receive() : BaseConsumerRecord = {
-      while (recordIter == null || !recordIter.hasNext)
+      if (recordIter == null || !recordIter.hasNext) {
         recordIter = consumer.poll(1000).iterator
+        if (!recordIter.hasNext)
+          throw new ConsumerTimeoutException
+      }
 
       val record = recordIter.next()
       val tp = new TopicPartition(record.topic, record.partition)


Mime
View raw message