flink-dev mailing list archives

From Kostas Tzoumas <ktzou...@apache.org>
Subject Re: [DISCUSS] Improving State/Timers/Windows
Date Mon, 14 Dec 2015 10:14:04 GMT
I suppose that they can start as sugar and evolve to a different
implementation.

I would +1 the name change to KVState, OperatorState is indeed somewhat
confusing, and it will only get harder to rename later.

On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gyula.fora@gmail.com> wrote:

> Would the Reducing/Folding states just be some API sugar on top of what we
> have now (ValueState) or does it have some added functionality (like
> incremental checkpoints for list states)?
>
> Gyula
>
> Aljoscha Krettek <aljoscha@apache.org> wrote on Mon, Dec 14, 2015 at 11:03:
>
> > While enhancing the state interfaces we would also need to introduce new
> > types of state. I was thinking of these, for a start:
> >  - ValueState (works like OperatorState works now, i.e. provides methods
> >    to get/set one state value)
> >  - ListState, provides methods to add one element to a list of elements
> >    and to iterate over all contained elements
> >  - ReducingState, somewhat similar to ValueState but combines each added
> >    value with the existing value using a ReduceFunction
> >  - FoldingState, same as above but with a fold
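The four state types in the list above could look roughly like the Java interfaces below. This is only a sketch under assumptions: the interface and method names, the heap-backed classes, and the choice to build `ReducingState` and `FoldingState` as thin wrappers around a `ValueState` (the "API sugar" option raised in this thread) are illustrative, not Flink's API. `BinaryOperator` and `BiFunction` stand in for a ReduceFunction and a fold function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical interfaces mirroring the proposed state types (not Flink's API).
interface ValueState<T> {
    T value();               // current value, or null if nothing was set
    void update(T value);    // overwrite the value
}

interface ListState<T> {
    void add(T element);     // append one element
    Iterable<T> get();       // iterate over all contained elements
}

interface ReducingState<T> {
    void add(T element);     // combine the element with the current value
    T get();
}

interface FoldingState<T, ACC> {
    void add(T element);     // fold the element into the accumulator
    ACC get();
}

// Simple heap-backed implementations for a single key.
class HeapValueState<T> implements ValueState<T> {
    private T value;
    public T value() { return value; }
    public void update(T value) { this.value = value; }
}

class HeapListState<T> implements ListState<T> {
    private final List<T> elements = new ArrayList<>();
    public void add(T element) { elements.add(element); }
    public Iterable<T> get() { return elements; }
}

// "Sugar" variant: reducing state layered on top of a ValueState.
class ReducingOnValue<T> implements ReducingState<T> {
    private final ValueState<T> inner;
    private final BinaryOperator<T> reducer;   // stands in for a ReduceFunction

    ReducingOnValue(ValueState<T> inner, BinaryOperator<T> reducer) {
        this.inner = inner;
        this.reducer = reducer;
    }

    public void add(T element) {
        T current = inner.value();
        inner.update(current == null ? element : reducer.apply(current, element));
    }

    public T get() { return inner.value(); }
}

// Folding state layered on a ValueState that holds the accumulator.
class FoldingOnValue<T, ACC> implements FoldingState<T, ACC> {
    private final ValueState<ACC> inner;
    private final ACC initial;
    private final BiFunction<ACC, T, ACC> folder;  // stands in for a fold function

    FoldingOnValue(ValueState<ACC> inner, ACC initial, BiFunction<ACC, T, ACC> folder) {
        this.inner = inner;
        this.initial = initial;
        this.folder = folder;
    }

    public void add(T element) {
        ACC current = inner.value();
        inner.update(folder.apply(current == null ? initial : current, element));
    }

    public ACC get() {
        ACC current = inner.value();
        return current == null ? initial : current;
    }
}

class StateTypesDemo {
    public static void main(String[] args) {
        ReducingState<Integer> sum = new ReducingOnValue<>(new HeapValueState<>(), Integer::sum);
        sum.add(3);
        sum.add(4);
        System.out.println(sum.get()); // 7

        FoldingState<String, Integer> totalLength =
            new FoldingOnValue<>(new HeapValueState<>(), 0, (acc, s) -> acc + s.length());
        totalLength.add("ab");
        totalLength.add("cde");
        System.out.println(totalLength.get()); // 5
    }
}
```

The wrapper version shows why starting as sugar is cheap: the backend only ever sees plain value updates. A backend that knew it was holding a reducing or list state could instead choose a storage and checkpoint format suited to that type.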
> >
> > I think these are necessary to give the system more knowledge about the
> > semantics of state so that it can handle the state more efficiently. Think
> > of incremental checkpoints, for example: these are easy to do if you know
> > that state is a list to which stuff is only appended.
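To make the incremental-checkpoint argument concrete, here is a hypothetical append-only list state in Java. The class and method names are invented for illustration, and real checkpointing would also involve serialization and durable storage, which are omitted. Because elements are only ever appended, each snapshot only needs the elements added since the previous one, and restoring means concatenating the deltas in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical append-only list state supporting incremental snapshots.
class AppendOnlyListState<T> {
    private final List<T> elements = new ArrayList<>();
    private int checkpointedCount = 0;  // how many elements earlier snapshots already cover

    void add(T element) {
        elements.add(element);
    }

    Iterable<T> get() {
        return Collections.unmodifiableList(elements);
    }

    // Returns only the elements appended since the previous snapshot. Since
    // elements are never modified or removed, earlier deltas remain valid.
    List<T> snapshotDelta() {
        List<T> delta = new ArrayList<>(elements.subList(checkpointedCount, elements.size()));
        checkpointedCount = elements.size();
        return delta;
    }

    // Rebuilds the state by concatenating all deltas in checkpoint order.
    static <T> AppendOnlyListState<T> restore(List<List<T>> deltas) {
        AppendOnlyListState<T> state = new AppendOnlyListState<>();
        for (List<T> delta : deltas) {
            state.elements.addAll(delta);
        }
        state.checkpointedCount = state.elements.size();
        return state;
    }

    public static void main(String[] args) {
        AppendOnlyListState<String> state = new AppendOnlyListState<>();
        state.add("a");
        state.add("b");
        List<String> first = state.snapshotDelta();   // [a, b]
        state.add("c");
        List<String> second = state.snapshotDelta();  // [c], not the whole list
        AppendOnlyListState<String> restored = AppendOnlyListState.restore(Arrays.asList(first, second));
        System.out.println(restored.get());           // [a, b, c]
    }
}
```

A plain value state, where `update` can overwrite anything, would have to snapshot its full value every time, since the system cannot tell what changed.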
> > > On 14 Dec 2015, at 10:52, Stephan Ewen <sewen@apache.org> wrote:
> > >
> > > A lot of this makes sense, but I am not sure about renaming
> > > "OperatorState". The other name is nicer, but why make users' lives
> > > harder just for a name?
> > >
> > >
> > > On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mxm@apache.org> wrote:
> > >
> > >> Hi Aljoscha,
> > >>
> > >> Thanks for the informative technical description.
> > >>
> > >>> - function state: this is the state that you get when a user function
> > >>> implements the Checkpointed interface. It is not partitioned.
> > >>> - operator state: this is the state that a StreamOperator can snapshot.
> > >>> It is similar to the function state, but for operators. It is not
> > >>> partitioned.
> > >>> - partitioned state: state that is scoped to the key of the incoming
> > >>> element. In Flink, this is (confusingly) called OperatorState, and
> > >>> KvState internally.
> > >>
> > >> Let's clean that up! Let's rename the OperatorState interface to KvState.
> > >>
> > >>> Both stream operators and user functions can have partitioned state,
> > >>> and the namespace is the same, i.e. the state can clash. The
> > >>> partitioned state will stay indefinitely if not manually cleared.
> > >>
> > >> I suppose operators currently have to take care to use a unique
> > >> identifier for the state such that it doesn't clash with the user
> > >> function. It wouldn't be too hard to introduce scoping here.
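One way to introduce the scoping Max mentions is to qualify every partitioned-state entry with its owner as well as its name and the current key. The sketch below assumes a single in-memory map and invented names (`ScopedStateStore`, the `"operator"` and `"function"` scope strings); it only demonstrates that the same state name no longer clashes across owners or keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed state store. Every entry is addressed by
// (owner scope, state name, current key), so an operator and the user
// function it wraps can both use a state called "count" without clashing.
class ScopedStateStore {
    private final Map<List<Object>, Object> entries = new HashMap<>();
    private Object currentKey;

    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    void put(String scope, String name, Object value) {
        entries.put(Arrays.asList(scope, name, currentKey), value);
    }

    Object get(String scope, String name) {
        return entries.get(Arrays.asList(scope, name, currentKey));
    }

    public static void main(String[] args) {
        ScopedStateStore store = new ScopedStateStore();
        store.setCurrentKey("user-42");
        store.put("operator", "count", 1);
        store.put("function", "count", 100);  // same name, different scope: no clash
        System.out.println(store.get("operator", "count")); // 1
        System.out.println(store.get("function", "count")); // 100
    }
}
```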
> > >>
> > >> Your proposal makes sense. It seems like this is a rather delicate
> > >> change which improves the flexibility of the streaming API. What is
> > >> the motivation behind this? I suppose you are thinking of improvements
> > >> to the session capabilities of the streaming API.
> > >>
> > >>> If we want to also implement the current WindowOperator on top of
> > >>> these generic facilities we need to have a way to scope state not
> > >>> only by key but also by windows (or better, some generic state scope).
> > >>
> > >> This is currently handled by the WindowOperator itself and would then
> > >> be delegated to the enhanced state interface? Makes sense if we want
> > >> to make use of the new state interface. Again, is it just cleaner or
> > >> does this enable new types of applications?
> > >>
> > >> Cheers,
> > >> Max
> > >>
> > >> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > >>> Hi All,
> > >>> I want to discuss some ideas about improving the primitives/operations
> > >>> that Flink offers for user state, timers and windows, and how these
> > >>> concepts can be unified.
> > >>>
> > >>> It has come up a lot lately that people have very specific requirements
> > >>> regarding the state that they keep, and it seems necessary to allow
> > >>> users to set their own custom timers (on processing time and watermark
> > >>> time (event-time)) to do both expiration of state and implementation of
> > >>> custom windowing semantics. While we’re at it, we might also think
> > >>> about cleaning up the state handling a bit.
> > >>>
> > >>> Let me first describe the status quo, so that we’re all on the same
> > >>> page. There are three types of state:
> > >>> - function state: this is the state that you get when a user function
> > >>> implements the Checkpointed interface. It is not partitioned.
> > >>> - operator state: this is the state that a StreamOperator can snapshot.
> > >>> It is similar to the function state, but for operators. It is not
> > >>> partitioned.
> > >>> - partitioned state: state that is scoped to the key of the incoming
> > >>> element. In Flink, this is (confusingly) called OperatorState, and
> > >>> KvState internally.
> > >>>
> > >>> (Operator is the low-level concept; user functions are usually invoked
> > >>> by the operator. For example, StreamMap is the operator that handles a
> > >>> MapFunction.)
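The operator/function layering described in the parenthetical can be sketched as follows. `SimpleMapFunction` and `SimpleStreamMap` are hypothetical stand-ins, not Flink's `MapFunction` and `StreamMap`, and a list replaces the real downstream output: the runtime drives the operator, and the operator calls the user function once per element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user function; Flink's real MapFunction differs.
interface SimpleMapFunction<IN, OUT> {
    OUT map(IN value);
}

// Hypothetical operator: the runtime hands elements to the operator, and the
// operator invokes the wrapped user function for each one.
class SimpleStreamMap<IN, OUT> {
    private final SimpleMapFunction<IN, OUT> userFunction;
    private final List<OUT> emitted = new ArrayList<>();  // stands in for the downstream output

    SimpleStreamMap(SimpleMapFunction<IN, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    void processElement(IN element) {
        emitted.add(userFunction.map(element));
    }

    List<OUT> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SimpleStreamMap<String, Integer> operator = new SimpleStreamMap<>(String::length);
        operator.processElement("flink");
        operator.processElement("ok");
        System.out.println(operator.emitted()); // [5, 2]
    }
}
```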
> > >>>
> > >>> Function state and operator state are not partitioned, which makes it
> > >>> difficult to implement dynamic scale-in/scale-out. Partitioned state,
> > >>> in contrast, can be redistributed when the degree of parallelism
> > >>> changes.
> > >>>
> > >>> Both stream operators and user functions can have partitioned state,
> > >>> and the namespace is the same, i.e. the state can clash. The
> > >>> partitioned state will stay indefinitely if not manually cleared.
> > >>>
> > >>> On to timers: operators can register processing-time callbacks, and
> > >>> they can react to watermarks to implement event-time callbacks. They
> > >>> have to implement the logic themselves, however. For example, the
> > >>> WindowOperator has custom code to keep track of watermark timers and
> > >>> to react to watermarks. User functions have no way of registering
> > >>> timers. Also, timers are not scoped to any key: if you register a timer
> > >>> while processing an element of a certain key, then when the timer fires
> > >>> you don’t know which key was active when the timer was registered.
> > >>> Knowing that key might be necessary for cleaning up state for certain
> > >>> keys, or for triggering processing for a certain key only, for example
> > >>> with session windows of some kind.
> > >>>
> > >>> Now, on to new stuff. I propose to add a timer facility that can be
> > >>> used by both operators and user functions. Both partitioned state and
> > >>> timers should be aware of keys, and when a timer fires, the partitioned
> > >>> state should be scoped to the same key that was active when the timer
> > >>> was registered.
> > >>>
> > >>> One last bit. If we want to also implement the current WindowOperator
> > >>> on top of these generic facilities, we need to have a way to scope
> > >>> state not only by key but also by windows (or better, some generic
> > >>> state scope). The reason is that one key can have several active
> > >>> windows at one point in time, and firing timers need to be mapped to
> > >>> the correct window (for example, for sliding windows, or session
> > >>> windows or what have you…).
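To illustrate scoping by key and window, the sketch below keeps a per-`(key, window)` count for sliding windows and registers one timer per `(key, window)` at the window end. The window size (10) and slide (5), the `WindowNamespaceSketch` class, and the convention that a window fires once the watermark reaches its end are assumptions made for the example. An element at time 7 falls into both `[0,10)` and `[5,15)`, so one key really does have several active windows, and each firing timer must find its own window's state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical sketch: state and timers scoped by (key, window).
class WindowNamespaceSketch {
    // A time window [start, end), used as the state namespace.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) { this.start = start; this.end = end; }

        @Override public boolean equals(Object o) {
            return o instanceof Window && ((Window) o).start == start && ((Window) o).end == end;
        }
        @Override public int hashCode() { return Objects.hash(start, end); }
        @Override public String toString() { return "[" + start + "," + end + ")"; }
    }

    static final long SIZE = 10;  // assumed window size
    static final long SLIDE = 5;  // assumed slide

    // All sliding windows containing the timestamp (assumes non-negative timestamps).
    static List<Window> assignWindows(long timestamp) {
        List<Window> windows = new ArrayList<>();
        for (long start = timestamp - timestamp % SLIDE; start > timestamp - SIZE; start -= SLIDE) {
            windows.add(new Window(start, start + SIZE));
        }
        return windows;
    }

    private final Map<List<Object>, Long> counts = new HashMap<>();            // (key, window) -> count
    private final TreeMap<Long, List<List<Object>>> timers = new TreeMap<>();  // window end -> scopes
    final List<String> results = new ArrayList<>();

    void processElement(String key, long timestamp) {
        for (Window window : assignWindows(timestamp)) {
            List<Object> scope = Arrays.asList(key, window);
            if (counts.merge(scope, 1L, Long::sum) == 1L) {
                // First element for this (key, window): register one timer at the window end.
                timers.computeIfAbsent(window.end, end -> new ArrayList<>()).add(scope);
            }
        }
    }

    // Fires timers whose window end is at or before the watermark; each timer
    // carries its (key, window) scope, so it reads and clears exactly that state.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (List<Object> scope : timers.pollFirstEntry().getValue()) {
                results.add(scope.get(0) + " " + scope.get(1) + " count=" + counts.remove(scope));
            }
        }
    }

    public static void main(String[] args) {
        WindowNamespaceSketch op = new WindowNamespaceSketch();
        op.processElement("a", 7);   // windows [5,15) and [0,10)
        op.processElement("a", 8);
        op.processElement("a", 12);  // windows [10,20) and [5,15)
        op.advanceWatermark(15);
        op.results.forEach(System.out::println);
    }
}
```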
> > >>>
> > >>> Happy discussing. :D
> > >>>
> > >>> Cheers,
> > >>> Aljoscha
> > >>>
> > >>>
> > >>
> >
> >
>
