kafka-dev mailing list archives

From "Eduard Wirch (Jira)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-8905) Stream DSL: tasks should take serdes from upstream tasks
Date Fri, 13 Sep 2019 07:47:00 GMT
Eduard Wirch created KAFKA-8905:
-----------------------------------

             Summary: Stream DSL: tasks should take serdes from upstream tasks
                 Key: KAFKA-8905
                 URL: https://issues.apache.org/jira/browse/KAFKA-8905
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Eduard Wirch


{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> source = builder.stream(
      "streams-plaintext-input",
      Consumed.with(Serdes.String(), Serdes.String())
);

final KTable<String, Long> counts = source
      .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
      .groupBy(
            (key, value) -> value
      )
      .count();

// need to override value serde to Long type
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);{code}
Original code taken from code sample [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java]

I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}}
settings to make my point clear. This application will fail:
{code:java}
Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
{code}
Adjusting this part of the code:
{code:java}
.groupBy(
      (key, value) -> value,
      Grouped.with(Serdes.String(), Serdes.String())
) {code}
With this change, the application runs properly.

This explicit serde specification is unnecessary, since the serdes are already known from
the upstream source task. Relying on the default serdes works in this simple example, but fails in
more complex scenarios.
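To illustrate why the fallback to a default serde only shows up at runtime, here is a minimal sketch in plain Java. It does not use Kafka at all: {{ToySerializer}}, {{ToyByteArraySerializer}}, {{ToyStringSerializer}} and {{send}} are hypothetical stand-ins, and the sketch assumes (as the stack trace above suggests) that the serializer used when none is specified expects {{byte[]}}. Because generic types are erased, handing a String to a byte-array serializer compiles fine and fails only when the record is actually serialized.

```java
import java.nio.charset.StandardCharsets;

public class SerdeErasureSketch {

    // Hypothetical stand-in for a serializer interface; not the Kafka API.
    interface ToySerializer<T> {
        byte[] serialize(T data);
    }

    // Stand-in for a byte-array default serializer.
    static final class ToyByteArraySerializer implements ToySerializer<byte[]> {
        @Override
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    // Stand-in for an explicitly configured String serializer.
    static final class ToyStringSerializer implements ToySerializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Models a sink that only knows "some serializer" and casts it to the
    // record type. The unchecked cast is erased, so a mismatch is not
    // detected here but inside serialize().
    @SuppressWarnings("unchecked")
    static <T> byte[] send(ToySerializer<?> configured, T value) {
        return ((ToySerializer<T>) configured).serialize(value);
    }

    // Returns true if sending the value with the given serializer
    // fails with a ClassCastException.
    static boolean failsWithClassCast(ToySerializer<?> serializer, Object value) {
        try {
            send(serializer, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Byte-array serializer with a String value: fails at runtime.
        System.out.println(failsWithClassCast(new ToyByteArraySerializer(), "hello"));
        // Matching String serializer: succeeds.
        System.out.println(failsWithClassCast(new ToyStringSerializer(), "hello"));
    }
}
```

In this toy model, passing the matching serializer explicitly plays the role of {{Grouped.with(Serdes.String(), Serdes.String())}}; the request in this ticket amounts to having the DSL pick that serializer itself from what it already knows upstream.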

Please make the DSL more usable by taking the serde configuration from upstream tasks.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
