kafka-dev mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations
Date Fri, 01 Jul 2016 16:39:11 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359231#comment-15359231 ]

Guozhang Wang commented on KAFKA-3429:

I think this ticket should be re-defined a bit for KTable.groupBy: 

Today, if the user does not provide a serde when calling this operator, the default serde from
the configs is used. But in the special case where the KTable was created directly by an
aggregate operator, we can use the key serde specified in the previous aggregate operator
instead of the configured default, and if there is no {{mapValues}} in between, we
can use the value serde from the previous aggregate operator as well.

And it now seems to be quite a niche optimization.
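
To make the fallback order described above concrete, here is a minimal sketch in plain Java. It is not Kafka Streams code: {{Serde}}, {{TableSerdes}}, {{fromAggregate}}, {{afterMapValues}} and {{resolve}} are hypothetical names invented for illustration, and the sketch only assumes that a table could remember the serdes of the aggregate operator that created it.

```java
import java.util.Optional;

// Hypothetical sketch: these types are illustrations, not Kafka Streams classes.
final class SerdeFallbackSketch {

    /** Stand-in for a serde; only its name matters for this illustration. */
    static final class Serde {
        final String name;
        Serde(String name) { this.name = name; }
    }

    /** Serdes a table may have inherited from the operator that created it. */
    static final class TableSerdes {
        final Optional<Serde> keySerde;
        final Optional<Serde> valueSerde;

        private TableSerdes(Optional<Serde> keySerde, Optional<Serde> valueSerde) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        /** Table created directly by an aggregate operator: both serdes are known. */
        static TableSerdes fromAggregate(Serde key, Serde value) {
            return new TableSerdes(Optional.of(key), Optional.of(value));
        }

        /** mapValues may change the value type, so only the key serde is kept. */
        TableSerdes afterMapValues() {
            return new TableSerdes(keySerde, Optional.empty());
        }
    }

    /** Explicit serde wins, then the inherited one, then the configured default. */
    static Serde resolve(Serde explicitOrNull, Optional<Serde> inherited, Serde configDefault) {
        if (explicitOrNull != null) {
            return explicitOrNull;
        }
        return inherited.orElse(configDefault);
    }

    public static void main(String[] args) {
        Serde configDefault = new Serde("config-default");
        TableSerdes agg = TableSerdes.fromAggregate(new Serde("agg-key"), new Serde("agg-value"));
        TableSerdes mapped = agg.afterMapValues();

        System.out.println(resolve(null, agg.keySerde, configDefault).name);      // agg-key
        System.out.println(resolve(null, agg.valueSerde, configDefault).name);    // agg-value
        System.out.println(resolve(null, mapped.keySerde, configDefault).name);   // agg-key
        System.out.println(resolve(null, mapped.valueSerde, configDefault).name); // config-default
    }
}
```

The point of the sketch is only the precedence: a user-supplied serde always wins, the serde inherited from the upstream aggregate is the next choice, and the config default remains the last resort, which is also what applies to the value serde once a {{mapValues}} sits in between.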

> Remove Serdes needed for repartitioning in KTable stateful operations
> ---------------------------------------------------------------------
>                 Key: KAFKA-3429
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3429
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>              Labels: api, newbie++
>             Fix For:
> Currently, in KTable aggregate operations where a repartition may be needed because
> the aggregation key may not be the same as the original primary key, we require users
> to provide serdes (defaulting to the configured ones) for reading from / writing to the internally created
> re-partition topic. However, these are not necessary, since for all KTable instances, whether
> generated from the topics directly:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> serdes are already provided for materializing the data, and hence the same serdes
> can be re-used when the resulting KTable is involved in future aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in table1.aggregate
> since it could always reuse the "serde" from stream.aggregateByKey, which is used to materialize
> the table1 object.
> In order to get rid of it, implementation-wise we need to carry the serde information
> along with the KTableImpl instance in order to re-use it in a future operation that requires

