Date: Fri, 8 Jul 2016 16:59:11 +0000 (UTC)
From: "Eno Thereska (JIRA)"
To: dev@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367982#comment-15367982 ]

Eno Thereska commented on KAFKA-3101:
-------------------------------------

[~bbejeck] Could you provide a bit more context on why flatbuffers are needed for the comparison? Thanks.
> Optimize Aggregation Outputs
> ----------------------------
>
>                 Key: KAFKA-3101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3101
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>             Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / Windowed Stream Aggregations. For example, say we have a sequence of aggregate outputs computed from the input stream (assuming there is no agg value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change:
> , , , ,
> where could cost a lot of CPU overhead computing the intermediate results.
> Instead, if we can let the underlying state store "remember" the last emitted old value, we can reduce the number of emits based on some configs. More specifically, we can add one more field in the KV store engine storing the last emitted old value, which only gets updated when we emit to the downstream processor. For example:
> At Beginning:
> Store: key => empty (no agg values yet)
> V1 computed:
> Update Both in Store: key => (V1, V1), Emit
> V2 computed:
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed:
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed:
> Update Both in Store: key => (V4, V4), Emit
> V5 computed:
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that we need a "closing" time control on the not-yet-emitted keys: when some time has elapsed (or the window is to be closed), we need to check for any key whether its current materialized pair has not been emitted yet (for example, in the above example).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
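To make the proposed store behaviour concrete, here is a minimal Java sketch of a per-key store holding (newValue, lastEmittedValue) that forwards only some updates downstream, plus a "closing" flush for keys whose latest value was never emitted. Everything here is hypothetical and is not part of the Kafka Streams API: the names `LastEmittedStore`, `Downstream`, `flush()` and the `emitEvery` parameter are illustrative. The emit policy ("emit the first value, then once every N updates") is also an assumption. The issue only says "based on some configs", and this policy was chosen because `emitEvery = 3` reproduces the V1..V5 walkthrough above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback standing in for "forward a Change(newValue, oldValue) downstream".
interface Downstream<K, V> {
    void forward(K key, V newValue, V oldValue);
}

// Hypothetical sketch of the KAFKA-3101 idea; not part of the Kafka Streams API.
// Each key keeps (newValue, lastEmittedValue); downstream only sees some updates.
class LastEmittedStore<K, V> {

    private final class KeyState {
        V newValue;           // latest aggregate computed for the key
        V lastEmitted;        // value last forwarded downstream (null before the first emit)
        int updatesSinceEmit; // updates absorbed since the last emit
        boolean pending;      // true if newValue has not been forwarded yet
    }

    private final Map<K, KeyState> states = new HashMap<>();
    private final int emitEvery;             // assumed config: emit once every N updates
    private final Downstream<K, V> downstream;

    LastEmittedStore(int emitEvery, Downstream<K, V> downstream) {
        if (emitEvery < 1) throw new IllegalArgumentException("emitEvery must be >= 1");
        this.emitEvery = emitEvery;
        this.downstream = downstream;
    }

    // Records a newly computed aggregate and emits only when the assumed policy says so.
    void put(K key, V aggregate) {
        KeyState s = states.get(key);
        if (s == null) {
            // First value for this key: emit right away (V1 in the example).
            s = new KeyState();
            states.put(key, s);
            s.newValue = aggregate;
            emit(key, s);
            return;
        }
        s.newValue = aggregate;       // "Update NewValue in Store"
        s.updatesSinceEmit++;
        if (s.updatesSinceEmit >= emitEvery) {
            emit(key, s);             // "Update Both in Store", Emit
        } else {
            s.pending = true;         // No Emit
        }
    }

    // "Closing" step: forward every key whose latest value was never emitted.
    void flush() {
        for (Map.Entry<K, KeyState> kv : states.entrySet()) {
            if (kv.getValue().pending) {
                emit(kv.getKey(), kv.getValue());
            }
        }
    }

    private void emit(K key, KeyState s) {
        downstream.forward(key, s.newValue, s.lastEmitted);
        s.lastEmitted = s.newValue;   // stored old value now equals the new value
        s.updatesSinceEmit = 0;
        s.pending = false;
    }
}
```

With `emitEvery = 3` and V1..V5 put for a single key, the callback sees (V1, null) and then (V4, V1), matching the two "Emit" steps in the walkthrough. A later `flush()` forwards the pending (V5, V4). A time-based trigger, as the closing-time note suggests, could call `flush()` from a periodic timer; that wiring is not shown here.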