flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Improving State/Timers/Windows
Date Mon, 14 Dec 2015 10:02:58 GMT
While enhancing the state interfaces we would also need to introduce new types of state. I
was thinking of these, for a start:
 - ValueState (works like OperatorState does now, i.e. provides methods to get/set one state
value)
 - ListState, which provides methods to add one element to a list of elements and to iterate over
all contained elements
 - ReducingState, somewhat similar to ValueState but combines each added value with the existing
value using a ReduceFunction
 - FoldingState, same as above but with fold
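
To make the proposed semantics concrete, here is a minimal in-memory sketch of the four state types in Java. The interface names follow the list above, but the method names (value/update, add/get), the factory methods and the StateSketch wrapper are assumptions made for illustration only. This is not Flink's actual state API, and it ignores keys, checkpointing and serialization.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical in-memory sketch of the four proposed state types.
// Names mirror the proposal; signatures are assumptions, not Flink's API.
final class StateSketch {

    // ValueState: get/set a single value.
    interface ValueState<T> { T value(); void update(T newValue); }

    // ListState: append elements and iterate over all of them.
    interface ListState<T> { void add(T element); Iterable<T> get(); }

    // ReducingState: each added value is combined with the current value.
    interface ReducingState<T> { void add(T element); T get(); }

    // FoldingState: inputs of type T are folded into an accumulator of type ACC.
    interface FoldingState<T, ACC> { void add(T element); ACC get(); }

    static <T> ValueState<T> valueState(T defaultValue) {
        return new ValueState<T>() {
            private T current = defaultValue;
            public T value() { return current; }
            public void update(T newValue) { current = newValue; }
        };
    }

    static <T> ListState<T> listState() {
        return new ListState<T>() {
            private final List<T> elements = new ArrayList<>();
            public void add(T element) { elements.add(element); }
            public Iterable<T> get() { return Collections.unmodifiableList(elements); }
        };
    }

    static <T> ReducingState<T> reducingState(BinaryOperator<T> reducer) {
        return new ReducingState<T>() {
            private T current; // null until the first element arrives
            public void add(T element) {
                current = (current == null) ? element : reducer.apply(current, element);
            }
            public T get() { return current; }
        };
    }

    static <T, ACC> FoldingState<T, ACC> foldingState(ACC initial, BiFunction<ACC, T, ACC> folder) {
        return new FoldingState<T, ACC>() {
            private ACC acc = initial;
            public void add(T element) { acc = folder.apply(acc, element); }
            public ACC get() { return acc; }
        };
    }

    private StateSketch() {}
}
```

The point of the distinction is that ReducingState and FoldingState apply their function eagerly on every add, so only one accumulated value has to be stored, whereas ListState keeps every element. FoldingState additionally lets the accumulator type differ from the element type.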

I think these are necessary to give the system more knowledge about the semantics of state
so that it can handle the state more efficiently. Take incremental checkpoints, for example:
these are easy to do if you know that the state is a list to which elements are only appended.
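
The incremental-checkpoint argument can be illustrated with a small sketch, assuming a hypothetical append-only list state (AppendOnlyListState and its methods are invented for this example). Because existing elements are never modified, each checkpoint only needs to write the elements appended since the previous one, and a restore replays the increments in order. With an opaque value, by contrast, the system cannot tell what changed and has to snapshot the whole value each time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: because the list is append-only, a checkpoint only
// needs to persist the elements appended since the previous checkpoint.
final class AppendOnlyListState<T> {
    private final List<T> elements = new ArrayList<>();
    private int checkpointedSize = 0; // elements already covered by earlier checkpoints

    void add(T element) {
        elements.add(element);
    }

    // Returns only the delta since the last checkpoint and marks it as checkpointed.
    List<T> snapshotIncrement() {
        List<T> delta = new ArrayList<>(elements.subList(checkpointedSize, elements.size()));
        checkpointedSize = elements.size();
        return Collections.unmodifiableList(delta);
    }

    // Restoring replays all increments in checkpoint order.
    static <T> AppendOnlyListState<T> restore(List<List<T>> increments) {
        AppendOnlyListState<T> state = new AppendOnlyListState<>();
        for (List<T> increment : increments) {
            state.elements.addAll(increment);
        }
        state.checkpointedSize = state.elements.size();
        return state;
    }

    List<T> elements() {
        return Collections.unmodifiableList(elements);
    }
}
```

For example, adding "a" and "b", checkpointing, then adding "c" and checkpointing again writes two small increments, ["a", "b"] and ["c"], instead of the full list twice.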
> On 14 Dec 2015, at 10:52, Stephan Ewen <sewen@apache.org> wrote:
> 
> A lot of this makes sense, but I am not sure about renaming
> "OperatorState". The other name is nicer, but why make users' lives
> harder just for a name?
> 
> 
> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mxm@apache.org> wrote:
> 
>> Hi Aljoscha,
>> 
>> Thanks for the informative technical description.
>> 
>>> - function state: this is the state that you get when a user function
>> implements the Checkpointed interface. It is not partitioned.
>>> - operator state: This is the state that a StreamOperator can snapshot.
>> It is similar to the function state, but for operators. It is not
>> partitioned.
>>> - partitioned state: state that is scoped to the key of the incoming
>> element. In Flink, this is (confusingly) called OperatorState (and,
>> internally, KvState).
>> 
>> Let's clean that up! Let's rename the OperatorState interface to KvState.
>> 
>>> Both stream operators and user functions can have partitioned state, and
>> the namespace is the same, i.e. the state can clash. The partitioned state
>> will stay indefinitely if not manually cleared.
>> 
>> I suppose operators currently have to take care to use a unique
>> identifier for the state such that it doesn't clash with the user
>> function. It wouldn't be too hard to introduce scoping here.
>> 
>> Your proposal makes sense. It seems like this is a rather delicate
>> change which improves the flexibility of the streaming API. What is
>> the motivation behind this? I suppose you are thinking of improvements
>> to the session capabilities of the streaming API.
>> 
>>> If we want to also implement the current WindowOperator on top of these
>> generic facilities we need to have a way to scope state not only by key but
>> also by windows (or better, some generic state scope).
>> 
>> This is currently handled by the WindowOperator itself and would then
>> be delegated to the enhanced state interface? Makes sense if we want
>> to make use of the new state interface. Again, is it just cleaner or
>> does this enable new types of applications?
>> 
>> Cheers,
>> Max
>> 
>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>> Hi All,
>>> I want to discuss some ideas about improving the primitives/operations
>> that Flink offers for user-state, timers and windows and how these concepts
>> can be unified.
>>> 
>>> It has come up a lot lately that people have very specific requirements
>> regarding the state that they keep, and it seems necessary to allow users
>> to set their own custom timers (on processing time and watermark time
>> (event time)) to implement both state expiration and custom
>> windowing semantics. While we’re at it, we might also think about
>> cleaning up the state handling a bit.
>>> 
>>> Let me first describe the status quo, so that we’re all on the same
>> page. There are three types of state:
>>> - function state: this is the state that you get when a user function
>> implements the Checkpointed interface. It is not partitioned.
>>> - operator state: This is the state that a StreamOperator can snapshot.
>> It is similar to the function state, but for operators. It is not
>> partitioned.
>>> - partitioned state: state that is scoped to the key of the incoming
>> element. In Flink, this is (confusingly) called OperatorState (and,
>> internally, KvState).
>>> 
>>> (Operator is the low-level concept, user functions are usually invoked
>> by the operator, for example StreamMap is the operator that handles a
>> MapFunction.)
>>> 
>>> Function state and operator state are not partitioned, which makes it
>> difficult to implement dynamic scale-in/scale-out. Partitioned state, on
>> the other hand, can be redistributed when the degree of parallelism
>> changes.
>>> 
>>> Both stream operators and user functions can have partitioned state, and
>> the namespace is the same, i.e. the state can clash. The partitioned state
>> will stay indefinitely if not manually cleared.
>>> 
>>> On to timers: operators can register processing-time callbacks, and they can
>> react to watermarks to implement event-time callbacks. However, they have to
>> implement this logic themselves. For example, the WindowOperator
>> has custom code to keep track of watermark timers and to react to
>> watermarks. User functions have no way of registering timers. Also, timers
>> are not scoped to any key: if you register a timer while processing an
>> element of a certain key, then when the timer fires you don’t know which key
>> was active when the timer was registered. This might be necessary for cleaning
>> up state for certain keys, or for triggering processing for a certain key only,
>> for example with session windows of some kind.
>>> 
>>> Now, on to new stuff. I propose to add a timer facility that can be used
>> by both operators and user functions. Both partitioned state and timers
>> should be aware of keys, and when a timer fires, the partitioned state should
>> be scoped to the same key that was active when the timer was registered.
>>> 
>>> One last bit. If we want to also implement the current WindowOperator on
>> top of these generic facilities, we need a way to scope state not
>> only by key but also by window (or better, by some generic state scope). The
>> reason is that one key can have several active windows at one point in
>> time, and firing timers need to be mapped to the correct window (for
>> example, for sliding windows, session windows, or what have you…).
>>> 
>>> Happy discussing. :D
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> 
>> 
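
Aljoscha's original proposal, quoted above, asks for timers that remember the key that was active at registration, and for state that is scoped by key and by a window or other generic namespace. The following Java sketch models that idea under stated assumptions: ScopedStateAndTimers, Timer, OnTimer and all method names are hypothetical, only event-time (watermark) firing is modelled, and everything lives in memory without checkpointing. When a watermark arrives, due timers fire in timestamp order and the key/namespace scope is restored before the callback runs, so state reads and clears inside the callback hit the right (key, window) pair.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of key- and namespace-scoped state with event-time timers.
// All names are illustrative assumptions, not Flink APIs.
final class ScopedStateAndTimers<K, N, V> {

    // A timer remembers the key and namespace (e.g. a window) active at registration time.
    static final class Timer<K, N> {
        final long timestamp;
        final K key;
        final N namespace;
        Timer(long timestamp, K key, N namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    // Callback invoked when a timer fires; the state scope is already set to the timer's key/namespace.
    interface OnTimer<K, N> {
        void fire(long timestamp, K key, N namespace);
    }

    private final Map<K, Map<N, V>> state = new HashMap<>();
    private final PriorityQueue<Timer<K, N>> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private K currentKey;
    private N currentNamespace;

    void setCurrentScope(K key, N namespace) {
        currentKey = key;
        currentNamespace = namespace;
    }

    V get() {
        Map<N, V> perKey = state.get(currentKey);
        return perKey == null ? null : perKey.get(currentNamespace);
    }

    void update(V value) {
        state.computeIfAbsent(currentKey, k -> new HashMap<>()).put(currentNamespace, value);
    }

    // Explicit cleanup; without it, partitioned state would stay indefinitely.
    void clear() {
        Map<N, V> perKey = state.get(currentKey);
        if (perKey != null) {
            perKey.remove(currentNamespace);
            if (perKey.isEmpty()) state.remove(currentKey);
        }
    }

    // Registers an event-time timer for the currently active key and namespace.
    void registerEventTimeTimer(long timestamp) {
        timers.add(new Timer<>(timestamp, currentKey, currentNamespace));
    }

    // Fires all timers with timestamp <= watermark, in timestamp order,
    // restoring the scope that was active when each timer was registered.
    void advanceWatermark(long watermark, OnTimer<K, N> callback) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer<K, N> t = timers.poll();
            setCurrentScope(t.key, t.namespace);
            callback.fire(t.timestamp, t.key, t.namespace);
        }
    }
}
```

In a window operator, the namespace would be the window itself, so one key can hold separate state for several overlapping sliding or session windows, and each firing timer touches only the state of its own window.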

