flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] FLIP-8: Rescalable Non-Partitioned State
Date Fri, 12 Aug 2016 12:27:52 GMT
Hi Gyula,
I was thinking about this as well, in the context of side inputs, which
would be a generalization of your use case, if I'm not mistaken. In my head
I was calling it global state. Essentially, this state would be the same on
all parallel instances of an operator, so when checkpointing you would only
have to checkpoint the state of instance 0. Upon restore you would
distribute this state to all instances again.
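
A minimal sketch of that idea, assuming the global state is a plain map that
is identical on every parallel instance. The class and method names below are
hypothetical and are not Flink APIs; they only illustrate "checkpoint
instance 0, copy to everyone on restore":

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
class GlobalStateSketch {

    // Snapshot: every instance holds the same state, so only the copy of
    // instance 0 is written to the checkpoint.
    static <K, V> Map<K, V> snapshot(List<Map<K, V>> instanceStates) {
        return new HashMap<>(instanceStates.get(0));
    }

    // Restore: every instance gets its own copy of the checkpointed state,
    // regardless of the new parallelism.
    static <K, V> List<Map<K, V>> restore(Map<K, V> checkpoint, int newParallelism) {
        List<Map<K, V>> restored = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new HashMap<>(checkpoint));
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, Double> model = new HashMap<>();
        model.put("weight", 0.5);

        // Run with parallelism 2, checkpoint, then rescale to parallelism 4.
        List<Map<String, Double>> before = restore(model, 2);
        Map<String, Double> checkpoint = snapshot(before);
        List<Map<String, Double>> after = restore(checkpoint, 4);

        System.out.println(after.size() + " instances restored");
    }
}
```

Because the restore step does not depend on the old parallelism, scaling up
or down needs no repartitioning logic for this kind of state.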

Is this what you had in mind?

Cheers,
Aljoscha

On Fri, 12 Aug 2016 at 13:07 Gyula Fóra <gyula.fora@gmail.com> wrote:

> Hi,
> Let me try to explain what I mean by broadcast states.
>
> I think it is a very common pattern that people broadcast control messages
> to operators that also receive normal input events.
>
> Some examples: broadcast a model for prediction, broadcast some information
> that should be the same at all subtasks but is evolving over time. At the
> same time these operators usually also do normal event processing based on
> the broadcasted input stream.
>
> There is currently no proper solution for this provided by the API. We can
> of course use connected operators or wrapper types and broadcast one of the
> inputs, but there are several limitations. We can't use keyed states, for
> instance, because that requires both inputs to be keyed (so we can't
> broadcast).
>
> Cheers,
> Gyula
>
> On Fri, Aug 12, 2016, 12:28 Ufuk Celebi <uce@apache.org> wrote:
>
> > Comments inline.
> >
> > On Thu, Aug 11, 2016 at 8:06 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > Option 1:
> > > I think the main problem here is that sending all the state everywhere
> > > will not scale at all. I think this will even fail for some internal
> > > Flink operators (window timers I think are kept like this, maybe I'm
> > > wrong here). The general problem here, which we don't have with the
> > > key-value states, is that the system can't do the repartitioning
> > > automatically. I think we should try to make abstractions that would
> > > allow the system to do this.
> >
> > The state size can definitely become a problem. For Kafka sources, for
> > example, I don't think it would be problematic, but for the timers it
> > might be, yes. It definitely depends on the use case.
> >
> > In theory, we could also redistribute the list elements automatically,
> > for example in a round-robin fashion. The question is whether this
> > will be enough in general.
> >
> > >
> > > Option 2:
> > > To be honest I don't completely get this approach. What do the indices
> > > mean in the get/set methods? What happens if the same index is used from
> > > multiple operators?
> > > This may also suffer from the same scalability problem as option 1 (but
> > > as I said I don't get this completely :()
> >
> > Yes, I don't like it either. It's actually similar to Option 1 (from a
> > runtime perspective). I think the main question with Option 2 is
> > whether we expose the API as an interface or a state class. If we go
> > for this kind of interface we could parameterize the restore behaviour
> > via the descriptor (e.g. flag to merge/union etc.). That should be
> > more extensible than providing interfaces.
> >
> > > I think another approach could be (it might be similar to what option 2
> > > is trying to achieve) to provide a Set<T> (or Map) like abstraction to
> > > keep the non-partitioned states. Users could add/remove things from it at
> > > their own will, but the system would be free to redistribute the Sets
> > > between the operators. In practice this would mean, for instance, that
> > > the Kafka sources would store (partition, offset) tuples in the set, and
> > > every time in the open method they would check what is assigned to them
> > > (the system is free to decide). This of course would only work well if we
> > > can assume that distributing the states by equal numbers is desirable.
> >
> > I think the same point applies to redistributing the list
> > automatically (what I meant with whether it is "general enough"). I
> > think what you describe here could be the list w/o unioning it.
> >
> > >
> > > Broadcast states:
> > > This might be a good time to think about broadcast states:
> > > non-partitioned states that are the same at all subtasks. I think this
> > > comes up in a lot of use-cases (I know at least one myself haha) and it
> > > is pretty straightforward from a runtime perspective; the bigger question
> > > is the API.
> >
> > Can you explain this a little more?
> >
> > ========
> >
> > Another open question (not addressed in the FLIP yet) is how we treat
> > operators that have both keyed and non-keyed state. The current API
> > kind of moves this question to the user.
> >
>
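
For reference, the round-robin redistribution of list elements that Ufuk
mentions in the quoted text could look roughly like the sketch below. It
assumes each old subtask checkpointed a list of opaque elements (for example
Kafka partition/offset pairs) and that spreading them in equal numbers is
acceptable. All names are hypothetical and not part of any Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
class RoundRobinRedistribution {

    // Walk over all checkpointed elements in subtask order and deal them out
    // to the new subtasks one at a time, wrapping around at the end.
    static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
        List<List<T>> result = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> oldState : oldStates) {
            for (T element : oldState) {
                result.get(next).add(element);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks holding five elements, rescaled to three subtasks.
        List<List<String>> old = Arrays.asList(
                Arrays.asList("p0", "p1", "p2"),
                Arrays.asList("p3", "p4"));
        System.out.println(redistribute(old, 3));
        // prints [[p0, p3], [p1, p4], [p2]]
    }
}
```

This only balances element counts. It knows nothing about element size or
locality, which is the "general enough" concern raised in the thread.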
