kafka-dev mailing list archives

From Gwen Shapira <g...@confluent.io>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Sun, 05 Jun 2016 20:53:33 GMT
So you were listing the difficulties in order to provide context for
the upcoming design change and discussion?
It's all good then :)

On Sun, Jun 5, 2016 at 11:26 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> Don't get me wrong Gwen :) I'm definitely for removing as much burden as
> possible from users. All I'm saying is that it is not straightforward to do
> so, and we'd better at least have a concrete implementation design on the
> KIP page rather than just a one-line change of the config semantics.
>
> On Sun, Jun 5, 2016 at 1:09 PM, Gwen Shapira <gwen@confluent.io> wrote:
>
>> If it is hard for you, just imagine how much fun the users will have.
>>
>> Machines have X GB of available RAM. Someone has to figure out how to
>> divide it between processors and RocksDB. The user doesn't have any
>> special knowledge that you don't in this case, so there's no point in
>> pushing the decision to the user - they won't be able to make a better
>> decision.
>>
>> Gwen
>>
>> On Sun, Jun 5, 2016 at 10:44 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>> > There are some details that need to be figured out if we go global:
>> >
>> > A KafkaStreams instance could have M threads, and each thread could own a
>> > varying number of tasks (let's say N, but in practice it may differ from
>> > thread to thread), and each task contains a sub-topology with P caches
>> > (again, in practice it may differ depending on which sub-topology it
>> > contains).
>> >
>> > Say the user specified X GB for this KafkaStreams instance; then each
>> > cache will get X / M / N / P GB. But remember N and P can change from
>> > rebalance to rebalance, and threads do not communicate with each other
>> > during their lifetime. So it is hard to determine N and P dynamically.
>> >
>> > Plus, different caches may have different cache hit rates, so distributing
>> > the memory evenly to them may not be an optimal solution (some caches may
>> > be flushed much more frequently than others), and since we are considering
>> > using Instrumentation.getObjectSize, which is approximate, it may
>> > exaggerate the imbalance.
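[Editor's note: the naive even split described above can be sketched as follows. This is a hypothetical illustration of the arithmetic, not Kafka Streams code; the function name and parameters are invented for clarity.]

```python
# Sketch of the naive even split: a global budget X divided across
# M threads, N tasks per thread, and P caches per task.
# All names here are illustrative, not part of the Kafka Streams API.
def per_cache_bytes(total_bytes, threads, tasks_per_thread, caches_per_task):
    # Each cache gets X / M / N / P bytes (integer division).
    return total_bytes // (threads * tasks_per_thread * caches_per_task)

# Example: a 1 GB instance budget, 4 threads, 8 tasks each, 2 caches
# per task -> 16 MB per cache. A rebalance that changes the task count
# would change this result, which is the difficulty described above.
budget = per_cache_bytes(1 << 30, 4, 8, 2)
```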
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
>> >
>> >> Hi Jay,
>> >>
>> >> We can make it global instead of per-processor, sounds good.
>> >>
>> >> Thanks
>> >> Eno
>> >>
>> >>
>> >> > On 3 Jun 2016, at 23:15, Jay Kreps <jay@confluent.io> wrote:
>> >> >
>> >> > Hey Eno,
>> >> >
>> >> > Should the config be the global memory use rather than per-processor?
>> >> > That is, let's say I know I have a fixed 1GB heap because that is what
>> >> > I set for Java, and I want to use 100MB for caching. It seems like
>> >> > right now I'd have to do some math, depending on my knowing a bit about
>> >> > how caching works, to figure out how to set that parameter so I don't
>> >> > run out of memory. Does it also depend on the number of partitions
>> >> > assigned (and hence the number of tasks)? If so, that makes it even
>> >> > harder to set, since each time rebalancing happens that number changes,
>> >> > so it is then pretty hard to set safely.
>> >> >
>> >> > You could theoretically argue for either bottom-up (you know how much
>> >> > cache you need per processor, and you want to get exactly that) or
>> >> > top-down (you know how much memory you have to spare but can't be
>> >> > bothered to work out what that amounts to per processor). I think our
>> >> > experience has been that 99% of people never change the default, and
>> >> > if it runs out of memory they really struggle to fix it and kind of
>> >> > blame us, so I think top-down and a global config might be better. :-)
>> >> >
>> >> > Example: https://issues.apache.org/jira/browse/KAFKA-3775
>> >> >
>> >> > -Jay
>> >> >
>> >> > On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
>> >> >
>> >> >> Hi Gwen,
>> >> >>
>> >> >> Yes. As an example, if cache.max.bytes.buffering is set to X, and
>> >> >> users have A aggregation operators and T KTable.to() operators, then
>> >> >> X * (A + T) total bytes will be allocated for caching.
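[Editor's note: the per-processor semantics described above amount to simple multiplication. This is a hypothetical illustration of that arithmetic; the function name is invented and is not part of the Kafka Streams API.]

```python
# With the per-processor config cache.max.bytes.buffering = X, A
# aggregation operators and T KTable.to() operators allocate
# X * (A + T) bytes in total, as described in the message above.
def total_cache_bytes(x_bytes, num_aggregations, num_to_operators):
    return x_bytes * (num_aggregations + num_to_operators)

# Example: 10 MB per operator, 3 aggregations, 2 to() operators
# -> 50 MB total. The total grows with topology size, which is why
# the thread discusses switching to a single global budget instead.
total = total_cache_bytes(10 * 1024 * 1024, 3, 2)
```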
>> >> >>
>> >> >> Eno
>> >> >>
>> >> >>> On 3 Jun 2016, at 21:37, Gwen Shapira <gwen@confluent.io> wrote:
>> >> >>>
>> >> >>> Just to clarify: "cache.max.bytes.buffering" is per processor?
>> >> >>>
>> >> >>>
>> >> >>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.thereska@gmail.com> wrote:
>> >> >>>> Hi there,
>> >> >>>>
>> >> >>>> I have created KIP-63: Unify store and downstream caching in streams:
>> >> >>>>
>> >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>> >> >>>>
>> >> >>>> Feedback is appreciated.
>> >> >>>>
>> >> >>>> Thank you
>> >> >>>> Eno
>> >> >>
>> >> >>
>> >>
>> >>
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
>
> --
> -- Guozhang
