flink-dev mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)
Date Tue, 14 Feb 2017 13:43:19 GMT
Hey Gabor,

great ideas here. It's only slightly related, but I'm currently working on a proposal to improve
the queryable state APIs for lookups (partly along the lines of what you suggested with higher
level accessors). Maybe you are interested in contributing there?

I really like your ideas for the use cases you describe, but I'm unsure about the write path
(setKvState), because of the discussed implications for the state backends. I think that this
needs more discussion and coordination with the contributors working on the backends. For
example, the assumptions so far were that only a single thread updates state and that we don't
scope state per checkpoint (to provide "isolation levels" for the queries; read committed vs.
read uncommitted), and there are probably more.
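
To make the isolation-level point concrete, here is a minimal sketch in plain Scala (no Flink APIs) of state that is scoped per checkpoint. `VersionedKvState`, `put`, `checkpoint`, `restore`, `readCommitted` and `readUncommitted` are hypothetical names chosen for illustration; nothing like this exists in the state backends today, and a real design would have to be worked out with the backend contributors.

```scala
// Hypothetical sketch: a key-value state that keeps the last checkpointed
// view next to the live (not yet checkpointed) values.
// None of these names exist in Flink; they only illustrate the idea.
final class VersionedKvState[K, V] {
  // Values as of the last completed checkpoint.
  private var committed: Map[K, V] = Map.empty
  // Values including updates made since that checkpoint.
  private var live: Map[K, V] = Map.empty

  // Write path: assumed to be called only by the task thread that owns the state.
  def put(key: K, value: V): Unit = synchronized { live = live.updated(key, value) }

  // Called when a checkpoint completes: the live values become the committed view.
  def checkpoint(): Unit = synchronized { committed = live }

  // Simulates recovery: updates made since the last checkpoint are discarded.
  def restore(): Unit = synchronized { live = committed }

  // "Read committed": a query never observes a value that a failure could roll back.
  def readCommitted(key: K): Option[V] = synchronized { committed.get(key) }

  // "Read uncommitted": a query sees the latest value, which may be lost on recovery.
  def readUncommitted(key: K): Option[V] = synchronized { live.get(key) }
}

object IsolationDemo {
  // Returns (committed read, uncommitted read, read after a simulated failure).
  def run(): (Option[Int], Option[Int], Option[Int]) = {
    val state = new VersionedKvState[String, Int]
    state.put("user-1", 1)
    state.checkpoint()
    state.put("user-1", 2) // not yet covered by a checkpoint
    val committed = state.readCommitted("user-1")
    val uncommitted = state.readUncommitted("user-1")
    state.restore()
    (committed, uncommitted, state.readUncommitted("user-1"))
  }
}
```

The uncommitted read observes a value that disappears after recovery, which is the kind of anomaly the isolation levels would have to define.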

Because of this I would actually lean towards the iteration approach in a first version. Would
that be a feasible starting point for you?
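
For reference, a rough sketch of what the iteration approach amounts to, written in plain Scala without any Flink APIs: a worker (W) and a parameter server (PS) that talk to each other only through the two edges of a loop, modelled here as in-memory queues. `LoopSketch`, `Pull`, `Push`, `Answer` and the toy update rule are assumptions made for illustration, not an existing API.

```scala
import scala.collection.mutable

// Hypothetical sketch of the iteration approach (no Flink APIs): W and PS
// exchange messages only over the forward and feedback edges of a loop.
object LoopSketch {
  sealed trait ToPs                                    // W -> PS (forward edge)
  final case class Pull(key: String) extends ToPs
  final case class Push(key: String, delta: Double) extends ToPs

  final case class Answer(key: String, value: Double) // PS -> W (feedback edge)

  // PS: owns the parameters, answers pulls and applies pushes.
  final class ParameterServer {
    private val params = mutable.Map.empty[String, Double].withDefaultValue(0.0)
    def handle(msg: ToPs): Option[Answer] = msg match {
      case Pull(k)        => Some(Answer(k, params(k)))
      case Push(k, delta) => params(k) += delta; None
    }
    def snapshot: Map[String, Double] = params.toMap
  }

  // W: for each record, pull the parameter, then push an update computed from
  // the answer (a toy step towards the observed value).
  def run(records: Seq[(String, Double)], learningRate: Double): Map[String, Double] = {
    val ps = new ParameterServer
    val toPs = mutable.Queue.empty[ToPs]
    val feedback = mutable.Queue.empty[Answer]
    val pending = mutable.Map.empty[String, mutable.Queue[Double]]

    records.foreach { case (key, observed) =>
      pending.getOrElseUpdate(key, mutable.Queue.empty) += observed
      toPs += Pull(key)
    }
    // Drain the loop until no message is in flight on either edge.
    while (toPs.nonEmpty || feedback.nonEmpty) {
      if (toPs.nonEmpty) ps.handle(toPs.dequeue()).foreach(feedback += _)
      if (feedback.nonEmpty) {
        val Answer(key, value) = feedback.dequeue()
        val observed = pending(key).dequeue()
        toPs += Push(key, learningRate * (observed - value))
      }
    }
    ps.snapshot
  }
}
```

Calling `LoopSketch.run(Seq(("w", 1.0), ("w", 1.0)), 0.5)` answers the second pull before the first push has been applied, so the sketch also shows the ordering question from the thread: the worker can act on a stale parameter value.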

– Ufuk

On 14 February 2017 at 14:01:21, Gábor Hermann (mail@gaborhermann.com) wrote:
> Hi Gyula, Jinkui Shi,
> Thanks for your thoughts!
> @Gyula: I'll try to explain in a bit more detail.
> The API could be almost like the QueryableState's. It could be
> higher-level though: returning Java objects instead of serialized data
> (because there would not be issues with class loading). Also, it could
> support setKvState (see my point 5). This could lead to both
> potential performance improvements and easier usage (see my points 2
> and 3).
> A use-case could be anything where we'd use an external KV-store.
> For instance we are updating user states based on another user state, so
> in the map function we do a query (in pseudo-ish Scala code):
> users.keyBy(0).flatMapWithState { (userEvent, collector) =>
>   val thisUser: UserState = state.get()
>   val otherUser: Future[UserState] =
>     qsClient.getKvState("users", userEvent.otherUserId)
>   otherUser.onSuccess { case otherUserState =>
>     state.update(someFunc1(thisUser, otherUserState))
>     collector.collect(someFunc2(thisUser, otherUserState))
>   }
> }
> Another example could be (online) distributed matrix factorization,
> where the two factor matrices are represented by distributed states. One
> is updated by querying the other (with getKvState), and computing some
> function (e.g. SGD), while the other is updated at the same place (with
> setKvState).
> I see the motivation behind the QueryableState as a way to make further
> use of the KV-store we practically have at stateful operators (but
> please correct me if I'm wrong). I think we could make even more use of
> it if the KV-store is used inside the same job.
> 1) Order and timeliness
> As you've mentioned, it's hard to guarantee any ordering when working
> with two states on possibly distinct machines. This could bring us to
> distributed transaction processing, which is a complex topic in itself. I
> can imagine using watermarks and keeping different state versions to
> only allow querying state from the past, and not from the future. For
> now, let's just assume that order does not matter.
> 2) Fault-tolerance
> There might be other things we could do, but there are two simple
> guarantees that we can surely provide. First, by using the current
> QueryableState the task could fail with incomplete futures. If the
> records producing those futures are received before the previous
> checkpoint barrier, those updates will be completely lost. We could
> solve this by waiting for the futures to complete before starting a
> checkpoint, thus providing exactly-once guarantees. This would ensure
> that, although the UDF has side-effects, every record has its effect
> exactly-once. I don't see a good way to provide this guarantee with the
> current QueryableState. Second, we can guarantee that the query will
> eventually succeed (or else the whole topology would fail).
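
The first guarantee in point 2 (wait for outstanding futures before a checkpoint) could look roughly like the following. This is a sketch in plain Scala using only `scala.concurrent`, not Flink's checkpointing hooks; `PendingQueries`, `track` and `awaitAll` are hypothetical names, and the `Promise` merely stands in for a remote state query.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical sketch (no Flink APIs): track state queries that are still in
// flight and block the checkpoint until all of their effects have been applied.
final class PendingQueries {
  private var inFlight: Set[Future[Unit]] = Set.empty

  // Register the side effect that runs once the query result arrives.
  def track[A](query: Future[A])(onResult: A => Unit): Unit = synchronized {
    inFlight += query.map(onResult)
  }

  // Meant to be called when a checkpoint barrier arrives, before the state is
  // snapshotted. A failed query throws here, i.e. it fails the whole task.
  def awaitAll(timeout: FiniteDuration): Unit = {
    val toWait = synchronized { val s = inFlight; inFlight = Set.empty; s }
    Await.result(Future.sequence(toWait.toList), timeout)
  }
}

object CheckpointDemo {
  def run(): Int = {
    val pending = new PendingQueries
    var state = 0                                   // stands in for keyed state
    val reply = Promise[Int]()                      // stands in for a remote query
    pending.track(reply.future)(other => state += other)
    Future { Thread.sleep(50); reply.success(41) }  // the answer arrives later
    pending.awaitAll(5.seconds)                     // barrier: wait before snapshotting
    state                                           // the snapshot includes the update
  }
}
```

Note that the callback updates `state` from a pool thread, so this sketch also runs into the concurrent-access question raised in point 4.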
> 3) Optimizations
> I've also got two examples for optimizations. First, we can make a
> best effort to co-locate the two stateful operators to save on network
> overhead. The user could try to co-locate the querying machines when
> externally querying the state, but could not move the machines with the
> state being queried. Second, we could provide a user-interface for
> (loose) guarantees on the latency of sending and returning queries, just
> like setting the buffer timeout.
> 4) Concurrent reading/writing
> Key-value states and collectors might be accessed concurrently. While
> the user could use locks, the system could handle this instead of the user,
> e.g. by using a thread-safe collector whenever we see an internal KV-state
> query registered at the UDF.
> 5) setKvState
> We could not give exactly-once guarantees if we allowed external queries
> to modify the state. When a Flink topology fails and restarts, the
> modifications coming from the outside would not be replayed. However, we
> can easily give exactly-once guarantees if the modifications are done
> inside the job (setting aside ordering), as the records would be replayed if the
> modification failed.
> I believe it would not take much effort to make these improvements.
> However, it would affect the runtime (integration with FT), and it
> might not be worth working towards these goals. What do you think about
> this?
> It's also worth considering that the same use-cases could be similarly
> done with the iteration/loops API, but in a bit less natural way,
> imitating two-direction communication.
> Should we move this discussion to a JIRA issue, to avoid flooding the
> mailing list?
> @Jinkui Shi:
> 1. I think we should definitely support a flexible update strategy, i.e.
> allow choosing between sync, async and bounded-delay.
> 2. I really like your idea of using PS externally and connecting to it
> with a source and a sink. Fault-tolerance could also be achieved by
> versioning at the PS and resending older parameters if the Flink job
> fails (i.e. making PS a durable source). The question then is
> how to implement the PS. Do you think we could use the implementations
> you've mentioned?
> 3. Good idea. It has just been proposed to support GPU calculations in
> Flink [1]. Fortunately, that seems orthogonal to our problem: if there's
> a PS, we can later include GPU calculations.
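
The three update strategies named in point 1 can be told apart by a single rule for when a worker may start its next iteration. A minimal sketch in plain Scala, assuming each worker keeps an iteration counter (a "clock"); `Strategy`, `BoundedDelay`, `maxStaleness` and `mayProceed` are hypothetical names, not part of Flink or of any parameter server implementation:

```scala
// Hypothetical sketch: sync, async and bounded-delay differ only in how far a
// worker may run ahead of the slowest worker.
object UpdateStrategy {
  sealed trait Strategy
  case object Sync extends Strategy
  case object Async extends Strategy
  final case class BoundedDelay(maxStaleness: Int) extends Strategy

  // May a worker that has finished `workerClock` iterations start the next one,
  // given the number of iterations finished by the slowest worker?
  def mayProceed(strategy: Strategy, workerClock: Int, slowestClock: Int): Boolean =
    strategy match {
      case Sync            => workerClock <= slowestClock     // wait for everyone
      case Async           => true                            // never wait
      case BoundedDelay(s) => workerClock - slowestClock <= s // run at most s ahead
    }
}
```

In this formulation `Sync` is the same as `BoundedDelay(0)`, so a configurable bound would cover the synchronous case as well.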
> My main question remains: whether there's a need for an integrated PS or
> not. It would not fit into the current project structure (neither in ML
> nor in Streaming), so I guess the only direction is to build on top of
> the streaming API and use an external service just as you've proposed.
> [1] https://issues.apache.org/jira/browse/FLINK-5782
> Cheers,
> Gabor
> On 2017-02-13 04:10, Jinkui Shi wrote:
> > Hi Gábor Hermann,
> >
> > The online parameter server is a good proposal.
> > The PS paper [1] has an early implementation [2], and now it's MXNet [3].
> > I have some thoughts about an online PS in Flink:
> > 1. Should we support a flexible and configurable update strategy?
> > For example, within one iteration, compute several times and update once, or update on
> > every iteration.
> > 2. Should the implementation be fully based on the DAG, without modifying the and core
> > too much?
> > - The parameter server is the Source with distributed parameter data, called PS.
> > - The worker nodes are the DAG except the Source, that is, some ML arithmetic implemented
> > using the Flink API.
> > - The results of multi-layer computation flow to the Sink operator naturally.
> > - The Sink can feed back to the Source for the next iteration.
> > - Automatically tune the busy operators: increase/decrease the compute resources (the max
> > parallelism) of the runtime operators.
> > 3. Automatically detect GPU support provided by Mesos, and use it if GPU usage is enabled
> > in the configuration.
> >
> > [1] https://www.cs.cmu.edu/~muli/file/ps.pdf  
> > [2] https://github.com/dmlc/parameter_server  
> > [3] https://github.com/dmlc/mxnet  
> >
> >> On Feb 12, 2017, at 00:54, Gyula Fóra wrote:
> >>
> >> Hi Gábor,
> >>
> >> I think the general idea is very nice, but it would be nice to see more clearly
> >> what benefit this brings from the developer's perspective. Maybe a rough
> >> API sketch and 1-2 examples.
> >>
> >> I am wondering what sort of consistency guarantees you imagine for such
> >> operations, or why the fault tolerance is even relevant. Are you thinking
> >> about an asynchronous API, such that querying the state for another key might
> >> give you a Future that is guaranteed to complete eventually?
> >>
> >> It seems to be hard to guarantee the timeliness (order) of these operations
> >> with respect to the updates made to the states, so I wonder if there is a
> >> benefit of doing this compared to using the QueryableState interface. Is
> >> this only a potential performance improvement or is it easier to work with
> >> this?
> >>
> >> Cheers,
> >> Gyula
> >>
> >> Gábor Hermann wrote (on Fri, Feb 10, 2017, 16:01):
> >>
> >>> Hi all,
> >>>
> >>> TL;DR: Is it worth implementing a special QueryableState for querying
> >>> state from another part of a Flink streaming job and aligning it with
> >>> fault tolerance?
> >>>
> >>> I've been thinking about implementing a Parameter Server with/within
> >>> Flink. A Parameter Server is basically a specialized key-value store
> >>> optimized for training distributed machine learning models. So not only
> >>> the training data, but also the model is distributed. Range queries are
> >>> typical, and usually vectors and matrices are stored as values.
> >>>
> >>> More generally, an integrated key-value store might also be useful in
> >>> the Streaming API. Although external key-value stores can be used inside
> >>> UDFs for the same purpose, aligning them with the fault tolerance
> >>> mechanism of Flink could be hard. What if state distributed by a key (in
> >>> the current Streaming API) could be queried from another operator? Much
> >>> like QueryableState, but querying *inside* the Flink job. We could make
> >>> use of the fact that state has been queried from inside to optimize
> >>> communication and integrate fault tolerance.
> >>>
> >>> The question is whether the Flink community would like such a feature, and
> >>> if so, how to do it.
> >>>
> >>> I could elaborate my ideas if needed, and I'm happy to create a design
> >>> doc, but before that, I'd like to know what you all think about this.
> >>> Also, I don't know if I'm missing something, so please correct me. Here
> >>> are some quick notes regarding the integrated KV-store:
> >>>
> >>> Pros
> >>> - It could allow easier implementation of more complicated use-cases.
> >>> E.g. updating users' preferences simultaneously based on each other's
> >>> preferences when events happen between them such as making a connection,
> >>> liking each other's posts, or going to the same concert. User preferences
> >>> are distributed as a state, an event about user A liking user B gets
> >>> sent to A's state and queries the state of B, then updates the state of
> >>> B. There have been questions on the user mailing list for similar
> >>> problems [1].
> >>> - Integration with fault tolerance. The user does not have to maintain two
> >>> systems consistently.
> >>> - Optimization potential. In the social network example, maybe other
> >>> users on the same partition as user A need the state of user B, so we
> >>> don't have to send around user B twice.
> >>> - Could specialize to a Parameter Server for simple (and efficient)
> >>> implementation of (possibly online) machine learning. E.g. sparse
> >>> logistic regression, LDA, matrix factorization for recommendation systems.
> >>>
> >>> Cons
> >>> - A lot of development effort.
> >>> - "Client-server" architecture goes against the DAG dataflow model.
> >>>
> >>> Two approaches for the implementation in the streaming API:
> >>>
> >>> 1) An easy way to implement this is to use iterations (or the proposed
> >>> loops API). We can achieve two-way communication by two operators in a
> >>> loop: a worker (W) and a Parameter Server (PS), see the diagram [2]. (An
> >>> additional nested loop in the PS could add replication opportunities).
> >>> Then we would get fault tolerance "for free" by the work of Paris [3].
> >>> It would also be on top of the Streaming API, with no effect on the
> >>> runtime.
> >>>
> >>> 2) A problem with the loop approach is that coordination between PS
> >>> nodes and worker nodes can only be done on the data stream. We could not
> >>> really use e.g. Akka for async coordination. A harder but more flexible
> >>> way is to use lower-level interfaces of Flink and touch the runtime.
> >>> Then we would have to take care of fault tolerance too.
> >>>
> >>> (As a side note: in the batch API generalizing delta iterations could be
> >>> a solution for Parameter Server [4].)
> >>>
> >>> Thanks for any feedback :)
> >>>
> >>> Cheers,
> >>> Gabor
> >>>
> >>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sharded-state-2-step-operation-td8631.html
> >>> [2] https://i.imgur.com/GsliUIh.png
> >>> [3] https://github.com/apache/flink/pull/1668
> >>> [4] https://www.linkedin.com/pulse/stale-synchronous-parallelism-new-frontier-apache-flink-nam-luc-tran
> >>>
> >>>
> >
