kafka-dev mailing list archives

From Jay Kreps <...@confluent.io>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Mon, 06 Jun 2016 15:49:08 GMT
For the threads, I think it might be reasonable to just give each thread
cache_bytes/num_threads for simplicity? Presumably you wouldn't want to
have to make the cache threadsafe, so you'd want to avoid sharing? Giving
each thread the same cache size isn't quite right, as you point out, but
depending on how random task assignment is, it might be okay? This type of
randomization actually seems important from a CPU load-balancing
point of view too (i.e. you don't want all the tasks for some sub-topology
going to the same CPU core).
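
A minimal sketch of the even per-thread split suggested above, for
illustration only. The class and method names (CacheBudget, perThreadBytes)
are hypothetical and not part of Kafka Streams; the sketch only assumes a
global byte budget and a fixed thread count.

```java
// Hypothetical helper, not a Kafka Streams API: splits one global cache
// budget evenly so each thread owns a private, unshared cache.
final class CacheBudget {
    private CacheBudget() {}

    /** Returns totalCacheBytes / numThreads; any remainder is left unused. */
    static long perThreadBytes(long totalCacheBytes, int numThreads) {
        if (totalCacheBytes < 0) {
            throw new IllegalArgumentException("totalCacheBytes must be >= 0");
        }
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be > 0");
        }
        return totalCacheBytes / numThreads;
    }

    public static void main(String[] args) {
        long total = 100L * 1024 * 1024; // 100MB global budget
        int threads = 4;
        System.out.println(perThreadBytes(total, threads) + " bytes per thread");
    }
}
```

Because each thread gets its own fixed share, no cache needs to be
threadsafe; the trade-off is that a thread with few tasks cannot lend its
unused share to a busier one.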

For Instrumentation.getObjectSize, does it really give the size of the
referenced object graph or is it more like sizeof in C? Is it fast enough
to use for this kind of thing (seems like computing the "deep" object size
could easily overwhelm the cost of the hash table lookup)?

-Jay



On Sun, Jun 5, 2016 at 12:44 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> There are some details that need to be figured out if we go global:
>
> A KafkaStreams instance could have M threads, and each thread could have a
> varying number of tasks (let's say N, but in practice it may differ from
> thread to thread), and each task contains a sub-topology with P caches
> (again, in practice it may differ depending on which sub-topology it
> contains).
>
> Say the user specified X GB for this KafkaStreams instance; then each cache
> will get X / M / N / P GB. But remember N and P can change from rebalance
> to rebalance, and threads do not communicate with each other during their
> lifetime. So it is hard to determine M and N dynamically.
>
> Plus, different caches may have different cache hit rates, so distributing
> the memory evenly among them may not be an optimal solution (some caches may
> be flushed much more frequently than others), and since we are
> considering using Instrumentation.getObjectSize, which is approximate, it
> may exaggerate the imbalance.
>
>
> Guozhang
>
>
> On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.thereska@gmail.com>
> wrote:
>
> > Hi Jay,
> >
> > We can make it global instead of per-processor, sounds good.
> >
> > Thanks
> > Eno
> >
> >
> > > On 3 Jun 2016, at 23:15, Jay Kreps <jay@confluent.io> wrote:
> > >
> > > Hey Eno,
> > >
> > > Should the config be the global memory use rather than the per-processor?
> > > That is, let’s say I know I have fixed a 1GB heap because that is what I
> > > set for Java, and want to use 100MB for caching, it seems like right now
> > > I’d have to do some math that depends on my knowing a bit about how caching
> > > works to figure out how to set that parameter so I don't run out of memory.
> > > Does it also depend on the number of partitions assigned (and hence the
> > > number of task), if so that makes it even harder to set since each time
> > > rebalancing happens that changes so it is then pretty hard to set safely.
> > >
> > > You could theoretically argue for either bottom up (you know how much cache
> > > you need per processor as you have it and you want to get exactly that) or
> > > top down (you know how much memory you have to spare but can't be bothered
> > > to work out what that amounts to per-processor). I think our experience has
> > > been that 99% of people never change the default and if it runs out of
> > > memory they really struggle to fix it and kind of blame us, so I think top
> > > down and a global config might be better. :-)
> > >
> > > Example: https://issues.apache.org/jira/browse/KAFKA-3775
> > >
> > > -Jay
> > >
> > > On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> > >
> > >> Hi Gwen,
> > >>
> > >> Yes. As an example, if cache.max.bytes.buffering is set to X, and if users
> > >> have A aggregation operators and T KTable.to() operators, then X*(A + T)
> > >> total bytes will be allocated for caching.
> > >>
> > >> Eno
> > >>
> > >>> On 3 Jun 2016, at 21:37, Gwen Shapira <gwen@confluent.io> wrote:
> > >>>
> > >>> Just to clarify: "cache.max.bytes.buffering" is per processor?
> > >>>
> > >>>
> > >>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.thereska@gmail.com> wrote:
> > >>>> Hi there,
> > >>>>
> > >>>> I have created KIP-63: Unify store and downstream caching in streams
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
> > >>>>
> > >>>>
> > >>>> Feedback is appreciated.
> > >>>>
> > >>>> Thank you
> > >>>> Eno
> > >>
> > >>
> >
> >
>
>
> --
> -- Guozhang
>
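
The two accounting schemes quoted in this thread can be compared with a
small sketch. It is illustrative only: CacheAccounting and its methods are
hypothetical names, and the formulas are taken directly from the quoted
messages (X*(A + T) for the per-processor setting, X / M / N / P for an even
split of a global budget).

```java
// Hypothetical helper, not a Kafka Streams API: contrasts the bottom-up
// (per-processor) and top-down (global) cache accounting from the thread.
final class CacheAccounting {
    private CacheAccounting() {}

    /** Per-processor config: total allocated = X * (A + T). */
    static long perProcessorTotal(long bytesPerCache, int aggregations, int tableSinks) {
        // multiplyExact / addExact fail loudly on overflow instead of wrapping
        return Math.multiplyExact(bytesPerCache, (long) Math.addExact(aggregations, tableSinks));
    }

    /** Global config: each cache gets X / M / N / P under an even split. */
    static long perCacheShare(long totalBytes, int threads, int tasksPerThread, int cachesPerTask) {
        if (threads <= 0 || tasksPerThread <= 0 || cachesPerTask <= 0) {
            throw new IllegalArgumentException("M, N and P must all be > 0");
        }
        return totalBytes / threads / tasksPerThread / cachesPerTask;
    }

    public static void main(String[] args) {
        long tenMb = 10L * 1024 * 1024;
        long oneGb = 1024L * 1024 * 1024;
        System.out.println("per-processor total: " + perProcessorTotal(tenMb, 3, 2));
        System.out.println("per-cache share:     " + perCacheShare(oneGb, 4, 8, 2));
    }
}
```

With the per-processor scheme the total grows with the topology, while with
the global scheme the total is fixed and the per-cache share shrinks as N
and P grow after a rebalance.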
