kafka-dev mailing list archives

From "Mykola Polonskyi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4270) ClassCast for Agregation
Date Sat, 19 Nov 2016 09:53:58 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15678967#comment-15678967 ]

Mykola Polonskyi commented on KAFKA-4270:
-----------------------------------------

Hello [~damianguy] 
I tried to do the remapping with doAggregate, from
{code}
cardId -> card(val userId, val cardId)
{code}
to 
{code}
userId -> card(val userId, val cardId)
{code}
and then add the aggregated card to user(val userId, val setOfCards). It looks like a
one-to-many relation, I think.
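
A minimal sketch of the re-keying described above, using plain Kotlin collections rather than the Kafka Streams API. The `Card` and `User` classes mirror the shapes in the comment; the functions `cardsByUser` and `attachCards` and the sample ids are hypothetical names introduced only for illustration.

```kotlin
// Shapes taken from the comment: card(userId, cardId) and user(userId, setOfCards).
data class Card(val userId: String, val cardId: String)
data class User(val userId: String, val setOfCards: Set<Card>)

// Re-key cards from cardId to userId and collect each user's cards into a set.
// (Hypothetical helper; stands in for groupBy + aggregate on a table.)
fun cardsByUser(cardsById: Map<String, Card>): Map<String, Set<Card>> =
    cardsById.values
        .groupBy { it.userId }
        .mapValues { it.value.toSet() }

// Attach the aggregated set to each user; users without cards get an empty set.
// (Hypothetical helper; stands in for the join with the user table.)
fun attachCards(userIds: Collection<String>, grouped: Map<String, Set<Card>>): List<User> =
    userIds.map { User(it, grouped[it] ?: emptySet()) }

fun main() {
    val cardsById = mapOf(
        "c1" to Card("u1", "c1"),
        "c2" to Card("u1", "c2"),
        "c3" to Card("u2", "c3")
    )
    val users = attachCards(listOf("u1", "u2", "u3"), cardsByUser(cardsById))
    users.forEach(::println)
}
```

One user ends up holding many cards, which is the one-to-many relation mentioned above.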

> ClassCast for Agregation
> ------------------------
>
>                 Key: KAFKA-4270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4270
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Mykola Polonskyi
>            Assignee: Damian Guy
>            Priority: Critical
>              Labels: architecture
>
> With serdes defined for the intermediate topic in an aggregation, I get a ClassCastException:
> from the custom class to ByteArray.
> In debug I saw that the defined serde isn't used when creating the sinkNode (inside `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`).
>
> Instead of the serde defined in the aggregation call, a default implementation with empty
> stubs in place of real implementations is used.
> {code:kotlin}
> userTable.join(
>             skicardsTable.groupBy { key, value -> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
>                     .aggregate(
>                             { mutableSetOf<SkicardInfo>() },
>                             { ownerId, skicardInfo, accumulator -> accumulator.put(skicardInfo) },
>                             { ownerId, skicardInfo, accumulator -> accumulator },
>                             skicardByOwnerIdSerde,
>                             skicardByOwnerIdTopicName
>                     ),
>             { userCreatedOrUpdated, skicardInfoSet -> UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
>     ).to(
>             userWithSkicardsTable
>     )
> {code}
> I think the current behavior of `doAggregate` when setting up serdes and/or stores should
> be changed, because it is also incorrect in release 0.10.0.1-cp1.
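
The failure mode reported above can be reproduced in isolation. The following is only a sketch under stated assumptions: `Serializer`, `ByteArraySerializer`, `SkicardInfoSerializer`, and `send` are simplified, hypothetical stand-ins written for this example, not the Kafka classes, and the `SkicardInfo` fields are invented. It shows how a byte-array serializer that is picked up in place of the one supplied by the caller fails with a ClassCastException once a custom value reaches it.

```kotlin
// Hypothetical stand-in for a serializer interface (not the Kafka one).
interface Serializer<T> {
    fun serialize(data: T): ByteArray
}

// Stand-in for a default serializer that only accepts byte arrays.
class ByteArraySerializer : Serializer<ByteArray> {
    override fun serialize(data: ByteArray): ByteArray = data
}

// Invented fields; the real SkicardInfo class is not shown in the report.
data class SkicardInfo(val ownerId: String, val cardId: String)

// Stand-in for the serializer the caller actually wanted to be used.
class SkicardInfoSerializer : Serializer<SkicardInfo> {
    override fun serialize(data: SkicardInfo): ByteArray =
        "${data.ownerId}:${data.cardId}".toByteArray()
}

// A sink that receives its serializer with the generic type erased,
// so a mismatch is only detected when a value is actually serialized.
@Suppress("UNCHECKED_CAST")
fun <V> send(value: V, serializer: Serializer<*>): ByteArray =
    (serializer as Serializer<V>).serialize(value)

fun main() {
    val card = SkicardInfo("u1", "c1")

    // The intended serializer works.
    println(send(card, SkicardInfoSerializer()).size)

    // The default one fails at runtime: SkicardInfo cannot be cast to a byte array.
    try {
        send(card, ByteArraySerializer())
    } catch (e: ClassCastException) {
        println("ClassCastException: ${e.message}")
    }
}
```

On the JVM the mismatch surfaces only at serialization time because the generic type is erased, which would be consistent with the exception appearing at the sink node rather than where the serde was passed in.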



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)