From: Kostas Tzoumas
To: "dev@flink.apache.org"
Date: Mon, 14 Dec 2015 11:46:34 +0100
Subject: Re: [DISCUSS] Improving State/Timers/Windows

oh, sorry, I misread.
Just my +1 to renaming OperatorState then :-)

On Mon, Dec 14, 2015 at 11:38 AM, Aljoscha Krettek wrote:

> As I mentioned in my previous mail, I think that OperatorState would need
> to be replaced by more specific types of state (ValueState, ListState, …).
>
> > On 14 Dec 2015, at 11:34, Maximilian Michels wrote:
> >
> >>
> >> On a side note, why would you call it KvState? And what would be called
> >> KvState?
> >
> >
> > The OperatorState interface would be called KvState.
> >
> >
> > On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek
> > wrote:
> >
> >> Yes, as Kostas said, it would initially not provide more functionality
> >> but it would enable us to add it later.
> >>
> >> On a side note, why would you call it KvState? And what would be called
> >> KvState?
> >>
> >>> On 14 Dec 2015, at 11:14, Kostas Tzoumas wrote:
> >>>
> >>> I suppose that they can start as sugar and evolve to a different
> >>> implementation.
> >>>
> >>> I would +1 the name change to KVState, OperatorState is indeed somewhat
> >>> confusing, and it will only get harder to rename later.
> >>>
> >>> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra wrote:
> >>>
> >>>> Would the Reducing/Folding states just be some API sugar on top of what
> >>>> we have now (ValueState) or do they have some added functionality (like
> >>>> incremental checkpoints for list states)?
> >>>>
> >>>> Gyula
> >>>>
> >>>> Aljoscha Krettek wrote (on Mon, 14 Dec 2015, 11:03):
> >>>>
> >>>>> While enhancing the state interfaces we would also need to introduce
> >>>>> new types of state. I was thinking of these, for a start:
> >>>>> - ValueState (works like OperatorState works now, i.e.
> >>>>> provides methods to get/set one state value)
> >>>>> - ListState, provides methods to add one element to a list of elements
> >>>>> and to iterate over all contained elements
> >>>>> - ReducingState, somewhat similar to value state but combines the added
> >>>>> value with the existing value using a ReduceFunction
> >>>>> - FoldingState, same as above but with fold
> >>>>>
> >>>>> I think these are necessary to give the system more knowledge about the
> >>>>> semantics of state so that it can handle the state more efficiently.
> >>>>> Think of incremental checkpoints, for example: these are easy to do if
> >>>>> you know that state is a list to which stuff is only appended.
> >>>>>
> >>>>>> On 14 Dec 2015, at 10:52, Stephan Ewen wrote:
> >>>>>>
> >>>>>> A lot of this makes sense, but I am not sure about renaming
> >>>>>> "OperatorState". The other name is nicer, but why make users' life
> >>>>>> hard just for a name?
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mxm@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> Thanks for the informative technical description.
> >>>>>>>
> >>>>>>>> - function state: this is the state that you get when a user function
> >>>>>>>> implements the Checkpointed interface. It is not partitioned.
> >>>>>>>> - operator state: this is the state that a StreamOperator can snapshot,
> >>>>>>>> it is similar to the function state, but for operators. It is not
> >>>>>>>> partitioned.
> >>>>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>>>> element, in Flink, this is (confusingly) called OperatorState and
> >>>>>>>> KvState (internally)
> >>>>>>>
> >>>>>>> Let's clean that up! Let's rename the OperatorState interface to
> >>>>>>> KvState.
> >>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned state,
> >>>>>>>> and the namespace is the same, i.e. the state can clash.
> >>>>>>>> The partitioned state will stay indefinitely if not manually cleared.
> >>>>>>>
> >>>>>>> I suppose operators currently have to take care to use a unique
> >>>>>>> identifier for the state such that it doesn't clash with the user
> >>>>>>> function. Wouldn't be too hard to introduce a scoping here.
> >>>>>>>
> >>>>>>> Your proposal makes sense. It seems like this is a rather delicate
> >>>>>>> change which improves the flexibility of the streaming API. What is
> >>>>>>> the motivation behind this? I suppose you are thinking of improvements
> >>>>>>> to the session capabilities of the streaming API.
> >>>>>>>
> >>>>>>>> If we want to also implement the current WindowOperator on top of
> >>>>>>>> these generic facilities we need to have a way to scope state not
> >>>>>>>> only by key but also by windows (or better, some generic state scope).
> >>>>>>>
> >>>>>>> This is currently handled by the WindowOperator itself and would then
> >>>>>>> be delegated to the enhanced state interface? Makes sense if we want
> >>>>>>> to make use of the new state interface. Again, is it just cleaner or
> >>>>>>> does this enable new types of applications?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljoscha@apache.org>
> >>>>>>> wrote:
> >>>>>>>> Hi All,
> >>>>>>>> I want to discuss some ideas about improving the primitives/operations
> >>>>>>>> that Flink offers for user-state, timers and windows and how these
> >>>>>>>> concepts can be unified.
> >>>>>>>>
> >>>>>>>> It has come up a lot lately that people have very specific
> >>>>>>>> requirements regarding the state that they keep and it seems necessary
> >>>>>>>> to allow users to set their own custom timers (on processing time and
> >>>>>>>> watermark time (event-time)) to do both expiration of state and
> >>>>>>>> implementation of custom windowing semantics. While we’re at this, we
> >>>>>>>> might also think about cleaning up the state handling a bit.
> >>>>>>>>
> >>>>>>>> Let me first describe the status quo, so that we’re all on the same
> >>>>>>>> page. There are three types of state:
> >>>>>>>> - function state: this is the state that you get when a user function
> >>>>>>>> implements the Checkpointed interface. It is not partitioned.
> >>>>>>>> - operator state: this is the state that a StreamOperator can snapshot,
> >>>>>>>> it is similar to the function state, but for operators. It is not
> >>>>>>>> partitioned.
> >>>>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>>>> element, in Flink, this is (confusingly) called OperatorState and
> >>>>>>>> KvState (internally)
> >>>>>>>>
> >>>>>>>> (Operator is the low-level concept, user functions are usually invoked
> >>>>>>>> by the operator, for example StreamMap is the operator that handles a
> >>>>>>>> MapFunction.)
> >>>>>>>>
> >>>>>>>> Function state and operator state are not partitioned, meaning that it
> >>>>>>>> becomes difficult when we want to implement dynamic scale-in/scale-out.
> >>>>>>>> Partitioned state, in contrast, can be redistributed when changing the
> >>>>>>>> degree of parallelism.
> >>>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned state,
> >>>>>>>> and the namespace is the same, i.e. the state can clash. The
> >>>>>>>> partitioned state will stay indefinitely if not manually cleared.
> >>>>>>>>
> >>>>>>>> On to timers: operators can register processing-time callbacks, and
> >>>>>>>> they can react to watermarks to implement event-time callbacks. They
> >>>>>>>> have to implement the logic themselves, however. For example, the
> >>>>>>>> WindowOperator has custom code to keep track of watermark timers and
> >>>>>>>> for reacting to watermarks. User functions have no way of registering
> >>>>>>>> timers. Also, timers are not scoped to any key. So if you register a
> >>>>>>>> timer while processing an element of a certain key, when the timer
> >>>>>>>> fires you don’t know what key was active when registering the timer.
> >>>>>>>> Knowing that key might be necessary for cleaning up state for certain
> >>>>>>>> keys, or to trigger processing for a certain key only, for example
> >>>>>>>> with session windows of some kind.
> >>>>>>>>
> >>>>>>>> Now, on to new stuff. I propose to add a timer facility that can be
> >>>>>>>> used by both operators and user functions. Both partitioned state and
> >>>>>>>> timers should be aware of keys and if a timer fires the partitioned
> >>>>>>>> state should be scoped to the same key that was active when the timer
> >>>>>>>> was registered.
> >>>>>>>>
> >>>>>>>> One last bit. If we want to also implement the current WindowOperator
> >>>>>>>> on top of these generic facilities we need to have a way to scope
> >>>>>>>> state not only by key but also by windows (or better, some generic
> >>>>>>>> state scope). The reason is that one key can have several active
> >>>>>>>> windows at one point in time and firing timers need to be mapped to
> >>>>>>>> the correct window (for example, for sliding windows, or session
> >>>>>>>> windows or what have you…).
> >>>>>>>>
> >>>>>>>> Happy discussing. :D
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
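
To make the two proposals in this thread a bit more concrete, here is a minimal sketch, written in Python purely for brevity. It models typed partitioned state (ValueState, ListState, ReducingState) and timers that remember which key was active when they were registered. Everything in it is an assumption for illustration: KeyedBackend, slots, register_timer, advance_to and the method names on the state classes are hypothetical and are not Flink's actual API. FoldingState is left out because it would follow the same pattern as ReducingState, and scoping by window is not modelled.

```python
import heapq
from collections import defaultdict


class KeyedBackend:
    """Toy partitioned-state backend with key-aware timers (hypothetical)."""

    def __init__(self):
        self.current_key = None          # key of the element being processed
        self._store = defaultdict(dict)  # state name -> {key: value}
        self._timers = []                # min-heap of (timestamp, seq, key)
        self._seq = 0                    # tie-breaker so keys are never compared

    def slots(self, name):
        return self._store[name]

    def register_timer(self, timestamp):
        # Remember which key was active when the timer was registered.
        heapq.heappush(self._timers, (timestamp, self._seq, self.current_key))
        self._seq += 1

    def advance_to(self, time, on_timer):
        # Fire all timers due up to `time`, restoring the registering key first.
        while self._timers and self._timers[0][0] <= time:
            timestamp, _, key = heapq.heappop(self._timers)
            self.current_key = key
            on_timer(timestamp)


class _KeyedState:
    """Shared plumbing: every access is scoped to backend.current_key."""

    def __init__(self, backend, name):
        self._backend = backend
        self._slots = backend.slots(name)

    def clear(self):
        self._slots.pop(self._backend.current_key, None)


class ValueState(_KeyedState):
    def value(self):
        return self._slots.get(self._backend.current_key)

    def update(self, value):
        self._slots[self._backend.current_key] = value


class ListState(_KeyedState):
    def add(self, element):
        # Append-only access is what would make incremental checkpoints easy.
        self._slots.setdefault(self._backend.current_key, []).append(element)

    def get(self):
        return list(self._slots.get(self._backend.current_key, []))


class ReducingState(_KeyedState):
    def __init__(self, backend, name, reduce_fn):
        super().__init__(backend, name)
        self._reduce_fn = reduce_fn

    def add(self, value):
        key = self._backend.current_key
        if key in self._slots:
            value = self._reduce_fn(self._slots[key], value)
        self._slots[key] = value

    def get(self):
        return self._slots.get(self._backend.current_key)


# Usage: per-key totals that are emitted and cleaned up by a key-scoped timer.
backend = KeyedBackend()
total = ReducingState(backend, "total", lambda a, b: a + b)
events = ListState(backend, "events")
fired = []


def on_timer(timestamp):
    # The backend has already restored the key that registered this timer.
    fired.append((backend.current_key, timestamp, total.get(), events.get()))
    total.clear()
    events.clear()


for key, value, ts in [("a", 1, 10), ("b", 5, 12), ("a", 2, 15)]:
    backend.current_key = key
    total.add(value)
    events.add(value)
    backend.register_timer(ts + 10)

backend.advance_to(22, on_timer)
```

In this sketch the timer callback never has to be told which key it belongs to: the backend sets the key before invoking it, so state reads and the cleanup in on_timer touch only that key's entries. Scoping by window as well, as suggested in the original mail, could be modelled by keying the slots on a (key, window) pair instead of the key alone.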