flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gábor Hermann <m...@gaborhermann.com>
Subject Using QueryableState inside Flink jobs (and Parameter Server implementation)
Date Fri, 10 Feb 2017 15:00:46 GMT
Hi all,

TL;DR: Is it worth to implement a special QueryableState for querying 
state from another part of a Flink streaming job and aligning it with 
fault tolerance?

I've been thinking about implementing a Parameter Server with/within 
Flink. A Parameter Server is basically a specialized key-value store 
optimized for training distributed machine learning models. So not only 
the training data, but also the model is distributed. Range queries are 
typical, and usually vectors and matrices are stored as values.

More generally, an integrated key-value store might also be useful in 
the Streaming API. Although external key-value stores can be used inside 
UDFs for the same purpose, aligning them with the fault tolerance 
mechanism of Flink could be hard. What if state distributed by a key (in 
the current Streaming API) could be queried from another operator? Much 
like QueryableState, but querying *inside* the Flink job. We could make 
use of the fact that state has been queried from inside to optimize 
communication and integrate fault tolerance.

The question is whether the Flink community would like such feature, and 
if so how to do it?

I could elaborate my ideas if needed, and I'm happy to create a design 
doc, but before that, I'd like to know what you all think about this. 
Also, I don't know if I'm missing something, so please correct me. Here 
are some quick notes regarding the integrated KV-store:

- It could allow easier implementation of more complicated use-cases.
E.g. updating users preferences simultaneously based on each others 
preferences when events happen between them such as making a connection, 
liking each other posts, or going to the same concert. User preferences 
are distributed as a state, an event about user A liking user B gets 
sent to A's state and queries the state of B, then updates the state of 
B. There have been questions on the user mailing list for similar 
problems [1].
- Integration with fault tolerance. User does not have to maintain two 
systems consistently.
- Optimization potentials. At the social network example maybe other 
users on the same partitions with user A need the state of user B, so we 
don't have to send around user B twice.
- Could specialize to a Parameter Server for simple (and efficient) 
implementation of (possibly online) machine learning. E.g. sparse 
logistic regression, LDA, matrix factorization for recommendation systems.

- Lot of development effort.
- "Client-server" architecture goes against the DAG dataflow model.

Two approaches for the implementation in the streaming API:

1) An easy way to implement this is to use iterations (or the proposed 
loops API). We can achieve two-way communication by two operators in a 
loop: a worker (W) and a Parameter Server (PS), see the diagram [2]. (An 
additional nested loop in the PS could add replication opportunities). 
Then we would get fault tolerance "for free" by the work of Paris [3]. 
It would also be on top of the Streaming API, with no effect on the runtime.

2) A problem with the loop approach is that coordination between PS 
nodes and worker nodes can only be done on the data stream. We could not 
really use e.g. Akka for async coordination. A harder but more flexible 
way is to use lower-level interfaces of Flink and touch the runtime. 
Then we would have to take care of fault tolerance too.

(As a side note: in the batch API generalizing delta iterations could be 
a solution for Parameter Server [4].)

Thanks for any feedback :)


[2] https://i.imgur.com/GsliUIh.png
[3] https://github.com/apache/flink/pull/1668

View raw message