From Ken Krugler <kkrugler_li...@transpac.com>
Subject Re: Iterating over keys in state backend
Date Tue, 02 May 2017 00:31:07 GMT
Hi Kostas,

In my use case I’m keeping track of the state of URLs during a web crawl.

This represents both current state (“URL X should be crawled at time Y, and has an estimated
value of Z), and is the source of URLs to be fed into the crawl infrastructure - it’s a
floor wax and a dessert topping.

Which is why it’s a process function, so that I can query this “crawlDB” to get URLs
to emit to the fetch queue, independent of when/if new URLs are flowing in from some external

And yes, I could use an external, queryable system to to handle this (e.g. Elasticsearch),
but at a scale of billions of URLs having something custom is of significant value in terms
of performance and resource costs.

There are things I could do to better leverage Flink’s state management, so I have to do
less in this custom DB (e.g. archiving low-scoring URLs comes to mind).

But after a few whiteboard sessions, it still seems like I’m going to have to add checkpointing/snapshotting
support to my custom crawlDB.


— Ken

> On Apr 28, 2017, at 1:28am, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> Hi Ken,
> So you have a queue where elements are sorted by timestamp and score, and when the time
(event time I suppose) passes 
> that of the timestamp of an element, you want to fetch the element and:
>  if the score is too low you archive it 
>  if the score is OK you emit it.
> If I get it right, then if your stream is keyed you have a queue and an “archive”
state per key, 
> if not, you have a global queue for all elements, which can be seen as a keyed stream
on a dummy key, right?
> By the way, timers in Flink have to be associated with a key, so I suppose that if you
are using timers you are in the first case (keyed stream).
> In this case, why do you need access to the state of all the keys?
> Also it may be worth having a look at the CEP operator in the Flink codebase.
> There you also have a queue per key, where events are sorted by timestamp, and at each
> elements with timestamps smaller than the watermark are processed.
> Hope this helps,
> Kostas
>> On Apr 28, 2017, at 4:08 AM, Ken Krugler <kkrugler_lists@transpac.com <mailto:kkrugler_lists@transpac.com>>
>> Hi Kostas,
>> Thanks for responding. Details in-line below.
>>> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k.kloudas@data-artisans.com <mailto:k.kloudas@data-artisans.com>>
>>> Hi Ken,
>>> Unfortunately, iterating over all keys is not currently supported.
>>> Do you have your own custom operator (because you mention “from within the
operator…”) or
>>> you have a process function (because you mention the “onTimer” method)?
>> Currently it’s a process function, but I might be able to just use a regular operator.
>>> Also, could you describe your use case a bit more?  You have a periodic timer
per key and when
>>> a timer for a given key fires you want to have access to the state of all the
>> The timer bit is because I’m filling an async queue, and thus need to trigger emitting
tuples to the operator’s output stream independent of inbound tuples.
>> The main problems I’m trying to solve (without requiring a separate scalable DB
infrastructure) are:
>>  - entries have an associated “earliest processing time”. I don’t want to send
these through the system until that time trigger has passed.
>>  - entries have an associated “score”. I want to favor processing high scoring
entries over low scoring entries.
>>  - if an entry’s score is too low, I want to archive it, versus constantly re-evaluate
it using the above two factors.
>> I’ve got my own custom DB that is working for the above, and scales to target sizes
of 1B+ entries per server by using a mixture of RAM and disk.
>> But having to checkpoint it isn’t trivial.
>> So I thought that if there was a way to (occasionally) iterate over the keys in the
state backend, I could get what I needed with the minimum effort.
>> But sounds like that’s not possible currently.
>> Thanks,
>> — Ken
>>>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_lists@transpac.com
<mailto:kkrugler_lists@transpac.com>> wrote:
>>>> Is there a way to iterate over all of the key/value entries in the state
backend, from within the operator that’s making use of the same?
>>>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer
method) I need to iterate over all KV state and emit the N “best” entries.
>>>> What’s the recommended approach?
>>>> Thanks,
>>>> — Ken
Ken Krugler
+1 530-210-6378
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

