flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gábor Hermann <m...@gaborhermann.com>
Subject Re: Using QueryableState inside Flink jobs (and Parameter Server implementation)
Date Tue, 14 Feb 2017 14:54:36 GMT
Hey Theodore,

Thanks for pointing us to Tensorflow, I didn't think it would be useful 
for this.
 From what you cite they seem to have a very similar state-handling 
mechanism to Flink (as well as fault tolerance).
I'm sure we can get ideas from how they do model-parallel training. I'll 
definitely check out the whitepaper! :)


On 2017-02-14 15:42, Theodore Vasiloudis wrote:

> Hello all,
> I would also be really interested in how a PS-like architecture would work
> in Flink. Note that we not necessarily talking about PS, but generally how
> QueryableState can be used for ML tasks with I guess a focus on
> model-parallel training.
> One suggestion I would make is to take a look at Tensorflow which is also a
> dataflow model that has support for distributed computation, both data and
> model parallel.
> I don't know too much about the internal workings of the system, but I
> would point out this from the TF whitepaper [1], Section 11 related work:
> It also permits a significant simplification by
>> allowing  the  expression  of  stateful  parameter  nodes  as
>> variables,  and  variable  update  operations  that  are  just
>> additional  nodes  in  the  graph;  in  contrast,  DistBelief,
>> Project Adam and the Parameter Server systems all have
>> whole separate parameter server subsystems devoted to
>> communicating and updating parameter values.
> I think the Related work section is the most relevant to this discussion as
> it discusses the differences between the programming models in Spark, Naiad
> etc. to the TF model.
> Also re. fault tolerance:
> When a failure is detected, the entire graph execution
>> is  aborted  and  restarted  from  scratch.   Recall  however
>> that Variable nodes refer to tensors that persist across ex-
>> ecutions of the graph. We support consistent checkpoint-
>> ing and recovery of this state on a restart.  In particular,
>> each Variable node is connected to a Save node.  These
>> Save nodes are executed periodically, say once every N
>> iterations, or once every N seconds. When they execute,
>> the contents of the variables are written to persistent stor-
>> age, e.g., a distributed file system.  Similarly each Vari-
>> able is connected to a Restore node that is only enabled
>> in the first iteration after a restart.
> [1] http://download.tensorflow.org/paper/whitepaper2015.pdf
> On Tue, Feb 14, 2017 at 3:18 PM, Gábor Hermann <mail@gaborhermann.com>
> wrote:
>> Hey Ufuk,
>> I'm happy to contribute. At least I'll get a bit more understanding of the
>> details.
>> Breaking the assumption that only a single thread updates state would
>> brings us from strong isolation guarantees (i.e. serializability at the
>> updates and read committed at the external queries) to no isolation
>> guarantees. That's not something to be taken lightly. I think that these
>> guarantees would be more easily provided for inside queries that modify
>> (setKvState), but that's still not trivial.
>> Indeed, the iteration approach works better for the use-cases I mentioned,
>> at least for now.
>> Cheers,
>> Gabor
>> On 2017-02-14 14:43, Ufuk Celebi wrote:
>> Hey Gabor,
>>> great ideas here. It's only slightly related, but I'm currently working
>>> on a proposal to improve the queryable state APIs for lookups (partly along
>>> the lines of what you suggested with higher level accessors). Maybe you are
>>> interested in contributing there?
>>> I really like your ideas for the use cases you describe, but I'm unsure
>>> about the write path (setKvState), because of the discussed implications to
>>> the state backends. I think that this needs more discussion and
>>> coordination with the contributors working on the backends. For example,
>>> one assumption so far was that only a single thread updates state and we
>>> don't scope state per checkpoint (to provide "isolation levels" for the
>>> queries; read comitted vs. read uncommitted) and probably more.
>>> Because of this I would actually lean towards the iteration approach in a
>>> first version. Would that be a feasible starting point for you?
>>> – Ufuk
>>> On 14 February 2017 at 14:01:21, Gábor Hermann (mail@gaborhermann.com)
>>> wrote:
>>>> Hi Gyula, Jinkui Shi,
>>>>    Thanks for your thoughts!
>>>>    @Gyula: I'll try and explain a bit more detail.
>>>>    The API could be almost like the QueryableState's. It could be
>>>> higher-level though: returning Java objects instead of serialized data
>>>> (because there would not be issues with class loading). Also, it could
>>>> support setKvState (see my 5. point). This could lead to both a
>>>> potential performance improvements and easier usage (see my points 2.
>>>> and 3.).
>>>>    A use-case could be anything where we'd use an external KV-store.
>>>> For instance we are updating user states based on another user state, so
>>>> in the map function we do a query (in pseudo-ish Scala code):
>>>>    users.keyBy(0).flatMapWithState { (userEvent, collector) =>
>>>> val thisUser: UserState = state.get()
>>>> val otherUser: Future[UserState] =
>>>> qsClient.getKvState("users", userEvent.otherUserId)
>>>>    otherUser.onSuccess { case otherUserState =>
>>>> state.update(someFunc1(thisUser, otherUserState))
>>>> collector.collect(someFunc2(thisUser, otherUserState))
>>>> }
>>>> }
>>>>    Another example could be (online) distributed matrix factorization,
>>>> where the two factor matrices are represented by distributed states. One
>>>> is updated by querying the other (with getKvState), and computing some
>>>> function (i.e. SGD), while the other is updated at the same place (with
>>>> setKvState).
>>>>    I see the motivation behind the QueryableState as a way to make further
>>>> use of the KV-store we practically have at stateful operators (but
>>>> please correct me if I'm wrong). I think we could make even more use of
>>>> if the KV-store is used inside the same job.
>>>>    1) Order and timeliness
>>>> As you've mentioned it's hard to guarantee any ordering when working
>>>> with two states on possibly distinct machines. This could bring us to
>>>> distributed transaction processing, what's a complex topic in itself. I
>>>> can imagine using watermarks and keeping different state versions to
>>>> only allow querying state from the past, and not from the future. For
>>>> now, let's just assume that order does not matter.
>>>>    2) Fault-tolerance
>>>> There might be other things we could do, but there are two simple
>>>> guarantees that we can surely provide. First, by using the current
>>>> QueryableState the task could fail with incomplete futures. If the
>>>> records producing those futures are received before the previous
>>>> checkpoint barrier, those updates will be completely lost. We could
>>>> solve this by wait for the futures to complete before starting a
>>>> checkpoint, thus providing exactly-once guarantees. This would ensure
>>>> that, although the UDF has side-effects, every record has its effect
>>>> exactly-once. I don't see a good way to provide this guarantee with the
>>>> current QueryableState. Second, we can guarantee that the query will
>>>> eventually succeed (or else the whole topology would fail).
>>>>    3) Optimizations
>>>> I've also got two examples for optimizations. First, we can do a
>>>> best-effort to co-locate the two stateful operators to save on network
>>>> overhead. The user could try to co-locate the querying machines when
>>>> externally querying the state, but could not move the machines with the
>>>> state being queried. Second, we could provide a user-interface for
>>>> (loose) guarantees on the latency of sending and returning queries, just
>>>> like setting the buffer timeout.
>>>>    4) Concurrent reading/writing
>>>> Key-value states and collectors might be accessed concurrently. While
>>>> the user could use locks, we the system handle this instead of the user.
>>>> E.g. using a thread-safe collector whenever we see internal KV-state
>>>> query registered at the UDF.
>>>>    5) setKvState
>>>> We could not give exactly-once guarantees if we allowed external queries
>>>> to modify the state. When a Flink topology fails and restarts the
>>>> modifications coming from the outside would not be replayed. However, we
>>>> can simply give exactly-once guarantees if the modifications are done
>>>> inside (set aside ordering), as the records would be replayed if the
>>>> modification failed.
>>>>    I believe it would not take much effort to do these improvements.
>>>> Although, it would affect the runtime (integration with FT), and it
>>>> might not be worth working towards these goals. What do you think about
>>>> this?
>>>>    It's also worth considering that the same use-cases could be similarly
>>>> done with the iteration/loops API, but in a bit less natural way,
>>>> imitating two-direction communication.
>>>>    Should we move this discussion to a JIRA issue, to avoid flooding the
>>>> mailing list?
>>>>      @Jinkui Shi:
>>>> 1. I think we should definitely support a flexible update strategy. I.e.
>>>> allow to choose between sync, async and bounded-delay.
>>>> 2. I really like your idea of using PS externally and connecting to it
>>>> with a source and a sink. Fault-tolerance could be also be achieved by
>>>> versioning at the PS and resending older parameters if the Flink job
>>>> fails (i.e. making PS a durable source). Although, the question is then
>>>> how to implement the PS? Do you think we could use the implementations
>>>> you've mentioned?
>>>> 3. Good idea. It's been just proposed to support GPU calculations in
>>>> Flink [1]. Fortunately, that seems orthogonal to our problem: if there's
>>>> a PS, we can later include GPU calculations.
>>>>    My main question remains: whether there's a need for an integrated PS
>>>> or
>>>> not. It would not fit into the current project structure (neither in ML
>>>> nor in Streaming), so I guess the only direction is to build on top of
>>>> the streaming API and use an external service just as you've proposed.
>>>>    [1] https://issues.apache.org/jira/browse/FLINK-5782
>>>>    Cheers,
>>>> Gabor
>>>>      On 2017-02-13 04:10, Jinkui Shi wrote:
>>>>> hi,Gábor Hermann
>>>>> The online parameter server is a good proposal.
>>>>> PS’ paper [1] have a early implement [2], and now it’s mxnet [3].
>>>>> I have some thought about online PS in Flink:
>>>>> 1. Whether support flexible and configurable update strategy?
>>>>> For example, in one iteration, computing serval times updating once or
>>>>> update every
>>>> time of iteration.
>>>>> 2. Whether we implement is fully based on the DAG, having not too much
>>>>> modify the runtime
>>>> and core?
>>>>> - The parameter server is the Source with distributed parameter data,
>>>>> called PS.
>>>>> - The worker nodes are the DAG except the Source. That is some ML
>>>>> arithmetic implemented
>>>> using Flink API.
>>>>> - Multiple layer computing’s result flow to the Sink operator naturally
>>>>> - Sink can feedback to the Source for next iteration
>>>>> - Atomic tuning the busy operator, increase/decrease the compute
>>>>> resource(the max
>>>> parallelism) of the runtime operators.
>>>>> 3. Atomically detect GPU supporting provided by Mesos, and use it if
>>>>> enable configuration
>>>> of using GPU.
>>>>> [1] https://www.cs.cmu.edu/~muli/file/ps.pdf
>>>>> [2] https://github.com/dmlc/parameter_server
>>>>> [3] https://github.com/dmlc/mxnet
>>>>> On Feb 12, 2017, at 00:54, Gyula Fóra wrote:
>>>>>> Hi Gábor,
>>>>>> I think the general idea is very nice, but it would nice to see clearer
>>>>>> what benefit does this bring from the developers perspective. Maybe
>>>>>> rough
>>>>>> API sketch and 1-2 examples.
>>>>>> I am wondering what sort of consistency guarantees do you imagine
>>>>>> such
>>>>>> operations, or why the fault tolerance is even relevant. Are you
>>>>>> thinking
>>>>>> about an asynchronous API such as querying the state for another
>>>>>> might
>>>>>> give you a Future that is guaranteed to complete eventually.
>>>>>> It seems to be hard to guarantee the timeliness (order) of these
>>>>>> operations
>>>>>> with respect to the updates made to the states, so I wonder if there
>>>>>> benefit of doing this compared to using the Queryable state interface.
>>>>>> Is
>>>>>> this only a potential performance improvement or is it easier to
>>>>>> with
>>>>>> this?
>>>>>> Cheers,
>>>>>> Gyula
>>>>>> Gábor Hermann ezt írta (időpont: 2017. febr. 10.,
>>>>>> P, 16:01):
>>>>>> Hi all,
>>>>>>> TL;DR: Is it worth to implement a special QueryableState for
>>>>>>> state from another part of a Flink streaming job and aligning
it with
>>>>>>> fault tolerance?
>>>>>>> I've been thinking about implementing a Parameter Server with/within
>>>>>>> Flink. A Parameter Server is basically a specialized key-value
>>>>>>> optimized for training distributed machine learning models. So
>>>>>>> only
>>>>>>> the training data, but also the model is distributed. Range queries
>>>>>>> are
>>>>>>> typical, and usually vectors and matrices are stored as values.
>>>>>>> More generally, an integrated key-value store might also be useful
>>>>>>> the Streaming API. Although external key-value stores can be
>>>>>>> inside
>>>>>>> UDFs for the same purpose, aligning them with the fault tolerance
>>>>>>> mechanism of Flink could be hard. What if state distributed by
a key
>>>>>>> (in
>>>>>>> the current Streaming API) could be queried from another operator?
>>>>>>> Much
>>>>>>> like QueryableState, but querying *inside* the Flink job. We
>>>>>>> make
>>>>>>> use of the fact that state has been queried from inside to optimize
>>>>>>> communication and integrate fault tolerance.
>>>>>>> The question is whether the Flink community would like such feature,
>>>>>>> and
>>>>>>> if so how to do it?
>>>>>>> I could elaborate my ideas if needed, and I'm happy to create
a design
>>>>>>> doc, but before that, I'd like to know what you all think about
>>>>>>> Also, I don't know if I'm missing something, so please correct
>>>>>>> Here
>>>>>>> are some quick notes regarding the integrated KV-store:
>>>>>>> Pros
>>>>>>> - It could allow easier implementation of more complicated use-cases.
>>>>>>> E.g. updating users preferences simultaneously based on each
>>>>>>> preferences when events happen between them such as making a
>>>>>>> connection,
>>>>>>> liking each other posts, or going to the same concert. User
>>>>>>> preferences
>>>>>>> are distributed as a state, an event about user A liking user
B gets
>>>>>>> sent to A's state and queries the state of B, then updates the
>>>>>>> of
>>>>>>> B. There have been questions on the user mailing list for similar
>>>>>>> problems [1].
>>>>>>> - Integration with fault tolerance. User does not have to maintain
>>>>>>> systems consistently.
>>>>>>> - Optimization potentials. At the social network example maybe
>>>>>>> users on the same partitions with user A need the state of user
B, so
>>>>>>> we
>>>>>>> don't have to send around user B twice.
>>>>>>> - Could specialize to a Parameter Server for simple (and efficient)
>>>>>>> implementation of (possibly online) machine learning. E.g. sparse
>>>>>>> logistic regression, LDA, matrix factorization for recommendation
>>>>>>> systems.
>>>>>>> Cons
>>>>>>> - Lot of development effort.
>>>>>>> - "Client-server" architecture goes against the DAG dataflow
>>>>>>> Two approaches for the implementation in the streaming API:
>>>>>>> 1) An easy way to implement this is to use iterations (or the
>>>>>>> loops API). We can achieve two-way communication by two operators
in a
>>>>>>> loop: a worker (W) and a Parameter Server (PS), see the diagram
>>>>>>> (An
>>>>>>> additional nested loop in the PS could add replication opportunities).
>>>>>>> Then we would get fault tolerance "for free" by the work of Paris
>>>>>>> It would also be on top of the Streaming API, with no effect
on the
>>>>>>> runtime.
>>>>>>> 2) A problem with the loop approach is that coordination between
>>>>>>> nodes and worker nodes can only be done on the data stream. We
>>>>>>> not
>>>>>>> really use e.g. Akka for async coordination. A harder but more
>>>>>>> flexible
>>>>>>> way is to use lower-level interfaces of Flink and touch the runtime.
>>>>>>> Then we would have to take care of fault tolerance too.
>>>>>>> (As a side note: in the batch API generalizing delta iterations
>>>>>>> be
>>>>>>> a solution for Parameter Server [4].)
>>>>>>> Thanks for any feedback :)
>>>>>>> Cheers,
>>>>>>> Gabor
>>>>>>> [1]
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>>>>>>> nabble.com/sharded-state-2-step-operation-td8631.html
>>>>>>> [2] https://i.imgur.com/GsliUIh.png
>>>>>>> [3] https://github.com/apache/flink/pull/1668
>>>>>>> [4]
>>>>>>> https://www.linkedin.com/pulse/stale-synchronous-parallelism
>>>>>>> -new-frontier-apache-flink-nam-luc-tran

View raw message