kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5278: ConsoleConsumer should honor `--value-deserializer`
Date Tue, 30 May 2017 07:12:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a8794d8a5 -> 6f5930d63


KAFKA-5278: ConsoleConsumer should honor `--value-deserializer`

In the original implementation, the console consumer fails to honor the `--value-deserializer` option.

Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3100 from amethystic/KAFKA-5278


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f5930d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f5930d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f5930d6

Branch: refs/heads/trunk
Commit: 6f5930d631a7d3fc090cb81e5eb9cd69580f142b
Parents: a8794d8
Author: amethystic <huxi_2b@hotmail.com>
Authored: Tue May 30 00:12:54 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue May 30 00:12:54 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f5930d6/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 6d27e85..8a41386 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -198,8 +198,8 @@ object ConsoleConsumer extends Logging {
     props.putAll(config.extraConsumerProps)
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if (config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
 
     props
   }
@@ -315,6 +315,13 @@ object ConsoleConsumer extends Logging {
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+
+    if (keyDeserializer != null && !keyDeserializer.isEmpty) {
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
+    }
+    if (valueDeserializer != null && !valueDeserializer.isEmpty) {
+      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
+    }
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {


Mime
View raw message