flink-user mailing list archives

From Yassine MARZOUGUI <y.marzou...@mindlytix.com>
Subject Re: Appropriate State to use to buffer events in ProcessFunction
Date Thu, 16 Mar 2017 23:48:33 GMT
Hi Xiaogang,

Indeed, MapState is what I was looking for in order to have efficient
sorted state, as it would facilitate many use cases like this one, or
joining streams, etc. I searched a bit and found your contribution
<https://github.com/apache/flink/pull/3336> of MapState for the upcoming 1.3
release. I'll see how it works for me.
Thank you for pointing this out, very helpful!


2017-03-16 18:50 GMT+01:00 SHI Xiaogang <shixiaogangg@gmail.com>:

> Hi Yassine,
> If I understand correctly, you need sorted state, which unfortunately
> is not supported in Flink at the moment.
> We have some ideas for providing such sorted state to facilitate the
> development of user applications, but they are still under discussion due
> to concerns about backward compatibility.
> Currently, I think we can work around the problem with MapState in the
> RocksDB state backend.
> In the RocksDB state backend, each entry in a MapState corresponds to an
> entry in RocksDB. The key of a RocksDB entry is formatted as
> "keyGroup#key#keyLen#namespace#namespaceLen#mapKey".
> The entries in RocksDB are sorted in lexicographical order. In the
> cases where the map keys are of type Timestamp/Long, the entries in the
> MapState will be iterated in the same order as in a sorted map. Thus, you
> can find all the events whose timestamps are smaller than the given one.
> The solution is quite tricky because it does not work when the heap
> state backends are used. But given that the state may grow up to ~100GB,
> the RocksDB state backend is strongly recommended.
> I hope this information helps.
> Regards,
> Xiaogang
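
A minimal sketch of why the ordering argument above holds, in plain Java without any Flink or RocksDB dependency. It assumes the map key is serialized as 8 big-endian bytes and that the store compares keys as unsigned bytes; the class and method names are hypothetical, and a TreeMap with an unsigned byte comparator stands in for RocksDB. For a fixed key group, key and namespace the prefix of every entry is identical, so only the mapKey bytes decide the order.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

public class SortedKeyBytesSketch {

    // Encode a long as 8 big-endian bytes (ByteBuffer's default byte order).
    public static byte[] encode(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    }

    // Returns the timestamps in the order a byte-sorted store would iterate them.
    public static List<Long> iterationOrder(long... timestamps) {
        // Stand-in for RocksDB: keys compared lexicographically as unsigned bytes.
        Comparator<byte[]> unsignedLex = Arrays::compareUnsigned;
        TreeMap<byte[], Long> store = new TreeMap<>(unsignedLex);
        for (long ts : timestamps) {
            store.put(encode(ts), ts);
        }
        return new ArrayList<>(store.values());
    }

    public static void main(String[] args) {
        // Non-negative timestamps come back in numeric order.
        System.out.println(iterationOrder(300L, 5L, 1_000_000L, 42L)); // [5, 42, 300, 1000000]
        // Negative values have the sign bit set, so they sort after positive ones.
        System.out.println(iterationOrder(7L, -1L, 3L)); // [3, 7, -1]
    }
}
```

Under these assumptions the trick only gives numeric order for non-negative keys, which is the case for epoch-millisecond timestamps.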
> 2017-03-09 23:19 GMT+08:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>> Hi Timo,
>> I thought about ListState but quickly discarded it, as it keeps the
>> insertion order and not the event order. On second thought, I think I will
>> reconsider it, since my events are only occasionally out of order. I didn't
>> know that the Flink CEP operators 'next' and 'within' can handle event
>> time, so I think I will give it a try! Thank you!
>> Best,
>> Yassine
>> 2017-03-08 9:55 GMT+01:00 Timo Walther <twalthr@apache.org>:
>>> Hi Yassine,
>>> have you thought about using a ListState? As far as I know, it keeps at
>>> least the insertion order. You could sort it once your trigger event has
>>> arrived.
>>> If you use RocksDB as the state backend, 100+ GB of state should not be
>>> a problem. Have you thought about using Flink's CEP library? It might fit
>>> your needs without implementing a custom process function.
>>> I hope that helps.
>>> Timo
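
The "buffer in insertion order, sort when the trigger arrives" idea above could look roughly like this. It is a plain-Java sketch only: an ArrayList stands in for ListState, and the class, field and method names are hypothetical rather than Flink API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class ListBufferSketch {

    // Hypothetical event type: a timestamp plus a payload.
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Stand-in for ListState: elements are kept in arrival order.
    private final List<Event> buffer = new ArrayList<>();

    public void add(long timestamp, String payload) {
        buffer.add(new Event(timestamp, payload));
    }

    // Sorts the buffer once, then removes and returns the payloads of all
    // events with a timestamp strictly smaller than the trigger timestamp.
    public List<String> onTrigger(long triggerTimestamp) {
        buffer.sort(Comparator.comparingLong(e -> e.timestamp));
        List<String> older = new ArrayList<>();
        Iterator<Event> it = buffer.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (e.timestamp >= triggerTimestamp) {
                break; // sorted, so everything after this is newer
            }
            older.add(e.payload);
            it.remove();
        }
        return older;
    }

    public int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        ListBufferSketch sketch = new ListBufferSketch();
        sketch.add(30L, "a30");
        sketch.add(10L, "a10"); // arrives out of order
        sketch.add(20L, "a20");
        System.out.println(sketch.onTrigger(25L)); // [a10, a20]
    }
}
```

The trade-off is that appends are cheap, while every trigger pays for a full sort of whatever is buffered.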
>>> On 07/03/17 at 19:23, Yassine MARZOUGUI wrote:
>>>> Hi all,
>>>> I want to label events in a stream based on a condition on some future
>>>> events.
>>>> For example, my stream contains events of type A and B, and I would
>>>> like to assign a label 1 to an event E of type A if an event of type B
>>>> happens within a duration x of E. I am using event time and my events can
>>>> be out of order.
>>>> For this I'm using ProcessFunction, which looks suitable for my use
>>>> case. In order to handle out-of-order events, I'm keeping events of type
>>>> A in a state, and once an event of type B is received, I register an
>>>> event-time timer in which I loop through the events of type A in the state
>>>> having a timestamp < timer.timestamp, label them and remove them from the
>>>> state.
>>>> Currently the state is simply a value state containing a
>>>> TreeMap<Timestamp, EventA>. I'm keeping events sorted in order to
>>>> efficiently get events older than the timer timestamp.
>>>> I wonder if that's the appropriate data structure to use in the value
>>>> state to buffer events and handle out-of-order events, or if there
>>>> is a more efficient implementation, especially given that the state may
>>>> grow to ~100 GB at times?
>>>> Any insight is appreciated.
>>>> Thanks,
>>>> Yassine
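
For reference, the TreeMap lookup described in the original question can be sketched in plain Java as follows. The class and method names are hypothetical, long timestamps and String payloads stand in for Timestamp and EventA, and as with TreeMap<Timestamp, EventA> only one event per timestamp is kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class TreeMapBufferSketch {

    // Stand-in for the TreeMap held in the value state: timestamp -> event A.
    private final TreeMap<Long, String> buffer = new TreeMap<>();

    public void add(long timestamp, String eventA) {
        buffer.put(timestamp, eventA);
    }

    // Removes and returns all events with timestamp < timerTimestamp.
    public List<String> drainOlderThan(long timerTimestamp) {
        // headMap is a live view of the keys strictly below the bound.
        SortedMap<Long, String> older = buffer.headMap(timerTimestamp);
        List<String> drained = new ArrayList<>(older.values());
        older.clear(); // clearing the view removes the entries from the buffer
        return drained;
    }

    public int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        TreeMapBufferSketch sketch = new TreeMapBufferSketch();
        sketch.add(30L, "a30");
        sketch.add(10L, "a10");
        sketch.add(20L, "a20");
        System.out.println(sketch.drainOlderThan(30L)); // [a10, a20]
        System.out.println(sketch.size());              // 1
    }
}
```

The lookup itself is cheap. The cost with this design comes from the state layer: with the RocksDB backend a value state is serialized and deserialized as a whole on each access, so the entire map is rewritten whenever one event is added or removed.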
