flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: load + update global state
Date Tue, 08 Aug 2017 05:10:35 GMT
Hi Peter!

One thing I’d like to understand first after reading about your use case:
Why exactly do you need the lookup table to be globally accessible? From what I understand,
you are using this lookup table for stream event enrichment, so whatever processing you need
to perform downstream on this enriched stream would already have the corresponding information
for each session attached.

Regarding a solution for efficient stream enrichment in your case:
Since the enrichment data comes from the input events themselves, this can be fairly
straightforward: use a MapFunction that keeps the lookup table as managed keyed state [1].
By using RocksDB as your state backend [2], the table would not need to be held in memory, so
your state size is bounded only by disk space. Each state access is scoped to the currently
processed key (i.e., in your case the session id), meaning that you’d only be accessing the
email set of that session.
With RocksDB as the state backend, each state access and update requires de-/serialization
(of the state of a single key), but that is always a local access and in general will
outperform remote lookups against an external store.
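To make this concrete, here is a rough sketch of such an enriching function (written against the Flink 1.3-era keyed-state API; the Request/EnrichedRequest types and field names are made up for illustration):

```java
// Sketch only: assumes a stream already keyed by session id, e.g.
//   requests.keyBy(r -> r.sessionId).map(new EnrichWithEmails())
public class EnrichWithEmails extends RichMapFunction<Request, EnrichedRequest> {

    // Managed keyed state: one Set<String> of emails per session id.
    private transient ValueState<Set<String>> emails;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Set<String>> descriptor =
            new ValueStateDescriptor<>(
                "session-emails",
                TypeInformation.of(new TypeHint<Set<String>>() {}));
        emails = getRuntimeContext().getState(descriptor);
    }

    @Override
    public EnrichedRequest map(Request r) throws Exception {
        Set<String> seen = emails.value();
        if (seen == null) seen = new HashSet<>();
        // Enrich the current item with the session's email history.
        EnrichedRequest out = new EnrichedRequest(r, new HashSet<>(seen));
        if (r.email != null) {
            seen.add(r.email);   // new candidate discovered while processing
            emails.update(seen); // the update is checkpointed by Flink
        }
        return out;
    }
}
```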

So, to wrap this up, the answers to your doubts, when using Flink, would be:

(1) load the state as a whole from the data store into memory is a huge burn of memory (also
making changes cluster-wide visible is an issue) 
Apart from the “cluster-wide visibility” aspect, which still needs to be clarified, you can use
RocksDB as the state backend so that the state does not have to be kept in memory.

(2) not loading into memory but using something like cassandra / redis as a lookup store would
certainly work but introduces a lot of network requests (possible ideas: use a distributed
cache? broadcast updates in flink cluster?) 
Remote lookups are not required if you keep the lookup store as managed keyed state in Flink.
All session lookups would then be local state accesses. You can think of it as setting up a
K-V store within Flink that is always co-partitioned with your incoming events by session id.
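The co-partitioning idea can be illustrated with a small toy model (plain Java, not Flink code: events and per-session state are routed by the same key hash, so every lookup is local to one “subtask”):

```java
import java.util.*;

public class CoPartitionDemo {
    static final int PARALLELISM = 4;

    // Both events and state are partitioned by the same function,
    // mimicking keyBy(sessionId): same session id -> same subtask.
    static int subtaskFor(String sessionId) {
        return Math.floorMod(sessionId.hashCode(), PARALLELISM);
    }

    public static void main(String[] args) {
        // One local lookup table per subtask: session_id -> emails seen so far.
        List<Map<String, Set<String>>> localState = new ArrayList<>();
        for (int i = 0; i < PARALLELISM; i++) localState.add(new HashMap<>());

        // Simulated input: (sessionId, email) pairs.
        String[][] events = {
            {"s1", "a@x.com"}, {"s2", "b@x.com"}, {"s1", "c@x.com"}
        };

        for (String[] e : events) {
            int task = subtaskFor(e[0]); // routing step, like keyBy(sessionId)
            Map<String, Set<String>> state = localState.get(task);
            Set<String> emails = state.computeIfAbsent(e[0], k -> new HashSet<>());
            // "Enrichment": previous emails for this session are available locally.
            System.out.println(e[0] + " enriched with " + emails);
            emails.add(e[1]); // update the local lookup table in place
        }

        // Both s1 events landed on the same subtask, so the second s1 event
        // saw a@x.com; the session's email set now has two entries.
        System.out.println(localState.get(subtaskFor("s1")).get("s1").size()); // prints 2
    }
}
```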

(3) how should I integrate the changes to the table with flink's checkpointing? 
Simply register managed keyed state. Flink will handle checkpointing it for fault tolerance
and ensure exactly-once semantics for the state. The “Working with State” docs should
hopefully cover that quite well!
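For completeness, wiring this up at the job level could look roughly like the following (a sketch against the Flink 1.3 APIs; the checkpoint interval and path are placeholders):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint all registered state periodically for fault tolerance.
env.enableCheckpointing(60000); // every 60s

// Keep keyed state in RocksDB on local disk, checkpointed to durable storage.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
```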


Hope this helps :)

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-keyed-state
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/state_backends.html#the-rocksdbstatebackend


On 8 August 2017 at 3:00:57 AM, Peter Ertl (peter.ertl@gmx.net) wrote:

Hi folks,  

I am coding a streaming task that processes http requests from our web site and enriches them
with additional information.

A lookup table contains session ids from historic requests and the related emails that were
used within these sessions in the past:


lookup - hashtable: session_id: String => emails: Set[String]  


During processing of these NEW http requests:

- the lookup table should be used to get previous emails and enrich the current stream item

- new candidates for the lookup table will be discovered during processing of these items
and should be added to the lookup table (these changes should also be visible throughout the
cluster)

I see at least the following issues:  

(1) load the state as a whole from the data store into memory is a huge burn of memory (also
making changes cluster-wide visible is an issue)  

(2) not loading into memory but using something like cassandra / redis as a lookup store would
certainly work but introduces a lot of network requests (possible ideas: use a distributed
cache? broadcast updates in flink cluster?)  

(3) how should I integrate the changes to the table with flink's checkpointing?  

I really don't get how to solve this best, and my current solution is far from elegant…

So is there any best practice for supporting "large lookup tables that change during stream
processing" ?  

Cheers  
Peter  




