kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: KTable.filter usage, memory consumption and materialized view semantics
Date Thu, 23 Jun 2016 22:13:42 GMT
Hello Philippe,

I think your question is really in two-folds:

1. What is the semantic difference between a KTable and a KStream, and more
specifically how should we interpret (key, null) in KTable?

You can find some explanations in this documentation:

Note that KTable itself is still a stream behind the scene, although it may
be materialized when necessary. And specifically to your question, (key,
null) can be treated as a tombstone on the specified key, and when this
KTable stream is materialized, it will result in a "delete" on materialized

2. As for the "filter" operator, yes it will generate a large amount of
(key, null) records which indicates "delete" in the resulted KTable, and
hence large traffic to the piped topic. But we are working on KIP-63 which
unifies the caching mechanism in the `KTable.to` operator as well so that
de-duping can be done in this operator and hence the outgoing traffic can
be largely reduced:



On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <phderome@gmail.com> wrote:

> I made a modification of latest Confluent's example
> UserRegionLambdaExample. See relevant code at end of email.
> Am I correct in understanding that KTable semantics should be similar to a
> store-backed cache of a view as (per wikipedia on materialized views) or
> similar to Oracle's materialized views and indexed views? More
> specifically, I am looking at when a (key, null value) pair can make it
> into KTable on generating table from a valid KStream with a false filter.
> Here's relevant code modified from example for which I observed that all
> keys within userRegions are sent out to topic LargeRegions with a null
> value. I would think that both regionCounts KTable and topic LargeRegions
> should be empty so that the cached view agrees with the intended query (a
> query with an intentional empty result set as the filter is intentionally
> false as 1 >= 2).
> I am not sure I understand implications properly as I am new but it seems
> possible that  a highly selective filter from a large incoming stream would
> result in high memory usage for regionCounts and hence the stream
> application.
> KTable<String, *String*> regionCounts = userRegions
>     // Count by region
>     // We do not need to specify any explicit serdes because the key
> and value types do not change
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion")
>     // discard any regions FOR SAKE OF EXAMPLE
>     .filter((regionName, count) -> *1 >= 2*)
>     .mapValues(count -> count.toString());
> KStream<String, *String*> regionCountsForConsole = regionCounts.toStream();
> regionCountsForConsole.to(stringSerde, *stringSerde*, "LargeRegions");

-- Guozhang

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message