Subject: Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source
From: flaviostutz@gmail.com
To: dev@kafka.apache.org
Date: Tue, 03 Jul 2018 14:21:33 -0000

Great feature you have there!  I'll try to exercise here how we would achieve
the same functional objectives using your KIP:

EXERCISE 1:
  - The case is "total counting of events for a huge website"
  - Tasks from Application A will have something like:
        .stream(/site-events)
        .count()
        .publish(/single-partitioned-topic-with-count-partials)
  - The published messages will be, for example:
        ["counter-task1", 2345]
        ["counter-task2", 8495]
        ["counter-task3", 4839]
  - A single task from Application B will have something like:
        .stream(/single-partitioned-topic-with-count-partials)
        .aggregate(by messages whose key starts with "counter")
        .publish(/counter-total)
  - FAIL HERE. How would I know the overall number of partitions?
    Maybe two partials from the same task will arrive before the other tasks'
    partials, and they may be aggregated twice. I tried to think about using
    GlobalKTables, but I didn't find an easy way to aggregate the keys from
    that table. Do you have any clue?

Thanks.

-Flávio Stutz


On 2018/07/02 20:03:57, John Roesler wrote:
> Hi Flávio,
>
> Thanks for the KIP. I'll apologize that I'm arriving late to the
> discussion. I've tried to catch up, but I might have missed some nuances.
>
> Regarding KIP-328, the idea is to add the ability to suppress intermediate
> results from all KTables, not just windowed ones. I think this could
> support your use case in combination with the strategy that Guozhang
> proposed of having one or more pre-aggregation steps that ultimately push
> into a single-partition topic for final aggregation. Suppressing
> intermediate results would solve the problem you noted that today
> pre-aggregating doesn't do much to staunch the flow of updates.
>
> I'm not sure if this would be good enough for you overall; I just wanted
> to clarify the role of KIP-328.
> In particular, the solution you mentioned is to have the downstream
> KTables actually query the upstream ones to compute their results. I'm not
> sure whether it's more efficient to do these queries on a schedule, or to
> have the upstream tables emit their results on the same schedule.
>
> What do you think?
>
> Thanks,
> -John
>
> On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com wrote:
> >
> > From what I understood, that KIP is related to how KStreams will handle
> > KTable updates in windowed scenarios to optimize resource usage.
> > I couldn't see any specific relation to this KIP. Had you?
> >
> > -Flávio Stutz
> >
> >
> > On 2018/06/29 18:14:46, "Matthias J. Sax" wrote:
> > > Flavio,
> > >
> > > thanks for cleaning up the KIP number collision.
> > >
> > > With regard to KIP-328
> > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
> > > I am wondering how both relate to each other?
> > >
> > > Any thoughts?
> > >
> > >
> > > -Matthias
> > >
> > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > Just copying a follow-up from another thread to here (sorry about
> > > > the mess):
> > > >
> > > > From: Guozhang Wang
> > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > Date: 2018/06/25 22:24:17
> > > > List: dev@kafka.apache.org
> > > >
> > > > Flávio, thanks for creating this KIP.
> > > >
> > > > I think this "single-aggregation" use case is common enough that we
> > > > should consider how to efficiently support it: for example, for
> > > > KSQL, which is built on top of Streams, we've seen lots of query
> > > > statements whose return is expected to be a single row indicating
> > > > the "total aggregate", etc. See
> > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > >
> > > > I've not read through
> > > > https://issues.apache.org/jira/browse/KAFKA-6953, but I'm wondering
> > > > if we have discussed the option of supporting it in a
> > > > "pre-aggregate" manner: that is, we do partial aggregates on
> > > > parallel tasks, and then send the partially aggregated values via a
> > > > single topic partition for the final aggregate, to reduce the
> > > > traffic on that single partition and hence the final aggregate
> > > > workload.
> > > > Of course, for non-commutative aggregates we'd probably need to
> > > > provide another API in addition to aggregate, like the `merge`
> > > > function for session-based aggregates, to let users customize the
> > > > operation of merging two partial aggregates into a single partial
> > > > aggregate. What are its pros and cons compared with the current
> > > > proposal?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On 2018/06/26 18:22:27, Flávio Stutz wrote:
> > > >> Hey, guys, I've just created a new KIP about creating a new DSL
> > > >> graph source for realtime partitioned consolidations.
> > > >>
> > > >> We have faced the following scenario/problem in a lot of
> > > >> situations with KStreams:
> > > >>   - Huge incoming data being processed by numerous application
> > > >>     instances
> > > >>   - Need to aggregate different fields whose records span all
> > > >>     topic partitions (something like "total amount spent by people
> > > >>     aged > 30 yrs" when processing a topic partitioned by userid).
> > > >>
> > > >> The challenge here is to manage this kind of situation without any
> > > >> bottlenecks. We don't need the "global aggregation" to be
> > > >> processed at each incoming message. In a scenario of 500
> > > >> instances, each handling 1k messages/s, any single point of
> > > >> aggregation (single-partition topics, global tables, or external
> > > >> databases) would create a bottleneck of 500k messages/s for
> > > >> single-threaded/CPU elements.
> > > >>
> > > >> For this scenario, it is possible to store the partial
> > > >> aggregations in local stores and, from time to time, query those
> > > >> states and aggregate them into a single value, avoiding
> > > >> bottlenecks. This is a way to create a "timed aggregation
> > > >> barrier".
> > > >>
> > > >> If we leverage this kind of built-in feature we could greatly
> > > >> enhance the ability of KStreams to better handle the CAP Theorem
> > > >> characteristics, so that one could choose to have Consistency over
> > > >> Availability when needed.
> > > >>
> > > >> We started this discussion with Matthias J.
> > > >> Sax here: https://issues.apache.org/jira/browse/KAFKA-6953
> > > >>
> > > >> If you want to see more, go to KIP-326 at:
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > >>
> > > >> -Flávio Stutz
> > > >>
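[Editor's note] The pre-aggregate/merge pattern discussed in this thread can be made concrete outside the Streams API. The sketch below is a hypothetical plain-Java model (not the actual Kafka Streams DSL; the class and method names are invented for illustration) of the final aggregation step from EXERCISE 1: partial counts arrive on a single partition keyed by the producing task's id, and the aggregator keeps only the latest partial per task before summing. A duplicate or newer partial from the same task overwrites its previous value instead of being added again, which is exactly the double-counting hazard Flávio raises.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the final-stage aggregation over per-task count
// partials, e.g. ["counter-task1", 2345]. This is NOT the Kafka Streams
// API; it only models the merge semantics discussed in the thread.
public class PartialCountMerger {
    // Latest partial count seen from each upstream task.
    private final Map<String, Long> partials = new HashMap<>();

    // Consume one partial and return the current global total. A repeated
    // partial from the same task replaces the old value, so nothing is
    // counted twice.
    public long accept(String taskId, long partialCount) {
        partials.put(taskId, partialCount);
        return partials.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        PartialCountMerger merger = new PartialCountMerger();
        merger.accept("counter-task1", 2345);
        merger.accept("counter-task2", 8495);
        long total = merger.accept("counter-task3", 4839);  // 2345+8495+4839
        // A newer partial from task1 overwrites its old value:
        long updated = merger.accept("counter-task1", 2400);
        System.out.println(total + " " + updated);
    }
}
```

Note that under this scheme the final aggregator never needs to know the overall number of partitions or tasks: each partial carries its task id as the message key, and the map simply grows as new task ids appear.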