flink-dev mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: [DISCUSS] Improving State/Timers/Windows
Date Mon, 14 Dec 2015 10:34:58 GMT
>
> On a side note, why would you call it KvState? And what would be called
> KvState?


The OperatorState interface would be called KvState.


On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Yes, as Kostas said, it would initially not provide more functionality, but
> it would enable us to add it later.
>
> On a side note, why would you call it KvState? And what would be called
> KvState?
>
> > On 14 Dec 2015, at 11:14, Kostas Tzoumas <ktzoumas@apache.org> wrote:
> >
> > I suppose that they can start as sugar and evolve to a different
> > implementation.
> >
> > I would +1 the name change to KVState, OperatorState is indeed somewhat
> > confusing, and it will only get harder to rename later.
> >
> > On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gyula.fora@gmail.com>
> > wrote:
> >
> >> Would the Reducing/Folding states just be some API sugar on top of what
> >> we have now (ValueState), or do they have some added functionality (like
> >> incremental checkpoints for list states)?
> >>
> >> Gyula
> >>
> >> Aljoscha Krettek <aljoscha@apache.org> wrote (Mon, 14 Dec 2015, 11:03):
> >>
> >>> While enhancing the state interfaces we would also need to introduce new
> >>> types of state. I was thinking of these, for a start:
> >>> - ValueState: works like OperatorState works now, i.e. provides methods
> >>>   to get/set one state value
> >>> - ListState: provides methods to add one element to a list of elements
> >>>   and to iterate over all contained elements
> >>> - ReducingState: somewhat similar to ValueState, but combines the added
> >>>   value with the existing value using a ReduceFunction
> >>> - FoldingState: same as above, but with fold
> >>>
> >>> I think these are necessary to give the system more knowledge about the
> >>> semantics of state so that it can handle the state more efficiently.
> >>> Think of incremental checkpoints, for example: these are easy to do if
> >>> you know that state is a list to which stuff is only appended.
> >>>> On 14 Dec 2015, at 10:52, Stephan Ewen <sewen@apache.org> wrote:
> >>>>
> >>>> A lot of this makes sense, but I am not sure about renaming
> >>>> "OperatorState". The other name is nicer, but why make users' lives
> >>>> hard just for a name?
> >>>>
> >>>>
> >>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mxm@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Thanks for the informative technical description.
> >>>>>
> >>>>>> - function state: this is the state that you get when a user function
> >>>>>> implements the Checkpointed interface. It is not partitioned.
> >>>>>> - operator state: this is the state that a StreamOperator can snapshot.
> >>>>>> It is similar to the function state, but for operators. It is not
> >>>>>> partitioned.
> >>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>> element. In Flink, this is (confusingly) called OperatorState and
> >>>>>> KvState (internally).
> >>>>>
> >>>>> Let's clean that up! Let's rename the OperatorState interface to
> >>>>> KvState.
> >>>>>
> >>>>>> Both stream operators and user functions can have partitioned state,
> >>>>>> and the namespace is the same, i.e. the state can clash. The
> >>>>>> partitioned state will stay indefinitely if not manually cleared.
> >>>>>
> >>>>> I suppose operators currently have to take care to use a unique
> >>>>> identifier for the state such that it doesn't clash with the user
> >>>>> function. It shouldn't be too hard to introduce scoping here.
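A minimal sketch of the scoping suggested above, assuming a hypothetical store in which every entry is addressed by (owner scope, state name, element key) instead of (state name, element key) alone. ScopedStateStore and the scope strings "operator" and "user-function" are illustrative names, not part of Flink.

```python
from typing import Dict, Hashable, Optional, Tuple


class ScopedStateStore:
    """Hypothetical partitioned-state store. Entries are keyed by
    (owner scope, state name, element key), so an operator and the user
    function it wraps can both use the name "count" without clashing."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str, Hashable], object] = {}

    def get(self, scope: str, name: str, key: Hashable) -> Optional[object]:
        return self._data.get((scope, name, key))

    def put(self, scope: str, name: str, key: Hashable, value: object) -> None:
        self._data[(scope, name, key)] = value

    def clear(self, scope: str, name: str, key: Hashable) -> None:
        # Nothing expires on its own: state stays until it is cleared explicitly.
        self._data.pop((scope, name, key), None)

    def size(self) -> int:
        return len(self._data)
```

Usage: `store.put("operator", "count", "k1", 1)` and `store.put("user-function", "count", "k1", 100)` create two independent entries for the same element key.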
> >>>>>
> >>>>> Your proposal makes sense. It seems like this is a rather delicate
> >>>>> change which improves the flexibility of the streaming API. What is
> >>>>> the motivation behind this? I suppose you are thinking of improvements
> >>>>> to the session capabilities of the streaming API.
> >>>>>
> >>>>>> If we want to also implement the current WindowOperator on top of
> >>>>>> these generic facilities, we need to have a way to scope state not
> >>>>>> only by key but also by windows (or better, some generic state scope).
> >>>>>
> >>>>> This is currently handled by the WindowOperator itself and would then
> >>>>> be delegated to the enhanced state interface? That makes sense if we
> >>>>> want to make use of the new state interface. Again, is it just cleaner
> >>>>> or does this enable new types of applications?
> >>>>>
> >>>>> Cheers,
> >>>>> Max
> >>>>>
> >>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljoscha@apache.org>
> >>>>> wrote:
> >>>>>> Hi All,
> >>>>>> I want to discuss some ideas about improving the primitives/operations
> >>>>>> that Flink offers for user state, timers and windows, and how these
> >>>>>> concepts can be unified.
> >>>>>>
> >>>>>> It has come up a lot lately that people have very specific
> >>>>>> requirements regarding the state that they keep, and it seems
> >>>>>> necessary to allow users to set their own custom timers (on processing
> >>>>>> time and on watermark time, i.e. event time) to implement both
> >>>>>> expiration of state and custom windowing semantics. While we’re at
> >>>>>> it, we might also think about cleaning up the state handling a bit.
> >>>>>>
> >>>>>> Let me first describe the status quo, so that we’re all on the same
> >>>>>> page. There are three types of state:
> >>>>>> - function state: this is the state that you get when a user function
> >>>>>> implements the Checkpointed interface. It is not partitioned.
> >>>>>> - operator state: this is the state that a StreamOperator can snapshot.
> >>>>>> It is similar to the function state, but for operators. It is not
> >>>>>> partitioned.
> >>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>> element. In Flink, this is (confusingly) called OperatorState and
> >>>>>> KvState (internally).
> >>>>>>
> >>>>>> (Operator is the low-level concept; user functions are usually invoked
> >>>>>> by the operator. For example, StreamMap is the operator that handles
> >>>>>> a MapFunction.)
> >>>>>>
> >>>>>> Function state and operator state are not partitioned, which makes
> >>>>>> it difficult to implement dynamic scale-in/scale-out. Partitioned
> >>>>>> state, in contrast, can be redistributed when the degree of
> >>>>>> parallelism changes.
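A simplified sketch of why keyed (partitioned) state can be redistributed when the parallelism changes, assuming keys are routed to parallel instances by a stable hash modulo the parallelism. The helpers subtask_for and redistribute are hypothetical, and the hash function Flink actually uses may differ; the point is only that per-key state can be re-sharded, whereas an opaque, non-partitioned per-instance snapshot has no key to route by.

```python
import zlib
from typing import Dict, List


def subtask_for(key: str, parallelism: int) -> int:
    """Deterministically map a key to one of `parallelism` parallel instances.
    crc32 is used because Python's built-in str hash is salted per process."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


def redistribute(partitions: List[Dict[str, object]],
                 new_parallelism: int) -> List[Dict[str, object]]:
    """Re-shard per-key state from the old instances onto new_parallelism
    instances. This only works because every entry carries its key."""
    result: List[Dict[str, object]] = [{} for _ in range(new_parallelism)]
    for part in partitions:
        for key, value in part.items():
            result[subtask_for(key, new_parallelism)][key] = value
    return result
```

Scaling from 2 to 3 instances keeps every key's state and places it on the instance that will now receive that key's elements.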
> >>>>>>
> >>>>>> Both stream operators and user functions can have partitioned state,
> >>>>>> and the namespace is the same, i.e. the state can clash. The
> >>>>>> partitioned state will stay indefinitely if not manually cleared.
> >>>>>>
> >>>>>> On to timers: operators can register processing-time callbacks, and
> >>>>>> they can react to watermarks to implement event-time callbacks. They
> >>>>>> have to implement the logic themselves, however. For example, the
> >>>>>> WindowOperator has custom code to keep track of watermark timers and
> >>>>>> to react to watermarks. User functions have no way of registering
> >>>>>> timers. Also, timers are not scoped to any key. So if you register a
> >>>>>> timer while processing an element of a certain key, when the timer
> >>>>>> fires you don’t know what key was active when the timer was
> >>>>>> registered. Knowing that key might be necessary for cleaning up state
> >>>>>> for certain keys, or to trigger processing for a certain key only,
> >>>>>> for example with session windows of some kind.
> >>>>>>
> >>>>>> Now, on to new stuff. I propose to add a timer facility that can be
> >>>>>> used by both operators and user functions. Both partitioned state and
> >>>>>> timers should be aware of keys, and when a timer fires, the
> >>>>>> partitioned state should be scoped to the same key that was active
> >>>>>> when the timer was registered.
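A minimal sketch of the proposed key-aware timers, assuming a hypothetical event-time timer service: each timer records the key that was active at registration, and the service restores that key before invoking the callback, so state lookups inside the callback see the right key. KeyedTimerService and its methods are illustrative names only; processing-time timers could follow the same pattern, driven by the wall clock instead of watermarks.

```python
import heapq
import itertools
from typing import Callable, Hashable, List, Tuple


class KeyedTimerService:
    """Hypothetical event-time timer service whose timers remember their key."""

    def __init__(self, on_timer: Callable[["KeyedTimerService", int], None]) -> None:
        self.current_key: Hashable = None
        self._on_timer = on_timer
        self._queue: List[Tuple[int, int, Hashable]] = []  # (timestamp, seq, key)
        self._seq = itertools.count()  # tie-breaker so keys are never compared

    def register_event_time_timer(self, timestamp: int) -> None:
        # Capture the key that is active right now, at registration time.
        heapq.heappush(self._queue, (timestamp, next(self._seq), self.current_key))

    def advance_watermark(self, watermark: int) -> None:
        # Fire every timer with timestamp <= watermark, in timestamp order.
        while self._queue and self._queue[0][0] <= watermark:
            timestamp, _, key = heapq.heappop(self._queue)
            self.current_key = key  # scope state access to the registering key
            self._on_timer(self, timestamp)
```

A callback can then clean up or emit the state of `svc.current_key`, which is exactly the per-key cleanup case the proposal describes.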
> >>>>>>
> >>>>>> One last bit. If we want to also implement the current WindowOperator
> >>>>>> on top of these generic facilities, we need to have a way to scope
> >>>>>> state not only by key but also by windows (or better, some generic
> >>>>>> state scope). The reason is that one key can have several active
> >>>>>> windows at one point in time, and firing timers need to be mapped to
> >>>>>> the correct window (for example, for sliding windows, session windows,
> >>>>>> or what have you…).
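A minimal sketch of scoping state by both key and window, assuming sliding event-time windows [start, end) over non-negative timestamps and a hypothetical operator that sums values. One element contributes to several windows of its key, and each timer carries its (key, window) pair, so firing it emits and clears exactly that window's state. WindowedSum, sliding_windows, and the firing time of end - 1 are assumptions made for illustration.

```python
import heapq
from collections import defaultdict
from typing import Dict, List, Tuple

Window = Tuple[int, int]  # [start, end) in event time


def sliding_windows(ts: int, size: int, slide: int) -> List[Window]:
    """All sliding windows [start, start + size) that contain timestamp ts."""
    last_start = ts - (ts % slide)
    return [(s, s + size) for s in range(last_start, ts - size, -slide)]


class WindowedSum:
    """Hypothetical operator whose state is scoped by (key, window) and whose
    event-time timers remember the (key, window) they belong to."""

    def __init__(self, size: int, slide: int) -> None:
        self.size, self.slide = size, slide
        self.state: Dict[Tuple[str, Window], int] = defaultdict(int)
        self.timers: List[Tuple[int, str, Window]] = []  # (fire time, key, window)
        self.output: List[Tuple[str, Window, int]] = []

    def process(self, key: str, ts: int, value: int) -> None:
        for window in sliding_windows(ts, self.size, self.slide):
            if (key, window) not in self.state:
                # First element for this (key, window): schedule its timer.
                heapq.heappush(self.timers, (window[1] - 1, key, window))
            self.state[(key, window)] += value

    def advance_watermark(self, watermark: int) -> None:
        while self.timers and self.timers[0][0] <= watermark:
            _, key, window = heapq.heappop(self.timers)
            # Emit and clear only the state of the window this timer belongs to.
            self.output.append((key, window, self.state.pop((key, window))))
```

With size 10 and slide 5, an element at time 7 lands in windows (0, 10) and (5, 15); a watermark of 9 fires only the (0, 10) timers, leaving the overlapping (5, 15) state of the same key untouched.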
> >>>>>>
> >>>>>> Happy discussing. :D
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Aljoscha
> >>>>>>
> >>>>>>
> >>>>>
> >>>
> >>>
> >>
>
>
