kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3409: handle CommitFailedException in MirrorMaker
Date Wed, 23 Mar 2016 16:47:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7af67ce22 -> 20c313526


KAFKA-3409: handle CommitFailedException in MirrorMaker

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma, Ashish Singh, Guozhang Wang

Closes #1115 from hachikuji/KAFKA-3409


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20c31352
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20c31352
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20c31352

Branch: refs/heads/trunk
Commit: 20c313526a0518a51142d3abc5ee2a4d2ef3cb34
Parents: 7af67ce
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Mar 23 09:47:48 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 23 09:47:48 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/20c31352/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 26f4826..87f3cc5 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -25,14 +25,13 @@ import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.client.ClientUtils
 import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig,
ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
-import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
-import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer
+import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer,
CommitFailedException}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord,
RecordMetadata}
 import org.apache.kafka.common.TopicPartition
@@ -356,6 +355,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           // and re-throw to break the loop
           mirrorMakerConsumer.commit()
           throw e
+
+        case e: CommitFailedException =>
+          warn("Failed to commit offsets because the consumer group has rebalanced and assigned
partitions to " +
+            "another instance. If you see this regularly, it could indicate that you need
to either increase " +
+            s"the consumer's ${consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce
the number of records " +
+            s"handled on each iteration with ${consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
       }
     } else {
       info("Exiting on send failure, skip committing offsets.")
@@ -422,10 +427,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         case t: Throwable =>
           fatal("Mirror maker thread failure due to ", t)
       } finally {
-        info("Flushing producer.")
-        producer.flush()
-        info("Committing consumer offsets.")
-        CoreUtils.swallow(commitOffsets(mirrorMakerConsumer))
+        CoreUtils.swallow {
+          info("Flushing producer.")
+          producer.flush()
+
+          // note that this commit is skipped if flush() fails which ensures that we don't
lose messages
+          info("Committing consumer offsets.")
+          commitOffsets(mirrorMakerConsumer)
+        }
+
         info("Shutting down consumer connectors.")
         CoreUtils.swallow(mirrorMakerConsumer.stop())
         CoreUtils.swallow(mirrorMakerConsumer.cleanup())


Mime
View raw message