From dev-return-95693-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Mon Jul 2 04:16:43 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3E64F180654 for ; Mon, 2 Jul 2018 04:16:43 +0200 (CEST) Received: (qmail 21788 invoked by uid 500); 2 Jul 2018 02:16:41 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 21777 invoked by uid 99); 2 Jul 2018 02:16:41 -0000 Received: from ui-eu-01.ponee.io (HELO localhost) (176.9.59.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Jul 2018 02:16:41 +0000 Date: Mon, 02 Jul 2018 02:16:39 -0000 x-ponymail-sender: afa2421c8c3d7d179970b20f4428981e9cdef15b Message-ID: Subject: Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source In-Reply-To: MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 x-ponymail-agent: PonyMail Composer/0.3 X-Mailer: LuaSocket 3.0-rc1 References: To: From: flaviostutz@gmail.com Cons: We tried the "single partition" strategy, but the problem is that for each incoming message to the Graph, we have another output message with the aggregated (cummulative or not) result, so that if we have a million messages/s (among all parallel tasks) being processed, we'll have another million message/s of aggregated values on the single partitioned topic, so that it will not be possible for a single consumer to handle this load. Did I miss something? Pros: Using a Schedulable Stream Source from state store's values, we can control the pressure of global aggregation that - despite of the strategy applied - have to be performed by a single instance. In this way, the pressure will be determined by the desired schedule time (that can be lowered as needed) and the number of tuples in the state store instances (the fewer different keys on state store instances the higher pratical frequency of global aggregation task). Using this strategy, the load of incoming messages won't affect the global aggregation mechanism, because we are spliting them apart temporarly (distributed partial aggregation among partition tasks happens during the incoming messages workload; and global final aggregation is performed by a single task from time to time, assynchronously). [SOURCE PARTITION 1] >-----100k messages/s----> [TASK GRAPH 1] >----100 different aggregated values----> [INSTANCE 1 STATE STORE] >---> END [SOURCE PARTITION 2] >-----100k messages/s----> [TASK GRAPH 2] >----200 different aggregated values----> [INSTANCE 2 STATE STORE] >---> END [SOURCE PARTITION 3] >-----100k messages/s----> [TASK GRAPH 3] >----100 different aggregated values----> [INSTANCE 3 STATE STORE] >---> END [ONE SEC SCHEDULED SOURCE] >-----400 messages/s------> [TASK GRAPH X] >----global aggregated value----> [OUTPUT TOPIC /global-total] (one second scheduled KTable with all state stores instances's tuples) What do you think? -Flávio Stutz On 2018/06/29 17:23:29, flaviostutz@gmail.com wrote: > Just copying a follow up from another thread to here (sorry about the mess): > > From: Guozhang Wang > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source > Date: 2018/06/25 22:24:17 > List: dev@kafka.apache.org > > Flávio, thanks for creating this KIP. > > I think this "single-aggregation" use case is common enough that we should > consider how to efficiently supports it: for example, for KSQL that's built > on top of Streams, we've seen lots of query statements whose return is > expected a single row indicating the "total aggregate" etc. See > https://github.com/confluentinc/ksql/issues/430 for details. > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but > I'm wondering if we have discussed the option of supporting it in a > "pre-aggregate" manner: that is we do partial aggregates on parallel tasks, > and then sends the partial aggregated value via a single topic partition > for the final aggregate, to reduce the traffic on that single partition and > hence the final aggregate workload. > Of course, for non-commutative aggregates we'd probably need to provide > another API in addition to aggregate, like the `merge` function for > session-based aggregates, to let users customize the operations of merging > two partial aggregates into a single partial aggregate. What's its pros and > cons compared with the current proposal? > > > Guozhang > On 2018/06/26 18:22:27, Flávio Stutz wrote: > > Hey, guys, I've just created a new KIP about creating a new DSL graph > > source for realtime partitioned consolidations. > > > > We have faced the following scenario/problem in a lot of situations with > > KStreams: > > - Huge incoming data being processed by numerous application instances > > - Need to aggregate different fields whose records span all topic > > partitions (something like “total amount spent by people aged > 30 yrs” > > when processing a topic partitioned by userid). > > > > The challenge here is to manage this kind of situation without any > > bottlenecks. We don't need the “global aggregation” to be processed at each > > incoming message. On a scenario of 500 instances, each handling 1k > > messages/s, any single point of aggregation (single partitioned topics, > > global tables or external databases) would create a bottleneck of 500k > > messages/s for single threaded/CPU elements. > > > > For this scenario, it is possible to store the partial aggregations on > > local stores and, from time to time, query those states and aggregate them > > as a single value, avoiding bottlenecks. This is a way to create a "timed > > aggregation barrier”. > > > > If we leverage this kind of built-in feature we could greatly enhance the > > ability of KStreams to better handle the CAP Theorem characteristics, so > > that one could choose to have Consistency over Availability when needed. > > > > We started this discussion with Matthias J. Sax here: > > https://issues.apache.org/jira/browse/KAFKA-6953 > > > > If you want to see more, go to KIP-326 at: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source > > > > -Flávio Stutz > > >