flink-dev mailing list archives

From Kostas Tzoumas <ktzou...@apache.org>
Subject Re: [DISCUSS] Improving State/Timers/Windows
Date Mon, 14 Dec 2015 10:46:34 GMT
oh, sorry, I misread. Just my +1 to renaming OperatorState then :-)

On Mon, Dec 14, 2015 at 11:38 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> As I mentioned in my previous mail, I think that OperatorState would need
> to be replaced by more specific types of state (ValueState, ListState, …).
>
> > On 14 Dec 2015, at 11:34, Maximilian Michels <mxm@apache.org> wrote:
> >
> >>
> >> On a side note, why would you call it KvState? And what would be called
> >> KvState?
> >
> >
> > The OperatorState interface would be called KvState.
> >
> >
> > On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <aljoscha@apache.org>
> > wrote:
> >
> >> Yes, as Kostas said, it would initially not provide more functionality,
> >> but it would enable us to add it later.
> >>
> >> On a side note, why would you call it KvState? And what would be called
> >> KvState?
> >>
> >>> On 14 Dec 2015, at 11:14, Kostas Tzoumas <ktzoumas@apache.org> wrote:
> >>>
> >>> I suppose that they can start as sugar and evolve to a different
> >>> implementation.
> >>>
> >>> I would +1 the name change to KVState, OperatorState is indeed somewhat
> >>> confusing, and it will only get harder to rename later.
> >>>
> >>> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> >>>
> >>>> Would the Reducing/Folding states just be some API sugar on top of what
> >>>> we have now (ValueState), or would they have some added functionality
> >>>> (like incremental checkpoints for list states)?
> >>>>
> >>>> Gyula
> >>>>
> >>>> Aljoscha Krettek <aljoscha@apache.org> wrote (Mon, 14 Dec 2015, 11:03):
> >>>>
> >>>>> While enhancing the state interfaces we would also need to introduce
> >>>>> new types of state. I was thinking of these, for a start:
> >>>>> - ValueState: works like OperatorState works now, i.e. provides
> >>>>> methods to get/set one state value
> >>>>> - ListState: provides methods to add one element to a list of elements
> >>>>> and to iterate over all contained elements
> >>>>> - ReducingState: somewhat similar to ValueState, but combines the added
> >>>>> value with the existing value using a ReduceFunction
> >>>>> - FoldingState: same as above, but with fold
> >>>>>
> >>>>> I think these are necessary to give the system more knowledge about
> >>>>> the semantics of state so that it can handle the state more efficiently.
> >>>>> Think of incremental checkpoints, for example: these are easy to do if
> >>>>> you know that state is a list to which stuff is only appended.
> >>>>>
> >>>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <sewen@apache.org> wrote:
> >>>>>>
> >>>>>> A lot of this makes sense, but I am not sure about renaming
> >>>>>> "OperatorState". The other name is nicer, but why make users' life
> >>>>>> hard just for a name?
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mxm@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> Thanks for the informative technical description.
> >>>>>>>
> >>>>>>>> - function state: this is the state that you get when a user
> >>>>>>>> function implements the Checkpointed interface. It is not partitioned.
> >>>>>>>> - operator state: this is the state that a StreamOperator can
> >>>>>>>> snapshot. It is similar to the function state, but for operators.
> >>>>>>>> It is not partitioned.
> >>>>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>>>> element. In Flink, this is (confusingly) called OperatorState and
> >>>>>>>> KvState (internally).
> >>>>>>>
> >>>>>>> Let's clean that up! Let's rename the OperatorState interface to
> >>>>>>> KvState.
> >>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned state,
> >>>>>>>> and the namespace is the same, i.e. the state can clash. The
> >>>>>>>> partitioned state will stay indefinitely if not manually cleared.
> >>>>>>>
> >>>>>>> I suppose operators currently have to take care to use a unique
> >>>>>>> identifier for the state such that it doesn't clash with the user
> >>>>>>> function. It wouldn't be too hard to introduce scoping here.
> >>>>>>>
> >>>>>>> Your proposal makes sense. It seems like this is a rather delicate
> >>>>>>> change which improves the flexibility of the streaming API. What is
> >>>>>>> the motivation behind this? I suppose you are thinking of improvements
> >>>>>>> to the session capabilities of the streaming API.
> >>>>>>>
> >>>>>>>> If we want to also implement the current WindowOperator on top of
> >>>>>>>> these generic facilities we need to have a way to scope state not only
> >>>>>>>> by key but also by windows (or better, some generic state scope).
> >>>>>>>
> >>>>>>> This is currently handled by the WindowOperator itself and would then
> >>>>>>> be delegated to the enhanced state interface? Makes sense if we want
> >>>>>>> to make use of the new state interface. Again, is it just cleaner or
> >>>>>>> does this enable new types of applications?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>>>>>>> Hi All,
> >>>>>>>> I want to discuss some ideas about improving the primitives/operations
> >>>>>>>> that Flink offers for user-state, timers and windows and how these
> >>>>>>>> concepts can be unified.
> >>>>>>>>
> >>>>>>>> It has come up a lot lately that people have very specific
> >>>>>>>> requirements regarding the state that they keep, and it seems necessary
> >>>>>>>> to allow users to set their own custom timers (on processing time and
> >>>>>>>> watermark time (event-time)) to do both expiration of state and
> >>>>>>>> implementation of custom windowing semantics. While we’re at this, we
> >>>>>>>> might also think about cleaning up the state handling a bit.
> >>>>>>>>
> >>>>>>>> Let me first describe the status quo, so that we’re all on the same
> >>>>>>>> page. There are three types of state:
> >>>>>>>> - function state: this is the state that you get when a user
> >>>>>>>> function implements the Checkpointed interface. It is not partitioned.
> >>>>>>>> - operator state: this is the state that a StreamOperator can
> >>>>>>>> snapshot. It is similar to the function state, but for operators.
> >>>>>>>> It is not partitioned.
> >>>>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>>>> element. In Flink, this is (confusingly) called OperatorState and
> >>>>>>>> KvState (internally).
> >>>>>>>>
> >>>>>>>> (Operator is the low-level concept; user functions are usually
> >>>>>>>> invoked by the operator. For example, StreamMap is the operator that
> >>>>>>>> handles a MapFunction.)
> >>>>>>>>
> >>>>>>>> Function state and operator state are not partitioned, meaning that
> >>>>>>>> it becomes difficult when we want to implement dynamic
> >>>>>>>> scale-in/scale-out. Partitioned state, by contrast, can be
> >>>>>>>> redistributed when changing the degree of parallelism.
> >>>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned state,
> >>>>>>>> and the namespace is the same, i.e. the state can clash. The
> >>>>>>>> partitioned state will stay indefinitely if not manually cleared.
> >>>>>>>>
> >>>>>>>> On to timers: operators can register processing-time callbacks, and
> >>>>>>>> they can react to watermarks to implement event-time callbacks. They
> >>>>>>>> have to implement the logic themselves, however. For example, the
> >>>>>>>> WindowOperator has custom code to keep track of watermark timers and
> >>>>>>>> for reacting to watermarks. User functions have no way of registering
> >>>>>>>> timers. Also, timers are not scoped to any key. So if you register a
> >>>>>>>> timer while processing an element of a certain key, when the timer
> >>>>>>>> fires you don’t know what key was active when registering the timer.
> >>>>>>>> Knowing the key might be necessary for cleaning up state for certain
> >>>>>>>> keys, or to trigger processing for a certain key only, for example
> >>>>>>>> with session windows of some kind.
> >>>>>>>>
> >>>>>>>> Now, on to new stuff. I propose to add a timer facility that can be
> >>>>>>>> used by both operators and user functions. Both partitioned state and
> >>>>>>>> timers should be aware of keys, and if a timer fires, the partitioned
> >>>>>>>> state should be scoped to the same key that was active when the timer
> >>>>>>>> was registered.
> >>>>>>>>
> >>>>>>>> One last bit. If we want to also implement the current WindowOperator
> >>>>>>>> on top of these generic facilities we need to have a way to scope
> >>>>>>>> state not only by key but also by windows (or better, some generic
> >>>>>>>> state scope). The reason is that one key can have several active
> >>>>>>>> windows at one point in time and firing timers need to be mapped to
> >>>>>>>> the correct window (for example, for sliding windows, or session
> >>>>>>>> windows or what have you…).
> >>>>>>>>
> >>>>>>>> Happy discussing. :D
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>
> >>
>
>
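
For readers following the thread, the state types listed in it (ValueState, ListState, ReducingState, FoldingState) can be illustrated with a minimal Java sketch. The interface names come from the discussion; every method signature and all the Heap* classes below are assumptions made for illustration and are not taken from Flink's code base. The sketch only shows how each type exposes different semantics to the system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical interfaces: names follow the thread, signatures are assumed.
interface ValueState<T> {
    T value();
    void update(T value);
}

interface ListState<T> {
    void add(T element);
    Iterable<T> get();
}

interface ReducingState<T> {
    void add(T element);
    T get();
}

interface FoldingState<T, ACC> {
    void add(T element);
    ACC get();
}

// Hypothetical in-memory implementations holding the state of a single key.
final class HeapValueState<T> implements ValueState<T> {
    private T current;
    public T value() { return current; }
    public void update(T value) { current = value; }
}

final class HeapListState<T> implements ListState<T> {
    // Append-only: a backend could checkpoint only the newly added tail.
    private final List<T> elements = new ArrayList<>();
    public void add(T element) { elements.add(element); }
    public Iterable<T> get() { return elements; }
}

final class HeapReducingState<T> implements ReducingState<T> {
    private final BinaryOperator<T> reduceFunction;
    private T current;
    HeapReducingState(BinaryOperator<T> reduceFunction) { this.reduceFunction = reduceFunction; }
    public void add(T element) {
        // The first element is stored as-is; later ones are combined with it.
        current = (current == null) ? element : reduceFunction.apply(current, element);
    }
    public T get() { return current; }
}

final class HeapFoldingState<T, ACC> implements FoldingState<T, ACC> {
    private final BiFunction<ACC, T, ACC> foldFunction;
    private ACC accumulator;
    HeapFoldingState(ACC initialValue, BiFunction<ACC, T, ACC> foldFunction) {
        this.accumulator = initialValue;
        this.foldFunction = foldFunction;
    }
    public void add(T element) { accumulator = foldFunction.apply(accumulator, element); }
    public ACC get() { return accumulator; }
}

final class StateSketch {
    public static void main(String[] args) {
        HeapReducingState<Integer> sum = new HeapReducingState<Integer>(Integer::sum);
        sum.add(1);
        sum.add(2);
        sum.add(3);
        System.out.println(sum.get());

        HeapFoldingState<Integer, String> joined =
            new HeapFoldingState<Integer, String>("", (acc, x) -> acc + x + ";");
        joined.add(1);
        joined.add(2);
        System.out.println(joined.get());
    }
}
```

Because the list variant only ever appends, a state backend that knows it is dealing with a ListState could persist just the new elements, which is the incremental-checkpoint argument made in the thread.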

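The key-scoped timer facility proposed in the original message of the thread can be pictured with a small Java sketch. KeyedTimerService, its Callback interface and all method names are hypothetical and exist only for this illustration; they are not Flink API. The idea shown is the one from the proposal: a timer remembers the key that was active at registration, and that key is restored when the timer fires.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch of a timer service that scopes timers to keys.
final class KeyedTimerService<K> {

    // Invoked when a timer fires, with the key that registered it.
    interface Callback<K> {
        void onTimer(K key, long timestamp);
    }

    // A timer remembers the key that was active at registration time.
    private static final class Timer<K> {
        final long timestamp;
        final K key;
        Timer(long timestamp, K key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    // Timers ordered by timestamp, earliest first.
    private final PriorityQueue<Timer<K>> timers =
        new PriorityQueue<>(Comparator.comparingLong((Timer<K> t) -> t.timestamp));
    private final Callback<K> callback;
    private K currentKey;

    KeyedTimerService(Callback<K> callback) {
        this.callback = callback;
    }

    // Called by the (hypothetical) operator before processing an element.
    void setCurrentKey(K key) { currentKey = key; }

    K getCurrentKey() { return currentKey; }

    // Registers a timer for the key that is currently active.
    void registerTimer(long timestamp) {
        timers.add(new Timer<>(timestamp, currentKey));
    }

    // Fires all timers up to and including the watermark, restoring each
    // timer's key first so that keyed state access would be scoped correctly.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer<K> timer = timers.poll();
            currentKey = timer.key;
            callback.onTimer(timer.key, timer.timestamp);
        }
    }
}
```

Scoping by window in addition to key, as the last paragraph of the proposal asks for, could follow the same pattern by storing a (key, window) pair in each timer instead of the key alone.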