From: flaviostutz@gmail.com
Reply-To: dev@kafka.apache.org
Date: Mon, 02 Jul 2018 02:46:18 -0000
Subject: Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source

> I agree with Guozhang on comparing the pros and cons of the approach he
> outlined vs the one in the proposed KIP.

I've just replied to him. Please take a look.

> Will the triggering mechanism always be time, or would it make sense to
> expand to use other mechanisms such as the number of records, or some value
> present in one of the records?

So far, elapsed time has covered all the use cases we have faced. There may be others for which those triggering mechanisms would be useful. Do you have an example?

> When setting "all instances" to true, how is the leader chosen?

In our implementation, based on punctuation and an API exposed through REST, we did the following:
- during punctuate, each instance queries the KStreams API to discover its own instance id and the ids of all active instances
- the instance with the lowest id is the leader
- this way, if the leader goes down, another instance takes its place

> If "all instances" is set to false are all the partial aggregates forwarded
> to single output topic?

No. If it is false, the sourced KTable is expected to hold only the partial aggregates from the current instance's state store. This is useful when there are tens of thousands of keys in the partial tables (making it slow for a single instance to aggregate the state of all instances), so that one can implement a hierarchical partial aggregation, mainly in cases where there are many instances to handle the load. Something like:
- each instance, from time to time, gets its own partial aggregated value (through the proposed mechanism) and publishes it to a GlobalKTable (say, "aggregation-level0"), adding its instance id to the key (something like "[instance-id]-[ktable-key]")
- another graph, from time to time, gets a shard of K,V pairs (say, a third of those instance-id keys) and aggregates them into another GlobalKTable ("aggregation-level1") with keys like "[shard-id]-[ktable-key]"
- depending on the number of keys, it would take a couple of levels to distribute the "reduce" job in parallel.

The described "hierarchical reduce" mechanism could be another KIP in the future, with automatic sharding and even automatic leveling based on load.

Thanks for the reply!

-Flávio Stutz

On 2018/06/29 19:53:41, Bill Bejeck wrote:
> Hi Flávio,
>
> Thanks for creating the KIP.
>
> I agree with Guozhang on comparing the pros and cons of the approach he
> outlined vs the one in the proposed KIP.
>
> I also have a few clarification questions on the current KIP
>
> Will the triggering mechanism always be time, or would it make sense to
> expand to use other mechanisms such as the number of records, or some value
> present in one of the records?
> When setting "all instances" to true, how is the leader chosen?
> If "all instances" is set to false are all the partial aggregates forwarded
> to single output topic?
>
> Thanks again,
> Bill
>
>
> On Fri, Jun 29, 2018 at 2:15 PM Matthias J. Sax wrote:
>
> > Flavio,
> >
> > thanks for cleaning up the KIP number collision.
> >
> > With regard to KIP-328
> > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > )
> > I am wondering how both relate to each other?
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > Just copying a follow up from another thread to here (sorry about the mess):
> > >
> > > From: Guozhang Wang
> > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > Date: 2018/06/25 22:24:17
> > > List: dev@kafka.apache.org
> > >
> > > Flávio, thanks for creating this KIP.
> > >
> > > I think this "single-aggregation" use case is common enough that we should
> > > consider how to efficiently supports it: for example, for KSQL that's built
> > > on top of Streams, we've seen lots of query statements whose return is
> > > expected a single row indicating the "total aggregate" etc. See
> > > https://github.com/confluentinc/ksql/issues/430 for details.
> > >
> > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> > > I'm wondering if we have discussed the option of supporting it in a
> > > "pre-aggregate" manner: that is we do partial aggregates on parallel tasks,
> > > and then sends the partial aggregated value via a single topic partition
> > > for the final aggregate, to reduce the traffic on that single partition and
> > > hence the final aggregate workload.
> > > Of course, for non-commutative aggregates we'd probably need to provide
> > > another API in addition to aggregate, like the `merge` function for
> > > session-based aggregates, to let users customize the operations of merging
> > > two partial aggregates into a single partial aggregate. What's its pros and
> > > cons compared with the current proposal?
> > >
> > >
> > > Guozhang
> > > On 2018/06/26 18:22:27, Flávio Stutz wrote:
> > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > >> source for realtime partitioned consolidations.
> > >>
> > >> We have faced the following scenario/problem in a lot of situations with
> > >> KStreams:
> > >> - Huge incoming data being processed by numerous application instances
> > >> - Need to aggregate different fields whose records span all topic
> > >> partitions (something like “total amount spent by people aged > 30 yrs”
> > >> when processing a topic partitioned by userid).
> > >>
> > >> The challenge here is to manage this kind of situation without any
> > >> bottlenecks. We don't need the “global aggregation” to be processed at each
> > >> incoming message. On a scenario of 500 instances, each handling 1k
> > >> messages/s, any single point of aggregation (single partitioned topics,
> > >> global tables or external databases) would create a bottleneck of 500k
> > >> messages/s for single threaded/CPU elements.
> > >>
> > >> For this scenario, it is possible to store the partial aggregations on
> > >> local stores and, from time to time, query those states and aggregate them
> > >> as a single value, avoiding bottlenecks. This is a way to create a "timed
> > >> aggregation barrier".
> > >>
> > >> If we leverage this kind of built-in feature we could greatly enhance the
> > >> ability of KStreams to better handle the CAP Theorem characteristics, so
> > >> that one could choose to have Consistency over Availability when needed.
> > >>
> > >> We started this discussion with Matthias J. Sax here:
> > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > >>
> > >> If you want to see more, go to KIP-326 at:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > >>
> > >> -Flávio Stutz
> > >>
> > >
> >
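
For readers following the thread, here is a rough sketch of the leader rule from the reply at the top (each instance lists the active instance ids on every punctuate, and the lowest id acts as leader), combined with the "timed aggregation" of per-instance partial aggregates. It is plain Python with dictionaries standing in for state stores, not Kafka Streams code. The names choose_leader, merge_partials and punctuate are hypothetical, ids are assumed to be strings compared lexicographically, and the aggregate is assumed to be a simple per-key sum.

```python
from typing import Dict, Iterable, Optional


def choose_leader(active_instance_ids: Iterable[str]) -> str:
    """Return the lowest id among the active instances.

    Assumption: ids are strings and "lowest" means lexicographic order.
    """
    ids = list(active_instance_ids)
    if not ids:
        raise ValueError("no active instances")
    return min(ids)


def merge_partials(partials: Iterable[Dict[str, int]]) -> Dict[str, int]:
    """Combine per-instance partial aggregates by summing the values per key."""
    total: Dict[str, int] = {}
    for partial in partials:
        for key, value in partial.items():
            total[key] = total.get(key, 0) + value
    return total


def punctuate(
    my_id: str,
    active_instance_ids: Iterable[str],
    partials_by_instance: Dict[str, Dict[str, int]],
) -> Optional[Dict[str, int]]:
    """Called on every instance at each tick; only the leader aggregates.

    Non-leaders return None. If the previous leader is no longer in the
    active list, the next lowest id becomes the leader on the next tick.
    """
    ids = list(active_instance_ids)
    if my_id != choose_leader(ids):
        return None
    return merge_partials(partials_by_instance[i] for i in ids)
```

In this sketch, leader failover needs no extra coordination: once an instance drops out of the active list, the remaining instance with the lowest id produces the aggregate on its next tick. A non-commutative aggregate would need a user-supplied merge function in place of the sum, as Guozhang notes above.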