kafka-dev mailing list archives

From Eno Thereska <eno.there...@gmail.com>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Mon, 06 Jun 2016 09:59:59 GMT
Hi Guozhang,

About your first point: the alternative is not knowing how much memory a KafkaStreams instance
will consume, since, as you mention, M and N can change. I agree the implementation is slightly
harder since each cache now can change size dynamically (and the Kafka Streams instance needs
to coordinate that).
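To make that coordination cost concrete, here is a minimal sketch of an instance-wide budget that is re-split evenly whenever caches come and go on a rebalance. `EvenSplitCacheBudget` and its methods are hypothetical names for illustration, not Kafka Streams APIs. The sketch assumes each cache periodically re-reads its limit and evicts down to it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not a Kafka Streams class): one byte budget shared by
// every cache in a KafkaStreams instance, split evenly across live caches.
public class EvenSplitCacheBudget {
    private final long totalBytes;
    private final Set<String> liveCaches = new HashSet<>();

    public EvenSplitCacheBudget(long totalBytes) {
        this.totalBytes = totalBytes;
    }

    // A task assigned after a rebalance registers each of its caches.
    public synchronized void register(String cacheName) {
        liveCaches.add(cacheName);
    }

    // A revoked task unregisters its caches; the remaining caches grow.
    public synchronized void unregister(String cacheName) {
        liveCaches.remove(cacheName);
    }

    // Current limit for every cache. Because it changes as caches register and
    // unregister, each cache has to re-read it and shrink if it is over the limit.
    public synchronized long bytesPerCache() {
        return liveCaches.isEmpty() ? 0L : totalBytes / liveCaches.size();
    }

    public static void main(String[] args) {
        EvenSplitCacheBudget budget = new EvenSplitCacheBudget(100_000_000L);
        for (int i = 0; i < 4; i++) {
            budget.register("cache-" + i);
        }
        System.out.println(budget.bytesPerCache()); // 25000000
        budget.unregister("cache-3");
        System.out.println(budget.bytesPerCache()); // 33333333
    }
}
```

The synchronized methods stand in for the cross-thread coordination; a real implementation would also have to decide what happens to entries that a shrinking cache evicts.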

About the different cache hit rates argument: I agree that a more sophisticated solution would
provide variable-sized caches. But precisely this argument leads to a global configuration
parameter being better in my opinion, since the Kafka Streams instance would get a total memory
budget and do what's best with it. Note I am not suggesting we attempt this for the V1 implementation,
just pointing out that it is possible to do variable-size caches with the global config.
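One way a global budget could hand out variable-sized shares is to weight each cache, for example by its recent hit count. This is a hypothetical sketch: `WeightedCacheBudget` and the hit-count weighting are assumptions for illustration, not part of KIP-63, and weights are assumed to be non-negative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: split a global byte budget across caches in proportion
// to per-cache weights (e.g. recent hit counts). Weights must be non-negative.
public class WeightedCacheBudget {
    public static Map<String, Long> allocate(long totalBytes, Map<String, Long> weights) {
        Map<String, Long> shares = new LinkedHashMap<>();
        if (weights.isEmpty()) {
            return shares;
        }
        long weightSum = 0L;
        for (long w : weights.values()) {
            weightSum += w;
        }
        for (Map.Entry<String, Long> e : weights.entrySet()) {
            long share = (weightSum == 0L)
                    ? totalBytes / weights.size()  // no signal yet: even split
                    : (long) ((double) totalBytes * e.getValue() / weightSum);
            shares.put(e.getKey(), share);
        }
        return shares;  // rounding down can leave a few bytes unassigned
    }

    public static void main(String[] args) {
        Map<String, Long> hits = new LinkedHashMap<>();
        hits.put("count-store", 3L);
        hits.put("join-store", 1L);
        System.out.println(allocate(100_000_000L, hits));
        // {count-store=75000000, join-store=25000000}
    }
}
```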

So overall we have a tradeoff between a more complex implementation but a guarantee on total
memory usage (global setting), and a simple implementation but with variable memory usage
(per-cache setting).
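The tradeoff can be put in numbers using Guozhang's notation (M threads, N tasks per thread, P caches per task), under the simplifying assumption, which he notes does not hold in practice, that N and P are the same everywhere. The class and method names below are hypothetical.

```java
// Hypothetical back-of-the-envelope model; assumes every thread has N tasks and
// every task has P caches, which real assignments need not satisfy.
public class CacheSizingTradeoff {
    // Per-cache setting: each cache gets a fixed size, so the instance total
    // grows with the number of caches (M * N * P).
    public static long totalBytesWithPerCacheSetting(long bytesPerCache, int m, int n, int p) {
        return bytesPerCache * m * n * p;
    }

    // Global setting: the instance total is fixed, so each cache's share
    // shrinks as the number of caches grows.
    public static long bytesPerCacheWithGlobalSetting(long totalBytes, int m, int n, int p) {
        return totalBytes / ((long) m * n * p);
    }

    public static void main(String[] args) {
        // 2 threads x 4 tasks x 2 caches = 16 caches, then a rebalance doubles N.
        System.out.println(totalBytesWithPerCacheSetting(10_000_000L, 2, 4, 2));   // 160000000
        System.out.println(totalBytesWithPerCacheSetting(10_000_000L, 2, 8, 2));   // 320000000
        System.out.println(bytesPerCacheWithGlobalSetting(100_000_000L, 2, 4, 2)); // 6250000
        System.out.println(bytesPerCacheWithGlobalSetting(100_000_000L, 2, 8, 2)); // 3125000
    }
}
```

With the per-cache setting, the rebalance doubles total memory use; with the global setting, it halves each cache's share instead, which is the guarantee on total memory usage described above.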

Eno

> On 5 Jun 2016, at 20:44, Guozhang Wang <wangguoz@gmail.com> wrote:
> 
> There are some details that need to be figured out if we go global:
> 
> A KafkaStreams instance could have M threads, and each thread could have a
> varying number of tasks (let's say N, but in practice it may differ from
> thread to thread), and each task contains a sub-topology with P caches
> (again, in practice this may differ depending on which sub-topology it
> contains).
> 
> Say the user specifies X GB for this KafkaStreams instance; then each cache
> will get X / M / N / P GB. But remember N and P can change from rebalance
> to rebalance, and threads do not communicate with each other during their
> lifetime. So it is hard to determine M and N dynamically.
> 
> Plus, different caches may have different cache hit rates, so distributing
> the memory evenly among them may not be optimal (some caches may be
> flushed much more frequently than others). Also, since we are considering
> using instrumentation.getObjectSize, which is approximate, the imbalance
> may be exaggerated.
> 
> 
> Guozhang
> 
> 
> On Sat, Jun 4, 2016 at 11:54 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> 
>> Hi Jay,
>> 
>> Sounds good, we can make it global instead of per-processor.
>> 
>> Thanks
>> Eno
>> 
>> 
>>> On 3 Jun 2016, at 23:15, Jay Kreps <jay@confluent.io> wrote:
>>> 
>>> Hey Eno,
>>> 
>>> Should the config be the global memory use rather than the per-processor
>>> use? That is, let’s say I know I have fixed a 1GB heap because that is what
>>> I set for Java, and want to use 100MB for caching. It seems like right now
>>> I’d have to do some math that depends on my knowing a bit about how caching
>>> works to figure out how to set that parameter so I don't run out of memory.
>>> Does it also depend on the number of partitions assigned (and hence the
>>> number of tasks)? If so, that makes it even harder to set, since that
>>> number changes each time rebalancing happens, so it is pretty hard to set
>>> safely.
>>> 
>>> You could theoretically argue for either bottom up (you know how much cache
>>> you need per processor as you have it and you want to get exactly that) or
>>> top down (you know how much memory you have to spare but can't be bothered
>>> to work out what that amounts to per-processor). I think our experience has
>>> been that 99% of people never change the default and if it runs out of
>>> memory they really struggle to fix it and kind of blame us, so I think top
>>> down and a global config might be better. :-)
>>> 
>>> Example: https://issues.apache.org/jira/browse/KAFKA-3775
>>> 
>>> -Jay
>>> 
>>> On Fri, Jun 3, 2016 at 2:39 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
>>> 
>>>> Hi Gwen,
>>>> 
>>>> Yes. As an example, if cache.max.bytes.buffering is set to X, and users
>>>> have A aggregation operators and T KTable.to() operators, then X*(A + T)
>>>> total bytes will be allocated for caching.
>>>> 
>>>> Eno
>>>> 
>>>>> On 3 Jun 2016, at 21:37, Gwen Shapira <gwen@confluent.io> wrote:
>>>>> 
>>>>> Just to clarify: "cache.max.bytes.buffering" is per processor?
>>>>> 
>>>>> 
>>>>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.thereska@gmail.com> wrote:
>>>>>> Hi there,
>>>>>> 
>>>>>> I have created KIP-63: Unify store and downstream caching in streams
>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>>>>> 
>>>>>> 
>>>>>> Feedback is appreciated.
>>>>>> 
>>>>>> Thank you
>>>>>> Eno
>>>> 
>>>> 
>> 
>> 
> 
> 
> -- 
> -- Guozhang

