From: Neha Narkhede
Date: Sun, 19 Jun 2016 22:17:16 -0700
Subject: Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams
To: dev@kafka.apache.org

I'm in favor of a global config that is then evenly divided amongst the
threads of a Kafka Streams instance.

On Mon, Jun 13, 2016 at 6:23 PM, Guozhang Wang wrote:

> Although this KIP is not mainly about memory management of Kafka Streams,
> since it touches on quite a large part of it I think it is good to first
> think of what we would REALLY want as an end goal for memory usage in order
> to make sure that whatever we propose in this KIP aligns with that
> long-term plan. So I wrote up this discussion page that summarizes my
> current thoughts:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams
>
> As for its implications for this KIP, my personal take is that:
>
> 1. we should use a global config in terms of bytes, which will then be
> evenly divided among the threads within the Kafka Streams instance, but
> within a thread that config can be used to control the total size of all
> caches instead of further dividing that among all caches.
>
> 2.
> instead of caching in terms of deserialized objects we may need to
> consider just caching in terms of serialized bytes; admittedly it will
> incur the cost of doing serdes for caching, but without doing so I honestly
> have no concrete clue how we can measure the current memory usage
> accurately AND efficiently (after reading the links Ismael sent me I feel
> that accurate estimates for collection types / composite types like String
> will do some serialization with sun.misc.Unsafe anyway when they use
> reflection to crawl the object graph), although we may need to do some
> benchmarking with https://github.com/jbellis/jamm, for example, to validate
> this claim, or someone can tell me that there is actually a better way that
> I'm not aware of.
>
> Guozhang
>
>
> On Mon, Jun 13, 2016 at 3:17 PM, Matthias J. Sax wrote:
>
> > I am just catching up on this thread.
> >
> > From my point of view, easy tuning for the user is the most important
> > thing, because Kafka Streams is a library. Thus, a global cache size
> > parameter should be the best.
> >
> > About dividing the memory vs. a single global cache: I would argue that
> > in the first place dividing the memory would be good, as synchronization
> > might kill the performance. About the cache sizes, I was thinking about
> > starting with an even distribution and adjusting the individual cache
> > sizes during runtime.
> >
> > The dynamic adjustment can also be added later on. We need to figure out
> > good internal monitoring and a "cost function" to determine which task
> > needs more memory and which less. Some metrics to do this might be
> > number-of-assigned-keys, size-of-key-value-pairs, update-frequency etc.
> >
> > I have to confess that I have no idea right now how to design the
> > "cost function" to compute the memory size for each task. But if we want
> > to add dynamic memory management later on, it might be a good idea to
> > keep it in mind and align this KIP already for future improvements.
> >
> > -Matthias
> >
> >
> > On 06/09/2016 05:24 AM, Henry Cai wrote:
> > > One more thing for this KIP:
> > >
> > > Currently RocksDBWindowStore serializes the key/value before it puts it
> > > into the in-memory cache. I think we should delay this
> > > serialization/deserialization unless it needs to flush to the db. For a
> > > simple countByKey over 100 records, this would trigger 100
> > > serializations/deserializations even if everything is in memory.
> > >
> > > If we move this internal cache from RocksDBStore to a global place, I
> > > hope we can reduce the time it needs to do the serialization.
> > >
> > >
> > > On Mon, Jun 6, 2016 at 11:07 AM, Ismael Juma wrote:
> > >
> > >> On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang wrote:
> > >>>
> > >>> About using Instrumentation.getObjectSize, yeah we were worried a lot
> > >>> about its efficiency as well as accuracy when discussing internally,
> > >>> but no better solution was proposed. So if people have better ideas,
> > >>> please throw them here, as that is also the purpose of calling out
> > >>> such KIP discussion threads.
> > >>>
> > >>
> > >> Note that this requires a Java agent to be configured. A few links:
> > >>
> > >> https://github.com/apache/spark/blob/b0ce0d13127431fa7cd4c11064762eb0b12e3436/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
> > >> https://github.com/apache/cassandra/blob/3dcbe90e02440e6ee534f643c7603d50ca08482b/src/java/org/apache/cassandra/utils/ObjectSizes.java
> > >> https://github.com/jbellis/jamm
> > >> http://openjdk.java.net/projects/code-tools/jol/
> > >> https://github.com/DimitrisAndreou/memory-measurer
> > >>
> > >> OK, maybe that's more than what you wanted. :)
> > >>
> > >> Ismael
> > >>
> > >
> > >
> >
>
> --
> -- Guozhang
>

--
Thanks,
Neha
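
P.S. To make the idea in the quoted thread a bit more concrete, here is a
rough sketch of a global byte budget that is split evenly across the threads
of an instance, with each thread's cache bounded by the serialized size of
its entries rather than by an estimate of object size. Everything in it is an
assumption for illustration: ByteBoundedCache, perThreadBudget and the other
names are made up and are not Kafka Streams classes or configs, keys are
plain Strings, and evicted entries are simply dropped where a real cache
would presumably flush them to the store or forward them downstream.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative sketch only (hypothetical names, not Kafka Streams code):
 * a single-threaded LRU cache whose capacity is expressed in serialized bytes.
 */
final class ByteBoundedCache {
    private final long maxBytes;
    private long currentBytes = 0;
    // accessOrder = true: iteration goes from least to most recently used.
    private final LinkedHashMap<String, byte[]> entries =
            new LinkedHashMap<>(16, 0.75f, true);

    ByteBoundedCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Evenly divides a global budget (in bytes) among the threads of one instance. */
    static long perThreadBudget(long globalBytes, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        return globalBytes / numThreads;
    }

    /** Stores an already-serialized value and evicts LRU entries if over budget. */
    void put(String key, byte[] serializedValue) {
        byte[] old = entries.put(key, serializedValue);
        if (old != null) {
            currentBytes -= sizeOf(key, old);
        }
        currentBytes += sizeOf(key, serializedValue);
        evictIfNeeded();
    }

    /** Returns the serialized value, or null if absent; marks the key as recently used. */
    byte[] get(String key) {
        return entries.get(key);
    }

    long sizeInBytes() {
        return currentBytes;
    }

    // Size is exact because both key and value are counted in serialized form.
    private static long sizeOf(String key, byte[] value) {
        return key.getBytes(StandardCharsets.UTF_8).length + value.length;
    }

    // Drops least-recently-used entries until the cache fits its budget.
    // An entry larger than the whole budget ends up evicting itself.
    private void evictIfNeeded() {
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            currentBytes -= sizeOf(eldest.getKey(), eldest.getValue());
            it.remove();
        }
    }
}
```

Each thread would create its own ByteBoundedCache with
perThreadBudget(globalBytes, numThreads), so no synchronization is needed
between threads. The price is the one already named in the thread: every put
pays for serialization up front, which is exactly the cost Henry would like
to avoid for entries that never leave memory.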