Date: Fri, 13 Sep 2019 07:47:00 +0000 (UTC)
From: "Eduard Wirch (Jira)"
To: dev@kafka.apache.org
Subject: [jira] [Created] (KAFKA-8905) Stream DSL: tasks should take serdes from upstream tasks

Eduard Wirch created KAFKA-8905:
-----------------------------------

Summary: Stream DSL: tasks should take serdes from upstream tasks
Key: KAFKA-8905
URL: https://issues.apache.org/jira/browse/KAFKA-8905
Project: Kafka
Issue Type:
Improvement
Components: streams
Affects Versions: 2.3.0
Reporter: Eduard Wirch

{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> source = builder.stream(
    "streams-plaintext-input",
    Consumed.with(Serdes.String(), Serdes.String())
);

final KTable<String, Long> counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
    .groupBy(
        (key, value) -> value
    )
    .count();

// need to override value serde to Long type
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
{code}

Original code taken from the code sample [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java]

I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear.
This application will fail:

{code:java}
Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
{code}

Adjusting this part of the code:

{code:java}
.groupBy(
    (key, value) -> value,
    Grouped.with(Serdes.String(), Serdes.String())
)
{code}

makes the application run properly.

This explicit serde specification is unnecessary, since the serdes are already known from the upstream source task. Relying on the default serdes works in this simple example, but fails in more complex scenarios.

Please make the DSL more usable by taking the serde configuration from upstream tasks.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
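The shape of the ClassCastException in the trace can be reproduced without Kafka on the classpath. The sketch below is an illustration under assumptions only: {{Serializer}}, {{ByteArraySerializer}}, {{StringSerializer}}, {{send}} and {{failsWithClassCast}} are hypothetical stand-ins written for this example, not Kafka's classes. It shows how a byte-array serializer, which the trace suggests is what the sink ended up using once the default serde settings were removed, fails when it is handed a String through a reference whose type parameter is not checked at compile time.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchSketch {

    // Hypothetical stand-in for a serializer interface; not Kafka's actual class.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Stand-in for a byte-array serializer: passes the bytes through unchanged.
    static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    // Stand-in for a String serializer.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical sink: the serializer's type parameter is unknown here,
    // so the value is passed through an unchecked cast, and any mismatch
    // only surfaces at runtime inside serialize().
    @SuppressWarnings("unchecked")
    static byte[] send(Object value, Serializer<?> serializer) {
        return ((Serializer<Object>) serializer).serialize(value);
    }

    // Returns true if sending the value with the given serializer
    // throws a ClassCastException.
    static boolean failsWithClassCast(Object value, Serializer<?> serializer) {
        try {
            send(value, serializer);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // String value with a byte-array serializer: type mismatch at runtime.
        System.out.println(failsWithClassCast("hello", new ByteArraySerializer()));
        // String value with a matching serializer: no exception.
        System.out.println(failsWithClassCast("hello", new StringSerializer()));
    }
}
```

In this sketch the mismatch is only detected inside {{serialize}}, which is consistent with the top frame of the trace being the serializer rather than the DSL call that omitted the serde.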