flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Improving State/Timers/Windows
Date Mon, 14 Dec 2015 10:38:07 GMT
As I mentioned in my previous mail, I think that OperatorState would need to be replaced by more
specific types of state (ValueState, ListState, …).

> On 14 Dec 2015, at 11:34, Maximilian Michels <mxm@apache.org> wrote:
> 
>> 
>> On a side note, why would you call it KvState? And what would be called
>> KvState?
> 
> 
> The OperatorState interface would be called KvState.
> 
> 
> On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
>> Yes, as Kostas said, it would initially not provide more functionality but
>> it would enable us to add it later.
>> 
>> On a side note, why would you call it KvState? And what would be called
>> KvState?
>> 
>>> On 14 Dec 2015, at 11:14, Kostas Tzoumas <ktzoumas@apache.org> wrote:
>>> 
>>> I suppose that they can start as sugar and evolve to a different
>>> implementation.
>>> 
>>> I would +1 the name change to KVState, OperatorState is indeed somewhat
>>> confusing, and it will only get harder to rename later.
>>> 
>>> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gyula.fora@gmail.com> wrote:
>>> 
>>>> Would the Reducing/Folding states just be some API sugar on top of what we
>>>> have now (ValueState), or do they have some added functionality (like
>>>> incremental checkpoints for list states)?
>>>> 
>>>> Gyula
>>>> 
>>>> On Mon, Dec 14, 2015 at 11:03, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>> 
>>>>> While enhancing the state interfaces we would also need to introduce new
>>>>> types of state. I was thinking of these, for a start:
>>>>> - ValueState (works like OperatorState works now, i.e. provides methods
>>>>> to get/set one state value)
>>>>> - ListState, provides methods to add one element to a list of elements
>>>>> and to iterate over all contained elements
>>>>> - ReducingState, somewhat similar to value state but combines the added
>>>>> value with the existing value using a ReduceFunction
>>>>> - FoldingState, same as above but with fold
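The four state types listed above can be sketched in Java roughly as follows. This is a minimal, hypothetical illustration: only the type names come from the proposal, while the method names (get, set, add), the heap* factory methods and the in-memory implementations are assumptions made for this sketch, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical, self-contained sketch of the proposed state types.
// These are NOT Flink's actual interfaces; method names and signatures are assumptions.
class StateTypes {

    /** Holds a single value that can be read and overwritten. */
    interface ValueState<T> {
        T get();
        void set(T value);
    }

    /** List state: add single elements, iterate over all contained elements. */
    interface ListState<T> {
        void add(T element);
        Iterable<T> get();
    }

    /** Combines each added value with the current one using a reduce function. */
    interface ReducingState<T> {
        void add(T value);
        T get();
    }

    /** Like ReducingState, but folds inputs of type IN into an accumulator of type ACC. */
    interface FoldingState<IN, ACC> {
        void add(IN value);
        ACC get();
    }

    static <T> ValueState<T> heapValue() {
        return new ValueState<T>() {
            private T value;
            public T get() { return value; }
            public void set(T v) { value = v; }
        };
    }

    static <T> ListState<T> heapList() {
        return new ListState<T>() {
            private final List<T> elements = new ArrayList<>();
            public void add(T e) { elements.add(e); }
            public Iterable<T> get() { return elements; }
        };
    }

    static <T> ReducingState<T> heapReducing(BinaryOperator<T> reducer) {
        return new ReducingState<T>() {
            private T current; // null until the first value is added
            public void add(T v) { current = (current == null) ? v : reducer.apply(current, v); }
            public T get() { return current; }
        };
    }

    static <IN, ACC> FoldingState<IN, ACC> heapFolding(ACC initial, BiFunction<ACC, IN, ACC> folder) {
        return new FoldingState<IN, ACC>() {
            private ACC acc = initial;
            public void add(IN v) { acc = folder.apply(acc, v); }
            public ACC get() { return acc; }
        };
    }
}
```

Because each type tells the system how values are combined, a backend could, for example, store a ReducingState as a single pre-aggregated value instead of keeping every input.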
>>>>> 
>>>>> I think these are necessary to give the system more knowledge about the
>>>>> semantics of state so that it can handle the state more efficiently.
>>>>> Think of incremental checkpoints, for example: these are easy to do if
>>>>> you know that state is a list to which stuff is only appended.
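To illustrate why append-only list semantics make incremental checkpoints easy, here is a minimal sketch, assuming a hypothetical AppendOnlyListState class whose snapshotIncrement method returns only the elements added since the previous checkpoint. It is not how Flink implements checkpointing; it only shows the idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: because the state is append-only, a checkpoint only
// needs to persist the elements added since the previous checkpoint.
class AppendOnlyListState<T> {
    private final List<T> elements = new ArrayList<>();
    private int checkpointedCount = 0; // number of elements already persisted

    void add(T element) { elements.add(element); }

    List<T> get() { return elements; }

    /** Returns only the delta since the last checkpoint and marks it as persisted. */
    List<T> snapshotIncrement() {
        List<T> delta = new ArrayList<>(elements.subList(checkpointedCount, elements.size()));
        checkpointedCount = elements.size();
        return delta;
    }

    /** Rebuilds the full list by replaying all persisted increments in order. */
    static <T> AppendOnlyListState<T> restore(List<List<T>> increments) {
        AppendOnlyListState<T> state = new AppendOnlyListState<>();
        for (List<T> increment : increments) {
            state.elements.addAll(increment);
        }
        state.checkpointedCount = state.elements.size();
        return state;
    }
}
```

With a plain ValueState holding a whole list, the system cannot know which part changed, so it would have to write the entire list on every checkpoint.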
>>>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <sewen@apache.org> wrote:
>>>>>> 
>>>>>> A lot of this makes sense, but I am not sure about renaming
>>>>>> "OperatorState". The other name is nicer, but why make users' lives
>>>>>> hard just for a name?
>>>>>> 
>>>>>> 
>>>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mxm@apache.org> wrote:
>>>>>> 
>>>>>>> Hi Aljoscha,
>>>>>>> 
>>>>>>> Thanks for the informative technical description.
>>>>>>> 
>>>>>>>> - function state: this is the state that you get when a user function
>>>>>>>> implements the Checkpointed interface. It is not partitioned.
>>>>>>>> - operator state: this is the state that a StreamOperator can snapshot.
>>>>>>>> It is similar to the function state, but for operators. It is not
>>>>>>>> partitioned.
>>>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>>>>> element. In Flink, this is (confusingly) called OperatorState and
>>>>>>>> KvState (internally).
>>>>>>> 
>>>>>>> Let's clean that up! Let's rename the OperatorState interface to KvState.
>>>>>>> 
>>>>>>>> Both stream operators and user functions can have partitioned state,
>>>>>>>> and the namespace is the same, i.e. the state can clash. The
>>>>>>>> partitioned state will stay indefinitely if not manually cleared.
>>>>>>> 
>>>>>>> I suppose operators currently have to take care to use a unique
>>>>>>> identifier for the state so that it doesn't clash with the user
>>>>>>> function. It wouldn't be too hard to introduce scoping here.
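One possible way to introduce such scoping is to address every piece of partitioned state by (owner, state name, key), so that an operator and the user function it wraps can both use the same state name without clashing. The sketch below is hypothetical: ScopedKeyedStore and the Owner enum are names invented for illustration, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of scoped partitioned state: each entry is addressed by
// (owner scope, state name, key), so an operator and its user function can both
// use the name "count" without clashing.
class ScopedKeyedStore {

    enum Owner { OPERATOR, USER_FUNCTION }

    private static final class StateId {
        final Owner owner;
        final String name;
        final Object key;

        StateId(Owner owner, String name, Object key) {
            this.owner = owner;
            this.name = name;
            this.key = key;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof StateId)) return false;
            StateId other = (StateId) o;
            return owner == other.owner && name.equals(other.name) && Objects.equals(key, other.key);
        }

        @Override public int hashCode() { return Objects.hash(owner, name, key); }
    }

    private final Map<StateId, Object> values = new HashMap<>();

    void put(Owner owner, String name, Object key, Object value) {
        values.put(new StateId(owner, name, key), value);
    }

    Object get(Owner owner, String name, Object key) {
        return values.get(new StateId(owner, name, key));
    }
}
```

The design choice here is to make the owner part of the identity rather than relying on naming conventions such as prefixes, so a clash becomes impossible by construction.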
>>>>>>> 
>>>>>>> Your proposal makes sense. It seems like this is a rather delicate
>>>>>>> change which improves the flexibility of the streaming API. What is
>>>>>>> the motivation behind this? I suppose you are thinking of improvements
>>>>>>> to the session capabilities of the streaming API.
>>>>>>> 
>>>>>>>> If we want to also implement the current WindowOperator on top of
>>>>>>>> these generic facilities we need to have a way to scope state not
>>>>>>>> only by key but also by windows (or better, some generic state scope).
>>>>>>> 
>>>>>>> This is currently handled by the WindowOperator itself and would then
>>>>>>> be delegated to the enhanced state interface? Makes sense if we want
>>>>>>> to make use of the new state interface. Again, is it just cleaner, or
>>>>>>> does this enable new types of applications?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>> 
>>>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>> Hi All,
>>>>>>>> I want to discuss some ideas about improving the primitives/operations
>>>>>>>> that Flink offers for user-state, timers and windows and how these
>>>>>>>> concepts can be unified.
>>>>>>>> 
>>>>>>>> It has come up a lot lately that people have very specific requirements
>>>>>>>> regarding the state that they keep, and it seems necessary to allow
>>>>>>>> users to set their own custom timers (on processing time and watermark
>>>>>>>> time (event-time)) to do both expiration of state and implementation
>>>>>>>> of custom windowing semantics. While we’re at this, we might also
>>>>>>>> think about cleaning up the state handling a bit.
>>>>>>>> 
>>>>>>>> Let me first describe the status quo, so that we’re all on the same
>>>>>>>> page. There are three types of state:
>>>>>>>> - function state: this is the state that you get when a user function
>>>>>>>> implements the Checkpointed interface. It is not partitioned.
>>>>>>>> - operator state: this is the state that a StreamOperator can snapshot.
>>>>>>>> It is similar to the function state, but for operators. It is not
>>>>>>>> partitioned.
>>>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>>>>> element. In Flink, this is (confusingly) called OperatorState and
>>>>>>>> KvState (internally).
>>>>>>>> 
>>>>>>>> (Operator is the low-level concept; user functions are usually invoked
>>>>>>>> by the operator. For example, StreamMap is the operator that handles a
>>>>>>>> MapFunction.)
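The split between a low-level operator and the user function it invokes can be sketched as below. This is a heavily simplified, hypothetical model: SimpleMapFunction and SimpleStreamMap are invented names, and Flink's real StreamMap and MapFunction have richer signatures (checked exceptions, output collectors, lifecycle methods).

```java
import java.util.function.Consumer;

// Hypothetical, heavily simplified model of the operator/function split.
// In Flink, StreamMap wraps a MapFunction; the types below are illustrations only.
class OperatorFunctionSketch {

    /** User-level function: contains only the per-element logic. */
    interface SimpleMapFunction<IN, OUT> {
        OUT map(IN value);
    }

    /** Low-level operator: receives each element, calls the user function, emits the result. */
    static final class SimpleStreamMap<IN, OUT> {
        private final SimpleMapFunction<IN, OUT> userFunction;
        private final Consumer<OUT> output;

        SimpleStreamMap(SimpleMapFunction<IN, OUT> userFunction, Consumer<OUT> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        void processElement(IN element) {
            output.accept(userFunction.map(element));
        }
    }
}
```

Because the operator sits between the runtime and the user function, both of them can own state, which is where the namespace clash described below comes from.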
>>>>>>>> 
>>>>>>>> Function state and operator state are not partitioned, which makes it
>>>>>>>> difficult to implement dynamic scale-in/scale-out. Partitioned state,
>>>>>>>> in contrast, can be redistributed when the degree of parallelism
>>>>>>>> changes.
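A minimal sketch of why keyed state can be redistributed: every entry carries its key, so after a change of parallelism each entry can be routed to the subtask that now owns that key. The hash-modulo assignment and the Rescaling class are assumptions for illustration, not Flink's actual scheme. Non-partitioned state is an opaque blob per subtask with no key to route by.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: because partitioned state is addressed by key, it can be
// reassigned when parallelism changes. The hash-modulo assignment is an assumption
// for illustration, not Flink's actual scheme.
class Rescaling {

    /** Maps a key to one of `parallelism` subtasks. */
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Merges the per-subtask state maps and splits them again for the new parallelism. */
    static <K, V> List<Map<K, V>> redistribute(List<Map<K, V>> oldPartitions, int newParallelism) {
        List<Map<K, V>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new HashMap<>());
        }
        for (Map<K, V> partition : oldPartitions) {
            for (Map.Entry<K, V> entry : partition.entrySet()) {
                int target = subtaskFor(entry.getKey(), newParallelism);
                result.get(target).put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```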
>>>>>>>> 
>>>>>>>> Both stream operators and user functions can have partitioned state,
>>>>>>>> and the namespace is the same, i.e. the state can clash. The
>>>>>>>> partitioned state will stay indefinitely if not manually cleared.
>>>>>>>> 
>>>>>>>> On to timers: operators can register processing-time callbacks, and
>>>>>>>> they can react to watermarks to implement event-time callbacks. They
>>>>>>>> have to implement the logic themselves, however. For example, the
>>>>>>>> WindowOperator has custom code to keep track of watermark timers and
>>>>>>>> to react to watermarks. User functions have no way of registering
>>>>>>>> timers. Also, timers are not scoped to any key. So if you register a
>>>>>>>> timer while processing an element of a certain key, when the timer
>>>>>>>> fires you don’t know what key was active when the timer was
>>>>>>>> registered. Knowing that key might be necessary for cleaning up state
>>>>>>>> for certain keys, or for triggering processing for a certain key only,
>>>>>>>> for example with session windows of some kind.
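The hand-written event-time bookkeeping described above roughly amounts to a priority queue of timestamps that is drained whenever a watermark arrives. The sketch below is hypothetical (WatermarkTimers is an invented name, and processing-time timers are not modeled); note that a timer is only a timestamp, so nothing records which key was active at registration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the bookkeeping an operator needs today to get event-time
// callbacks from watermarks. A timer is just a timestamp: nothing records which
// key was active when it was registered.
class WatermarkTimers {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();

    void registerEventTimeTimer(long timestamp) {
        pending.add(timestamp);
    }

    /** Fires (returns, in timestamp order) every timer whose timestamp is <= the watermark. */
    List<Long> onWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            fired.add(pending.poll());
        }
        return fired;
    }
}
```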
>>>>>>>> 
>>>>>>>> Now, on to new stuff. I propose to add a timer facility that can be
>>>>>>>> used by both operators and user functions. Both partitioned state and
>>>>>>>> timers should be aware of keys, and if a timer fires, the partitioned
>>>>>>>> state should be scoped to the same key that was active when the timer
>>>>>>>> was registered.
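The proposed key-aware timers could look roughly like this: each timer remembers the key that was current at registration, and firing it restores that key before the callback runs, so partitioned state read or cleared in the callback belongs to the right key. KeyedTimerService, its count state and its method names are hypothetical, chosen only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

// Hypothetical sketch of key-aware timers: a timer remembers the key that was active
// when it was registered, and firing it restores that key, so partitioned state used
// in the callback belongs to the right key. All names are illustrative assumptions.
class KeyedTimerService<K> {

    private static final class PendingTimer<T> {
        final long timestamp;
        final T key;
        PendingTimer(long timestamp, T key) { this.timestamp = timestamp; this.key = key; }
    }

    private final PriorityQueue<PendingTimer<K>> timers =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Map<K, Long> countState = new HashMap<>(); // example partitioned state
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    /** Partitioned state access is always scoped to the current key. */
    long getCount() { return countState.getOrDefault(currentKey, 0L); }
    void setCount(long value) { countState.put(currentKey, value); }
    void clearCount() { countState.remove(currentKey); }

    void registerEventTimeTimer(long timestamp) {
        timers.add(new PendingTimer<>(timestamp, currentKey));
    }

    /** Fires due timers; before each callback the key from registration time is restored. */
    void advanceWatermark(long watermark, BiConsumer<K, Long> onTimer) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            PendingTimer<K> timer = timers.poll();
            setCurrentKey(timer.key);
            onTimer.accept(timer.key, timer.timestamp);
        }
    }
}
```

For example, a callback that calls getCount() and then clearCount() reads and expires exactly the state of the key that registered the timer, which covers the state-expiration use case mentioned earlier.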
>>>>>>>> 
>>>>>>>> One last bit. If we want to also implement the current WindowOperator
>>>>>>>> on top of these generic facilities, we need to have a way to scope
>>>>>>>> state not only by key but also by windows (or better, some generic
>>>>>>>> state scope). The reason is that one key can have several active
>>>>>>>> windows at one point in time, and firing timers need to be mapped to
>>>>>>>> the correct window (for example, for sliding windows, or session
>>>>>>>> windows or what have you…).
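Scoping state and timers by (key, namespace) can be sketched with sliding windows as the namespace: one element falls into several overlapping windows of the same key, each window keeps its own state, and each timer maps back to exactly one (key, window) pair. Everything here is hypothetical: the NamespacedWindows, Window and Scope names, the sum aggregation, and the choice to fire a window [start, end) once the watermark reaches end - 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical sketch: state and timers are scoped by (key, namespace), where the
// namespace is a time window [start, end). One key can have several windows open
// at once, and each timer maps back to exactly one of them.
class NamespacedWindows {

    static final class Window {
        final long start, end;
        Window(long start, long end) { this.start = start; this.end = end; }
        @Override public boolean equals(Object o) {
            return o instanceof Window && ((Window) o).start == start && ((Window) o).end == end;
        }
        @Override public int hashCode() { return Objects.hash(start, end); }
        @Override public String toString() { return "[" + start + "," + end + ")"; }
    }

    /** The state scope: a key plus a namespace (here, a window). */
    private static final class Scope {
        final String key;
        final Window window;
        Scope(String key, Window window) { this.key = key; this.window = window; }
        @Override public boolean equals(Object o) {
            return o instanceof Scope && ((Scope) o).key.equals(key) && ((Scope) o).window.equals(window);
        }
        @Override public int hashCode() { return Objects.hash(key, window); }
    }

    private final Map<Scope, Long> sumState = new HashMap<>();          // (key, window) -> running sum
    private final TreeMap<Long, List<Scope>> timers = new TreeMap<>();   // fire time -> scopes to fire
    private final long size;
    private final long slide;

    NamespacedWindows(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Adds the value to every sliding window that contains the timestamp. */
    void processElement(String key, long timestamp, long value) {
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            Scope scope = new Scope(key, new Window(start, start + size));
            if (!sumState.containsKey(scope)) {
                // first element for this (key, window): register its end-of-window timer
                timers.computeIfAbsent(start + size - 1, t -> new ArrayList<>()).add(scope);
            }
            sumState.merge(scope, value, Long::sum);
        }
    }

    /** Fires timers up to the watermark; each emits and clears the state of its own (key, window). */
    List<String> advanceWatermark(long watermark) {
        List<String> results = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (Scope scope : timers.pollFirstEntry().getValue()) {
                results.add(scope.key + scope.window + "=" + sumState.remove(scope));
            }
        }
        return results;
    }
}
```

With size 10 and slide 5, an element at time 7 for key "a" lands in both [0,10) and [5,15), and the two resulting timers (at 9 and 14) each emit and clear only their own window's state.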
>>>>>>>> 
>>>>>>>> Happy discussing. :D
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
>> 

