Subject: Re: Storing large lists into state per key
From: Aljoscha Krettek
To: dev@flink.apache.org
Cc: Stephan Ewen
Date: Wed, 13 Dec 2017 14:22:35 +0100

Hi,

If I remember correctly, there was actually an
effort to change the RocksDB list state the way you described. I'm cc'ing Stephan, who was involved in that. This is the Jira issue: https://issues.apache.org/jira/browse/FLINK-5756

Best,
Aljoscha

> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU wrote:
>
> Hi Jan,
>
> You could associate a key with each element of your Key's list (e.g., by hashing the value), keep only the keys on the heap (e.g., in a list), and keep the associated state key-value/s in an external store like RocksDB/Redis. However, you will notice large overheads due to de/serializing, a huge penalty for more than 1000s of elements (see https://hal.inria.fr/hal-01530744/document for some experimental settings), even for a relatively small rate of new events per Key, if you need to process all values of a Key for each new event. In the best case you can do some incremental processing, unless your "non-combining" means non-associative operations per Key.
>
> Best,
> Ovidiu
>
>> On 12 Dec 2017, at 11:54, Jan Lukavský wrote:
>>
>> Hi Fabian,
>>
>> thanks for the quick reply, what you suggest seems to work at first sight, I will try it. Is there any reason not to implement a RocksDBListState this way in general? Is there any increased overhead with this approach?
>>
>> Thanks,
>>
>> Jan
>>
>>
>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>> Hi Jan,
>>>
>>> I cannot comment on the internal design, but you could put the data into a
>>> RocksDBStateBackend MapState where the value X is your data
>>> type and the key is the list index. You would need another ValueState for
>>> the current number of elements that you put into the MapState.
>>> A MapState allows you to fetch and traverse the key, value, or entry set of the
>>> Map without loading it completely into memory.
>>> The sets are traversed in sort order of the key, so they should be in insertion
>>> order (given that you properly increment the list index).
>>>
>>> Best, Fabian
>>>
>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský:
>>>
>>>> Hi all,
>>>>
>>>> I have a question that looks like a user@ question, but it brought me to
>>>> the dev@ mailing list while I was browsing through Flink's source
>>>> code. First I'll try to briefly describe my use case. I'm trying to do a
>>>> group-by-key operation with a limited number of distinct keys (which I
>>>> cannot control), but a non-trivial count of values. The operation in the
>>>> GBK is non-combining, so all values per key (many) have to be stored
>>>> in state. Running this on test data led to a surprise (for me): even
>>>> when using RocksDBStateBackend, the whole list of data is serialized
>>>> into a single binary blob and then deserialized into a List, and therefore
>>>> has to fit in memory (multiple times, in fact).
>>>>
>>>> I tried to create an alternative RocksDBStateBackend that would store
>>>> each element of a ListState under a separate key in RocksDB, so that the
>>>> whole blob would not have to be loaded by a single get, but a scan over
>>>> multiple keys could be made instead. Digging into the source code, I found
>>>> a hierarchy of classes mirroring the public API in the 'internal' package:
>>>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>>>> classes, however, have a different hierarchy than the public API classes
>>>> they mirror; most notably, InternalKvState is a superinterface of all the
>>>> others. This fact seems to be relied on in multiple places throughout the
>>>> source code.
>>>>
>>>> My question is: is this intentional? Would it be possible to store each
>>>> element of a ListState under a separate key in RocksDB (probably by adding
>>>> some suffix to the actual key of the state for each element)? What are the
>>>> pitfalls? And is it necessary for InternalListState to actually be a
>>>> subinterface of InternalKvState?
I find this to be a related = problem. >>>>=20 >>>> Many thanks for any comments or thoughts, >>>>=20 >>>> Jan >>>>=20 >>>>=20 >>=20 >=20 --Apple-Mail=_4BF19590-3BB7-40B2-9EF4-D8A7E56EBCBC--