kafka-dev mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4612) Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"
Date Tue, 10 Jan 2017 01:42:58 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813501#comment-15813501 ]

Matthias J. Sax commented on KAFKA-4612:
----------------------------------------

Did you configure global default key and value Serdes via {{StreamsConfig}} using the parameters
{{KEY_SERDE_CLASS_CONFIG}} and {{VALUE_SERDE_CLASS_CONFIG}}? I guess it's not a library issue
in the strict sense but just a misconfiguration. The problem is that the data is repartitioned
after {{.selectKey}} by writing it to a topic, but the library does not find the correct Serde
(we need to assume that the key type changes in {{.selectKey}} and thus fall back to the global
default Serdes). Right now, {{.selectKey}} does not allow specifying a new key Serde (which
is a bit of a problem). A workaround is to put a {{.through}} after {{.selectKey}}, as
it allows specifying the required Serdes.
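To illustrate why this fallback surfaces as exactly this exception, here is a stdlib-only Scala toy model. All names in it ({{ToySerializer}}, {{SerdeFallbackModel}}, "repartition-topic") are hypothetical and are not Kafka classes; it only assumes that, as the {{ByteArraySerializer}} frame in the stack trace suggests, the fallback Serde is a byte-array one. The serializer is picked from configuration and called through an erased generic type, so the compiler cannot catch the mismatch, and the String key only fails at runtime when the generated bridge method casts it to {{Array[Byte]}} (JVM name {{[B}}).

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a serializer interface (NOT Kafka's Serializer).
trait ToySerializer[T] {
  def serialize(topic: String, data: T): Array[Byte]
}

// Plays the role of ByteArraySerializer: it expects the data to already be bytes.
class ToyByteArraySerializer extends ToySerializer[Array[Byte]] {
  def serialize(topic: String, data: Array[Byte]): Array[Byte] = data
}

// Plays the role of a String serializer.
class ToyStringSerializer extends ToySerializer[String] {
  def serialize(topic: String, data: String): Array[Byte] =
    data.getBytes(StandardCharsets.UTF_8)
}

object SerdeFallbackModel {
  // After a key-changing step the key type is treated as unknown, so the serializer
  // comes from configuration; with nothing configured, the byte-array default is used.
  def defaultKeySerializer(configured: Option[ToySerializer[_]]): ToySerializer[Any] =
    configured.getOrElse(new ToyByteArraySerializer).asInstanceOf[ToySerializer[Any]]

  // "Writes" a re-keyed record to a hypothetical repartition topic.
  def repartition(key: Any, serializer: ToySerializer[Any]): Array[Byte] =
    serializer.serialize("repartition-topic", key)

  def main(args: Array[String]): Unit = {
    val key: Any = "some-key" // the actual key type is String

    try {
      repartition(key, defaultKeySerializer(None))
    } catch {
      // The bridge method's cast of the String to Array[Byte] ("[B") fails here.
      case e: ClassCastException => println(s"ClassCastException: ${e.getMessage}")
    }

    // With a String serializer configured as the default, the same call succeeds.
    val bytes = repartition(key, defaultKeySerializer(Some(new ToyStringSerializer)))
    println(new String(bytes, StandardCharsets.UTF_8))
  }
}
```

The exact exception message depends on the JDK version, which is why the sketch prints it instead of matching it.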

Btw: I think in your example you can omit {{.selectKey}}, as it does not set a new key anyway.
(This change alone should actually fix the problem: the repartitioning, which is not necessarily
required in your case, is avoided. And even where it is required, the library knows that the
data type is String and can use the correct Serde automatically.)
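Putting both hints together, a sketch of the reporter's topology against the 0.10.1.x API, assuming {{kafka-streams}} 0.10.1.x is on the classpath. The application id, the broker address, and the {{topicInput-rekeyed}} topic are made-up placeholders. With {{rekey = false}} the no-op {{.selectKey}} is simply dropped; with {{rekey = true}} an explicit {{.through}} supplies the Serdes for the re-keyed topic. Either hint should be enough on its own; the sketch also sets the String defaults.

```scala
// Sketch against the Kafka Streams 0.10.1.x API (assumed to be on the classpath).
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable, KeyValueMapper, ValueJoiner}

object Kafka4612Workaround {

  def config(): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-4612-workaround") // hypothetical id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")    // assumed broker address
    // Hint 1: global default Serdes, used when the library cannot know the types
    // (for example, for the repartition topic that follows selectKey).
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    props
  }

  def buildTopology(builder: KStreamBuilder, rekey: Boolean): Unit = {
    val kafkaTable: KTable[String, String] =
      builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
    val incomingRecords: KStream[String, String] =
      builder.stream(Serdes.String, Serdes.String, "topicInput")

    val joinInput: KStream[String, String] =
      if (!rekey) {
        // Hint 2: selectKey((k, _) => k) keeps the key unchanged, so drop it;
        // nothing is marked for repartitioning and the key Serde stays String.
        incomingRecords
      } else {
        // If a real re-key is needed, pass explicit Serdes via through().
        // "topicInput-rekeyed" is a hypothetical, user-managed topic assumed to exist.
        incomingRecords
          .selectKey(new KeyValueMapper[String, String, String] {
            override def apply(key: String, value: String): String = key
          })
          .through(Serdes.String, Serdes.String, "topicInput-rekeyed")
      }

    // Same join as in the report: keep the stream-side value.
    val joinedRecords: KStream[String, String] =
      joinInput.leftJoin(kafkaTable, new ValueJoiner[String, String, String] {
        override def apply(s1: String, s2: String): String = s1
      })
    joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")
  }
}
```

The explicit anonymous {{KeyValueMapper}} and {{ValueJoiner}} classes are a deliberate choice so the sketch does not rely on Scala 2.12 SAM conversion; building the topology does not need a running broker, only starting {{KafkaStreams}} with it does.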

Please verify. (Please do not close the JIRA even if my hints resolve the problem; we might
want to add an overload for {{.selectKey}} that allows specifying a new key Serde.)


> Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4612
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4612
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: Virtual Machine using Debian 8 + Confluent Platform 3.1.1.
>            Reporter: Kurt Ostfeld
>         Attachments: KafkaIsolatedBug.tar.gz
>
>
> I've attached a minimal single-source-file project that reliably reproduces this issue.
> This project does the following:
> 1) Creates test input data: produces a single random (String, String) record into two different topics, "topicInput" and "topicTable".
> 2) Creates and runs a Kafka Streams application:
>     val kafkaTable: KTable[String, String] = builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
>     val incomingRecords: KStream[String, String] = builder.stream(Serdes.String, Serdes.String, "topicInput")
>     val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey((k, _) => k)
>     val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
>     joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")
> This reliably generates the following error:
> [error] (StreamThread-1) java.lang.ClassCastException: java.lang.String cannot be cast to [B
> java.lang.ClassCastException: java.lang.String cannot be cast to [B
> 	at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63)
> 	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
> 	at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
> 	at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> One caveat: I'm running this on a Confluent Platform 3.1.1 instance, which uses Kafka 0.10.1.0, since there is no newer Confluent Platform available.
> The Kafka Streams project is built using "kafka-clients" and "kafka-streams" version 0.10.1.1. If I use 0.10.1.0, I reliably hit bug https://issues.apache.org/jira/browse/KAFKA-4355.
> I am not sure if there is any issue using 0.10.1.1 libraries with a Confluent Platform running Kafka 0.10.1.0. I will obviously try the next Confluent Platform binary when it is available.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
