kafka-dev mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-3101) Optimize Aggregation Outputs
Date Wed, 06 Apr 2016 21:24:25 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-3101:
    Labels: semantics  (was: )

> Optimize Aggregation Outputs
> ----------------------------
>                 Key: KAFKA-3101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3101
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: kafka streams
>            Reporter: Guozhang Wang
>              Labels: semantics
>             Fix For:
> Today we emit one output record for each incoming message for Table / Windowed Stream
> Aggregations. For example, say we have the following sequence of aggregate values computed
> from the input stream for a key (assuming there is no aggregate value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead for computing the intermediate results.
> Instead, if we let the underlying state store "remember" the last emitted old value,
> we can reduce the number of emits based on some configs. More specifically, we can add one
> more field in the KV store engine that stores the last emitted old value, which only gets
> updated when we emit to the downstream processor. For example:
> At the beginning:
>     Store: key => empty (no agg values yet)
> V1 computed:
>     Update both in store:     key => (V1, V1),   emit <V1, null>
> V2 computed:
>     Update newValue in store: key => (V2, V1),   no emit
> V3 computed:
>     Update newValue in store: key => (V3, V1),   no emit
> V4 computed:
>     Update both in store:     key => (V4, V4),   emit <V4, V1>
> V5 computed:
>     Update newValue in store: key => (V5, V4),   no emit
> One more thing to consider is that we need a "closing" time control on the not-yet-emitted
> keys: when some time has elapsed (or the window is about to be closed), we need to check,
> for every key, whether its current materialized pair has not yet been emitted (for example
> <V5, V4> in the above example), and emit it if so.
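
The scheme described above can be sketched in a few lines. This is only an illustration in Python, not Kafka Streams code: the class name LastEmittedStore, the emit_every setting (standing in for "some configs"), and the flush() method (standing in for the "closing" time control) are all hypothetical names chosen for this sketch. Each key keeps its newest aggregate together with the value last sent downstream, and the second field changes only on an emit.

```python
from typing import Any, Dict, Hashable, List, Optional, Tuple

# A Change is (new_value, old_value), mirroring Change<newValue, oldValue>.
Change = Tuple[Any, Optional[Any]]


class LastEmittedStore:
    """Hypothetical KV store that remembers the last emitted value per key."""

    def __init__(self, emit_every: int) -> None:
        # emit_every is an assumed config: emit once per this many updates.
        if emit_every < 1:
            raise ValueError("emit_every must be at least 1")
        self.emit_every = emit_every
        # key -> [new_value, last_emitted_value, updates_since_last_emit]
        self._entries: Dict[Hashable, List[Any]] = {}

    def update(self, key: Hashable, value: Any) -> Optional[Change]:
        """Store a newly computed aggregate; return a Change only when emitting."""
        entry = self._entries.get(key)
        if entry is None:
            # First aggregate for this key: update both fields and emit.
            self._entries[key] = [value, value, 0]
            return (value, None)
        entry[0] = value
        entry[2] += 1
        if entry[2] >= self.emit_every:
            # Emit against the last value downstream has actually seen.
            change = (value, entry[1])
            entry[1] = value
            entry[2] = 0
            return change
        return None  # only newValue was updated, nothing is emitted

    def flush(self) -> Dict[Hashable, Change]:
        """Closing control: emit every pair that has not been emitted yet."""
        pending: Dict[Hashable, Change] = {}
        for key, entry in self._entries.items():
            if entry[2] > 0:
                pending[key] = (entry[0], entry[1])
                entry[1] = entry[0]
                entry[2] = 0
        return pending


store = LastEmittedStore(emit_every=3)
emitted = [store.update("key", v) for v in ["V1", "V2", "V3", "V4", "V5"]]
closing = store.flush()
```

With emit_every=3 the run reproduces the walkthrough: only V1 and V4 produce output, V4 is emitted against V1 rather than V3, and the final flush emits the outstanding <V5, V4> pair. A count-based trigger is just one possible policy; a time-based one would fit the same store layout.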

This message was sent by Atlassian JIRA
