kafka-dev mailing list archives

From Neha Narkhede <n...@confluent.io>
Subject Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
Date Mon, 20 Jun 2016 05:17:16 GMT
I'm in favor of a global config that is then evenly divided amongst the
threads of a Kafka Streams instance.
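
For concreteness, here is a minimal sketch of that division, assuming the simplest policy (an integer split of the global byte budget); the class and method names are illustrative, not from any actual Kafka Streams API:

```java
// Sketch: evenly divide a global cache budget (in bytes) among the stream
// threads of one Kafka Streams instance. Names are illustrative only.
public class CacheBudget {

    // Per-thread share of the global cache size, in bytes.
    static long perThreadBytes(long globalBytes, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be > 0");
        }
        return globalBytes / numThreads;
    }

    public static void main(String[] args) {
        long global = 10 * 1024 * 1024; // e.g. a 10 MB global config value
        int threads = 4;                // e.g. num.stream.threads = 4
        System.out.println(perThreadBytes(global, threads)); // 2621440
    }
}
```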

On Mon, Jun 13, 2016 at 6:23 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Although this KIP is not mainly about memory management in Kafka Streams,
> it touches on quite a few parts of it, so I think it is worth first thinking
> about what we would REALLY want as an end goal for memory usage, to make
> sure that whatever we propose in this KIP aligns with that long-term
> plan. So I wrote up this discussion page summarizing my current
> thoughts:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams
>
> As for its implication on this KIP, my personal take is that:
>
> 1. We should use a global config specified in bytes, which is then evenly
> divided among the threads within the Kafka Streams instance; within a
> thread, that budget controls the total size of all caches rather than
> being further divided among the individual caches.
>
> 2. Instead of caching deserialized objects, we may need to consider
> caching serialized bytes. Admittedly this incurs serde costs on every
> cache access, but without it I honestly have no concrete clue how we can
> measure the current memory usage both accurately AND efficiently. After
> reading the links Ismael sent me, my impression is that accurate
> estimates for collection / composite types like String will end up doing
> some serialization with sun.misc.Unsafe anyway when reflection is used to
> crawl the object graph. We may need to do some benchmarking, for example
> with https://github.com/jbellis/jamm, to validate this claim, unless
> someone can tell me that there is actually a better way that I'm not
> aware of.
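
Guozhang's point 2 above can be sketched as follows: if the cache holds serialized bytes, its footprint is just the sum of byte-array lengths, with no object-graph estimation needed. This is a minimal sketch under that assumption; the class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;

// Sketch of the "cache serialized bytes" idea: storing byte[] values means
// the cache's memory footprint can be tracked exactly as the sum of
// key/value byte lengths. All names here are illustrative.
public class BytesCache {
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>();
    private long sizeBytes = 0;

    void put(String key, byte[] serializedValue) {
        byte[] old = entries.put(key, serializedValue);
        if (old != null) {
            // Key bytes were already accounted for on first insert.
            sizeBytes += serializedValue.length - old.length;
        } else {
            sizeBytes += key.getBytes(StandardCharsets.UTF_8).length
                    + serializedValue.length;
        }
    }

    long sizeBytes() { return sizeBytes; }

    public static void main(String[] args) {
        BytesCache cache = new BytesCache();
        cache.put("k1", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(cache.sizeBytes()); // 2 key bytes + 5 value bytes = 7
    }
}
```

The trade-off Guozhang notes still applies: every read back out of such a cache pays a deserialization cost.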
>
>
> Guozhang
>
>
> On Mon, Jun 13, 2016 at 3:17 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > I am just catching up on this thread.
> >
> > From my point of view, easy tuning for the user is the most important
> > thing, because Kafka Streams is a library. Thus, a global cache size
> > parameter would be best.
> >
> > Regarding dividing the memory vs. a single global cache: I would argue
> > that dividing the memory is the better starting point, as synchronization
> > on a single shared cache might kill performance. As for the cache sizes,
> > I was thinking of starting with an even distribution and adjusting the
> > individual cache sizes at runtime.
> >
> > The dynamic adjustment could also be added later on. We would need to
> > figure out good internal monitoring and a "cost function" to determine
> > which tasks need more memory and which need less. Useful metrics might
> > include number-of-assigned-keys, size-of-key-value-pairs,
> > update-frequency, etc.
> >
> > I have to confess that I have no idea right now how to design this
> > "cost function" to compute the memory size for each task. But if we want
> > to add dynamic memory management later on, it would be a good idea to
> > keep it in mind and align this KIP with those future improvements.
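
One simple shape such a "cost function" could take is proportional allocation: redistribute the fixed budget across tasks in proportion to a per-task weight derived from metrics like those Matthias lists. This is a sketch of that idea only; all names are illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: redistribute a fixed cache budget across tasks in proportion to
// a per-task weight (e.g. derived from number-of-assigned-keys or
// update-frequency). All names here are illustrative.
public class CacheRebalancer {

    static Map<String, Long> allocate(long totalBytes, Map<String, Double> weights) {
        double sum = weights.values().stream().mapToDouble(Double::doubleValue).sum();
        Map<String, Long> alloc = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : weights.entrySet()) {
            alloc.put(e.getKey(), (long) (totalBytes * (e.getValue() / sum)));
        }
        return alloc;
    }

    public static void main(String[] args) {
        Map<String, Double> weights = new LinkedHashMap<>();
        weights.put("task-0", 3.0); // e.g. many assigned keys
        weights.put("task-1", 1.0);
        System.out.println(allocate(4_000_000, weights));
        // {task-0=3000000, task-1=1000000}
    }
}
```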
> >
> > -Matthias
> >
> >
> > On 06/09/2016 05:24 AM, Henry Cai wrote:
> > > One more thing for this KIP:
> > >
> > > Currently RocksDBWindowStore serializes the key/value before putting
> > > it into the in-memory cache. I think we should delay this
> > > serialization/deserialization until the entry needs to be flushed to
> > > the DB. For a simple countByKey over 100 records, the current approach
> > > triggers 100 serialization/deserialization round trips even if
> > > everything stays in memory.
> > >
> > > If we move this internal cache from RocksDBStore to a global place, I
> > > hope we can reduce the time spent on serialization.
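
The delayed-serialization idea Henry describes can be sketched as a cache that keeps deserialized values and runs the serializer only at flush time, so repeated updates to one key cost zero serdes until the flush. This is an assumption-laden sketch, not the actual RocksDBWindowStore code; all names are illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of delayed serialization: keep the deserialized value in the
// cache and serialize only when flushing to the underlying store. A
// repeated countByKey update then touches plain objects, not bytes.
public class LazySerdeCache<V> {
    private final Map<String, V> dirty = new LinkedHashMap<>();
    private final Function<V, byte[]> serializer;

    LazySerdeCache(Function<V, byte[]> serializer) {
        this.serializer = serializer;
    }

    void put(String key, V value) {
        dirty.put(key, value); // no serde on the hot path
    }

    // Serialize each dirty entry exactly once, at flush time.
    Map<String, byte[]> flush() {
        Map<String, byte[]> out = new LinkedHashMap<>();
        dirty.forEach((k, v) -> out.put(k, serializer.apply(v)));
        dirty.clear();
        return out;
    }

    public static void main(String[] args) {
        LazySerdeCache<Long> cache = new LazySerdeCache<>(
                v -> v.toString().getBytes(StandardCharsets.UTF_8));
        for (int i = 0; i < 100; i++) {
            cache.put("key", (long) i); // 100 updates, 0 serializations
        }
        Map<String, byte[]> flushed = cache.flush(); // exactly 1 serialization
        System.out.println(new String(flushed.get("key"), StandardCharsets.UTF_8)); // 99
    }
}
```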
> > >
> > >
> > > On Mon, Jun 6, 2016 at 11:07 AM, Ismael Juma <ismael@juma.me.uk>
> wrote:
> > >
> > >> On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> > >>>
> > >>> About using Instrumentation.getObjectSize, yeah we were worried a lot
> > >>> about its efficiency as well as its accuracy when we discussed this
> > >>> internally, but no better solution was proposed. So if people have
> > >>> better ideas, please throw them out here; that is also the purpose of
> > >>> calling out such KIP discussion threads.
> > >>>
> > >>
> > >> Note that this requires a Java agent to be configured. A few links:
> > >>
> > >>
> > >> https://github.com/apache/spark/blob/b0ce0d13127431fa7cd4c11064762eb0b12e3436/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
> > >> https://github.com/apache/cassandra/blob/3dcbe90e02440e6ee534f643c7603d50ca08482b/src/java/org/apache/cassandra/utils/ObjectSizes.java
> > >> https://github.com/jbellis/jamm
> > >> http://openjdk.java.net/projects/code-tools/jol/
> > >> https://github.com/DimitrisAndreou/memory-measurer
> > >>
> > >> OK, maybe that's more than what you wanted. :)
> > >>
> > >> Ismael
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>



-- 
Thanks,
Neha
