kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Tue, 14 Jun 2016 01:23:01 GMT
Although this KIP is not mainly about memory management in Kafka Streams,
it touches on quite a few parts of it, so I think it is good to first think
about what we REALLY want as an end goal for memory usage, in order to
make sure that whatever we propose in this KIP aligns with that long-term
plan. So I wrote up this discussion page that summarizes my current
thoughts:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams

As for its implications for this KIP, my personal take is that:

1. We should use a global config in terms of bytes, which will then be
evenly divided among the threads within the Kafka Streams instance. Within
a thread, that share would control the total size of all caches, instead
of being further divided among the individual caches.

2. Instead of caching deserialized objects, we may need to consider
caching serialized bytes. Admittedly this incurs the cost of doing serdes
for caching, but without doing so I honestly have no concrete clue how we
can measure the current memory usage accurately AND efficiently. After
reading the links Ismael sent me, I feel that accurate estimates for
collection types / composite types like String will do some serialization
with sun.misc.Unsafe anyway when they use reflection to crawl the object
graph. We may need to do some benchmarking, for example with
https://github.com/jbellis/jamm, to validate this claim, or someone can
tell me that there is actually a better way that I'm not aware of.
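
A minimal sketch of how points 1 and 2 could fit together, assuming an LRU
eviction policy and one cache per stream thread. ByteBoundedCache,
perThreadBudget and sizeInBytes are hypothetical names for illustration, not
existing Kafka Streams classes or configs, and the accounting counts only
key and value bytes, ignoring per-entry object overhead:

```java
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical per-thread cache bounded by serialized size (illustration only). */
final class ByteBoundedCache {
    private final long maxBytes;
    private long currentBytes = 0;
    // accessOrder = true: iteration starts at the least recently used entry.
    private final LinkedHashMap<ByteBuffer, byte[]> entries =
        new LinkedHashMap<>(16, 0.75f, true);

    ByteBoundedCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Point 1: split one global byte budget evenly across stream threads. */
    static long perThreadBudget(long totalBytes, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        return totalBytes / numThreads;
    }

    /** Point 2: store serialized bytes so the size of each entry is known exactly. */
    void put(byte[] key, byte[] value) {
        // ByteBuffer gives content-based equals/hashCode for the key bytes.
        byte[] old = entries.put(ByteBuffer.wrap(key.clone()), value.clone());
        if (old != null) {
            currentBytes -= key.length + old.length;
        }
        currentBytes += key.length + value.length;
        evictIfNeeded();
    }

    /** Returns the cached value bytes, or null on a miss; a hit refreshes recency. */
    byte[] get(byte[] key) {
        return entries.get(ByteBuffer.wrap(key));
    }

    long sizeInBytes() {
        return currentBytes;
    }

    private void evictIfNeeded() {
        // A real cache would presumably flush evicted entries to the store or
        // downstream; this sketch simply drops them.
        Iterator<Map.Entry<ByteBuffer, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            Map.Entry<ByteBuffer, byte[]> eldest = it.next();
            currentBytes -= eldest.getKey().remaining() + eldest.getValue().length;
            it.remove();
        }
    }
}
```

Because each thread owns its own instance, no synchronization is needed
inside the cache; the only shared number is the global budget passed to
perThreadBudget.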


Guozhang


On Mon, Jun 13, 2016 at 3:17 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> I am just catching up on this thread.
>
> From my point of view, easy tuning for the user is the most important
> thing, because Kafka Streams is a library. Thus, a global cache size
> parameter should be the best.
>
> About dividing the memory vs. a single global cache: I would argue that
> dividing the memory would be good to begin with, as synchronization
> might kill the performance. About the cache sizes, I was thinking about
> starting with an even distribution and adjusting the individual cache
> sizes during runtime.
>
> The dynamic adjustment can also be added later on. We need to figure out
> a good internal monitoring and "cost function" to determine which task
> needs more memory and which less. Some metrics to do this might be
> number-of-assigned-keys, size-of-key-value-pairs, update-frequency etc.
>
> I have to confess, that I have no idea right now, how to design the
> "cost function" to compute the memory size for each task. But if we want
> to add dynamic memory management later on, it might be a good idea to
> keep it in mind and align this KIP already for future improvements.
>
> -Matthias
>
>
> On 06/09/2016 05:24 AM, Henry Cai wrote:
> > One more thing for this KIP:
> >
> > Currently RocksDBWindowStore serializes the key/value before putting it
> > into the in-memory cache. I think we should delay this
> > serialization/deserialization unless it needs to be flushed to the db.
> > For a simple countByKey over 100 records, this would trigger 100
> > serializations/deserializations even if everything is in memory.
> >
> > If we move this internal cache from RocksDBStore to a global place, I
> > hope we can reduce the time spent on serialization.
> >
> >
> > On Mon, Jun 6, 2016 at 11:07 AM, Ismael Juma <ismael@juma.me.uk> wrote:
> >
> >> On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >>>
> >>> About using Instrumentation.getObjectSize, yeah we were worried a lot
> >>> about its efficiency as well as accuracy when discussing internally,
> >>> but no better solution was proposed. So if people have better ideas,
> >>> please throw them here, as it is also the purpose for us to call out
> >>> such KIP discussion threads.
> >>>
> >>
> >> Note that this requires a Java agent to be configured. A few links:
> >>
> >>
> >> https://github.com/apache/spark/blob/b0ce0d13127431fa7cd4c11064762eb0b12e3436/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
> >> https://github.com/apache/cassandra/blob/3dcbe90e02440e6ee534f643c7603d50ca08482b/src/java/org/apache/cassandra/utils/ObjectSizes.java
> >> https://github.com/jbellis/jamm
> >> http://openjdk.java.net/projects/code-tools/jol/
> >> https://github.com/DimitrisAndreou/memory-measurer
> >>
> >> OK, maybe that's more than what you wanted. :)
> >>
> >> Ismael
> >>
> >
>
>


-- 
-- Guozhang
