kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-4073; MirrorMaker should handle messages without timestamp correctly
Date Tue, 23 Aug 2016 04:50:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 8bf9addd2 -> 4e4e2fb50


KAFKA-4073; MirrorMaker should handle messages without timestamp correctly

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1773 from ijuma/kafka-4073-mirror-maker-timestamps

(cherry picked from commit a1e0b2240dba0740135621d959441eefa6fd3124)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e4e2fb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e4e2fb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e4e2fb5

Branch: refs/heads/0.10.0
Commit: 4e4e2fb5085758ee9ccf6307433ad531a33198d3
Parents: 8bf9add
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Aug 22 21:49:40 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Aug 22 21:50:25 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala |  4 +++-
 .../scala/unit/kafka/tools/MirrorMakerTest.scala  | 18 +++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e4e2fb5/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 7d6b5fb..5de2038 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.record.Record
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
@@ -675,7 +676,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
     override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
-      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, record.timestamp, record.key, record.value))
+      val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp
+      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e4e2fb5/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index 39a0ac9..d6a5470 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import kafka.consumer.BaseConsumerRecord
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{Record, TimestampType}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -39,4 +39,20 @@ class MirrorMakerTest {
     assertEquals("key", new String(producerRecord.key))
     assertEquals("value", new String(producerRecord.value))
   }
+
+  @Test
+  def testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage() {
+    val consumerRecord = BaseConsumerRecord("topic", 0, 1L, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME, "key".getBytes, "value".getBytes)
+
+    val result = MirrorMaker.defaultMirrorMakerMessageHandler.handle(consumerRecord)
+    assertEquals(1, result.size)
+
+    val producerRecord = result.get(0)
+    assertNull(producerRecord.timestamp)
+    assertEquals("topic", producerRecord.topic)
+    assertNull(producerRecord.partition)
+    assertEquals("key", new String(producerRecord.key))
+    assertEquals("value", new String(producerRecord.value))
+  }
+
 }


Mime
View raw message