kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6422 Mirror maker will throw null pointer exception when the message value is null (#4387)
Date Mon, 08 Jan 2018 23:04:20 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 4e22c2c  KAFKA-6422 Mirror maker will throw null pointer exception when the message value is null (#4387)
4e22c2c is described below

commit 4e22c2cabb98bc055cb1739f6946576f67747771
Author: lisa2lisa <lisa2lisa@users.noreply.github.com>
AuthorDate: Mon Jan 8 23:59:16 2018 +0100

    KAFKA-6422 Mirror maker will throw null pointer exception when the message value is null (#4387)
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, James Cheng <jylcheng@yahoo.com>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b5b1540..66b8005 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -67,7 +67,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private[tools] var producer: MirrorMakerProducer = null
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
-  private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
+  private val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
   // Track the messages not successfully sent by mirror maker.
   private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
   private var messageHandler: MirrorMakerMessageHandler = null
@@ -384,7 +384,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   def cleanShutdown() {
-    if (isShuttingdown.compareAndSet(false, true)) {
+    if (isShuttingDown.compareAndSet(false, true)) {
       info("Start clean shutdown.")
       // Shutdown consumer threads.
       info("Shutting down consumer threads.")
@@ -426,7 +426,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           try {
             while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
               val data = mirrorMakerConsumer.receive()
-              trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
+              if (data.value != null) {
+                trace("Sending message with value size %d and offset %d.".format(data.value.length, data.offset))
+              } else {
+                trace("Sending message with null value and offset %d.".format(data.offset))
+              }
               val records = messageHandler.handle(data)
               records.asScala.foreach(producer.send)
               maybeFlushAndCommitOffsets()
@@ -459,7 +463,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         shutdownLatch.countDown()
         info("Mirror maker thread stopped")
         // if it exits accidentally, stop the entire mirror maker
-        if (!isShuttingdown.get()) {
+        if (!isShuttingDown.get()) {
           fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
           sys.exit(-1)
         }

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Mime
View raw message