kafka-dev mailing list archives

From flaviostutz@gmail.com <flaviost...@gmail.com>
Subject Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source
Date Mon, 02 Jul 2018 02:16:39 GMT
We tried the "single partition" strategy, but the problem is that for each incoming message
to the Graph we have another output message with the aggregated (cumulative or not) result.
So if we have a million messages/s (across all parallel tasks) being processed, we'll have
another million messages/s of aggregated values on the single-partition topic, and a single
consumer will not be able to handle this load. Did I miss something?

Using a Schedulable Stream Source over the state stores' values, we can control the pressure of
the global aggregation, which, regardless of the strategy applied, has to be performed by a single
instance. This way, the pressure is determined by the desired schedule time (which can be
lowered as needed) and by the number of tuples in the state store instances (the fewer distinct
keys the state store instances hold, the higher the practical frequency of the global aggregation
task). With this strategy, the load of incoming messages won't affect the global aggregation
mechanism, because we split the two apart in time: distributed partial aggregation among
partition tasks happens while the incoming messages are processed, and the final global
aggregation is performed asynchronously by a single task from time to time.
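To make the split between the per-record path and the scheduled path concrete, here is a minimal in-memory sketch. It does not use the Kafka Streams API: `PartitionTask` and `scheduled_global_aggregation` are hypothetical names, a plain dict stands in for each instance's state store, and the aggregate is assumed to be a simple sum. The point it shows is that the global step's cost depends on the number of stored tuples, not on how many records were processed.

```python
from collections import defaultdict


class PartitionTask:
    """Hypothetical stand-in for one stream task with a local key-value state store."""

    def __init__(self):
        self.store = defaultdict(int)  # key -> partial (per-key) cumulative sum

    def process(self, key, amount):
        # Runs once per incoming record and only touches the local store;
        # nothing is forwarded to a single global partition here.
        self.store[key] += amount


def scheduled_global_aggregation(tasks):
    """Runs on a schedule (e.g. once per second) in a single task.

    Reads every tuple from every task's store, so its cost is proportional
    to the number of distinct keys held, not to the incoming message rate.
    """
    total = 0
    tuples_read = 0
    for task in tasks:
        for _key, partial in task.store.items():
            total += partial
            tuples_read += 1
    return total, tuples_read


tasks = [PartitionTask() for _ in range(3)]

# Simulate a burst of input: many records, but few distinct keys per partition.
for i in range(30_000):
    tasks[i % 3].process(key=f"k{i % 10}", amount=1)

total, tuples_read = scheduled_global_aggregation(tasks)
print(total, tuples_read)
```

Because each store keeps a cumulative per-key value, every scheduled run recomputes the current global total from scratch; 30,000 processed records turn into only 30 tuples read by the single global task.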

[SOURCE PARTITION 1]  >-----100k messages/s---->  [TASK GRAPH 1]  >----100 different aggregated values---->  [INSTANCE 1 STATE STORE] >---> END
[SOURCE PARTITION 2]  >-----100k messages/s---->  [TASK GRAPH 2]  >----200 different aggregated values---->  [INSTANCE 2 STATE STORE] >---> END
[SOURCE PARTITION 3]  >-----100k messages/s---->  [TASK GRAPH 3]  >----100 different aggregated values---->  [INSTANCE 3 STATE STORE] >---> END

[ONE SEC SCHEDULED SOURCE] >-----400 messages/s------> [TASK GRAPH X]  >----global aggregated value----> [OUTPUT TOPIC /global-total]
(a KTable scheduled every second over the tuples of all state store instances)
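The load figures in the diagram can be checked with a few lines of arithmetic. This sketch only uses the numbers shown above; `global_task_load` is a hypothetical helper, and it assumes the scheduled source reads every stored tuple exactly once per tick.

```python
# Figures taken from the diagram.
input_rates = [100_000, 100_000, 100_000]  # messages/s per source partition
distinct_keys = [100, 200, 100]            # tuples held per instance state store


def global_task_load(keys_per_store, interval_s):
    """Tuples/s the single global task must read if every stored tuple
    is emitted once per scheduled tick (hypothetical model)."""
    return sum(keys_per_store) / interval_s


# "Single partition" strategy: one aggregated output per input record,
# all funnelled to a single consumer.
per_record_load = sum(input_rates)  # 300,000 messages/s

# Scheduled strategy with the one-second source from the diagram.
scheduled_load = global_task_load(distinct_keys, 1.0)  # 400 tuples/s

print(per_record_load, scheduled_load, per_record_load / scheduled_load)
```

In this model the global task's load is independent of the input rate: doubling the incoming messages leaves it at 400 tuples/s as long as the number of distinct keys stays the same, and it can be tuned further through the schedule interval.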

What do you think?

-Flávio Stutz

On 2018/06/29 17:23:29, flaviostutz@gmail.com <flaviostutz@gmail.com> wrote: 
> Just copying a follow-up from another thread to here (sorry about the mess):
> From: Guozhang Wang <wangguoz@gmail.com>
> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> Date: 2018/06/25 22:24:17
> List: dev@kafka.apache.org
> Flávio, thanks for creating this KIP.
> I think this "single-aggregation" use case is common enough that we should
> consider how to support it efficiently: for example, for KSQL, which is built
> on top of Streams, we've seen lots of query statements whose result is
> expected to be a single row indicating the "total aggregate", etc. See
> https://github.com/confluentinc/ksql/issues/430 for details.
> I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> I'm wondering if we have discussed the option of supporting it in a
> "pre-aggregate" manner: that is, we do partial aggregates on parallel tasks,
> and then send the partially aggregated values via a single topic partition
> for the final aggregate, to reduce the traffic on that single partition and
> hence the final aggregate workload.
> Of course, for non-commutative aggregates we'd probably need to provide
> another API in addition to aggregate, like the `merge` function for
> session-based aggregates, to let users customize the operation of merging
> two partial aggregates into a single partial aggregate. What are its pros and
> cons compared with the current proposal?
> Guozhang
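The "pre-aggregate" alternative relies on a user-supplied function that merges two partial aggregates, which is a different operation from folding one record into an aggregate. The minimal sketch below illustrates that split for an average, assuming a hypothetical `Partial(total, count)` type; `aggregate` and `merge` are illustrative names, not the Kafka Streams API. Each parallel task folds its own records, and only the small partials travel to the single final task, where `merge` combines them.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Partial:
    """Hypothetical partial aggregate for an average: running sum and count."""
    total: float = 0.0
    count: int = 0


def aggregate(acc: Partial, value: float) -> Partial:
    # Per-record step, run inside each parallel task.
    return Partial(acc.total + value, acc.count + 1)


def merge(a: Partial, b: Partial) -> Partial:
    # Combines two partial aggregates into one (the user-supplied "merge").
    return Partial(a.total + b.total, a.count + b.count)


def average(p: Partial) -> float:
    return p.total / p.count if p.count else 0.0


# Three parallel tasks pre-aggregate their own partitions...
partitions = [[10.0, 20.0], [30.0], [40.0, 50.0, 60.0]]
partials = [reduce(aggregate, values, Partial()) for values in partitions]

# ...and only the three partials go through the single partition
# to the final aggregate.
final = reduce(merge, partials, Partial())
print(average(final))
```

Merging `(sum, count)` pairs gives the same result as aggregating every record directly, whereas averaging the three per-task averages would not; that gap is why the per-record aggregator alone is not enough and a separate merge operation is needed.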
> On 2018/06/26 18:22:27, Flávio Stutz <flaviostutz@gmail.com> wrote: 
> > Hey guys, I've just created a new KIP about a new DSL graph
> > source for real-time partitioned consolidations.
> > 
> > We have faced the following scenario/problem in a lot of situations with
> > KStreams:
> >    - Huge incoming data being processed by numerous application instances
> >    - Need to aggregate different fields whose records span all topic
> > partitions (something like “total amount spent by people aged > 30 yrs”
> > when processing a topic partitioned by userid).
> > 
> > The challenge here is to manage this kind of situation without any
> > bottlenecks. We don't need the “global aggregation” to be processed at each
> > incoming message. In a scenario with 500 instances, each handling 1k
> > messages/s, any single point of aggregation (single-partition topics,
> > global tables or external databases) would create a bottleneck of 500k
> > messages/s for single-threaded/single-CPU elements.
> > 
> > For this scenario, it is possible to store the partial aggregations in
> > local stores and, from time to time, query those states and aggregate them
> > into a single value, avoiding bottlenecks. This is a way to create a “timed
> > aggregation barrier”.
> > 
> > If we offered this kind of feature built in, we could greatly enhance the
> > ability of KStreams to handle the CAP theorem trade-offs, so
> > that one could choose Consistency over Availability when needed.
> > 
> > We started this discussion with Matthias J. Sax here:
> > https://issues.apache.org/jira/browse/KAFKA-6953
> > 
> > If you want to see more, go to KIP-326 at:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > 
> > -Flávio Stutz
> > 
