flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: [DISCUSS] FLIP-8: Rescalable Non-Partitioned State
Date Sun, 14 Aug 2016 04:54:03 GMT
Hi,
Yes I think it makes sense. :)

Gyula

On Fri, Aug 12, 2016, 17:02 Ufuk Celebi <uce@apache.org> wrote:

> I will update the design doc with more details for the Checkpointed
> variants and remove Option 2 (I think that's an orthogonal thing).
>
> The way I see it now, we should have base CheckpointedBase interface,
> have the current Checkpointed interface be a subclass for not
> repartitionable state. Then we have two other List-based variants:
>
> 1) Union List => on restore all state is unioned (what is currently in
> the design doc)
>
> 2) List => on restore state is automatically redistributed (if
> parallelism stays the same, state should go to the same sub tasks, but
> no guarantees when changed parallelism).
>
> ====
>
> Regarding the other thing you and Aljoscha discussed: I feel like that
> should be handled as part of the side input effort. Does that make
> sense?
>
>
>
> On Fri, Aug 12, 2016 at 3:11 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> > Hi Aljoscha,
> >
> > Yes this is pretty much how I think about it as well.
> >
> > Basically the state in this case would be computed from the side inputs
> > with the same state update logic on all operators. I think it is imprtant
> > that operators compute their own state or at least observe all state
> > changes otherwise a lot of things can get weird.
> >
> > Lets say for instance I am building a dynamic filter where new filter
> > conditions are added /removed on the fly. For the sake of my argument
> lets
> > also assume that initializing a new filter condition is a heavy
> operation.
> > The global state in this case is the union of all filter conditions.
> >
> > If at any point in time the operators could only observe the current
> state
> > we might end up with a very inefficient code, while if we observe all
> state
> > changes individually  (add 1 new filter) we can jus instantiate the new
> > filter without worrying about the other ones.
> >
> > I am not completely sure if its clear what I am trying to say :D
> >
> > Gyula
> >
> > On Fri, Aug 12, 2016, 14:28 Aljoscha Krettek <aljoscha@apache.org>
> wrote:
> >
> >> Hi Gyula,
> >> I was thinking about this as well, in the context of side-inputs, which
> >> would be a generalization of your use case. If I'm not mistaken. In my
> head
> >> I was calling it global state. Essentially, this state would be the
> same on
> >> all operators and when checkpointing you would only have to checkpoint
> the
> >> state of operator 0. Upon restore you would distribute this state to all
> >> operators again.
> >>
> >> Is this what you had in mind?
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Fri, 12 Aug 2016 at 13:07 Gyula Fóra <gyula.fora@gmail.com> wrote:
> >>
> >> > Hi,
> >> > Let me try to explain what I mean by broadcast states.
> >> >
> >> > I think it is a very common pattern that people broadcast control
> >> messages
> >> > to operators that also receive normal input events.
> >> >
> >> > some examples: broadcast a model for prediction, broadcast some
> >> information
> >> > that should be the same at all subtasks but is evolving over time. At
> the
> >> > same time these operators usually also do normal event processing
> based
> >> on
> >> > the broadcasted input stream.
> >> >
> >> > There is currently no proper solution for this provided by the api. We
> >> can
> >> > of course use connected operators or wrapper types and broadcast one
> of
> >> the
> >> > input but there are several limitations. We cant use keyed states for
> >> > instance becase that requires both inputs to be keyed (so we cant
> >> > broadcast).
> >> >
> >> > Cheers,
> >> > Gyula
> >> >
> >> > On Fri, Aug 12, 2016, 12:28 Ufuk Celebi <uce@apache.org> wrote:
> >> >
> >> > > Comments inline.
> >> > >
> >> > > On Thu, Aug 11, 2016 at 8:06 PM, Gyula Fóra <gyula.fora@gmail.com>
> >> > wrote:
> >> > > > Option 1:
> >> > > > I think the main problem here is sending all the state everywhere
> >> will
> >> > > not
> >> > > > scale at all. I think this will even fail for some internal Flink
> >> > > operators
> >> > > > (window timers I think are kept like this, maybe Im wrong here).
> The
> >> > > > general problem here what we don't have with the key-value states
> is
> >> > that
> >> > > > the system can't do the repartitioning automatically. I think
we
> >> should
> >> > > try
> >> > > > to make abstractions that would allow the system to do this.
> >> > >
> >> > > The state size can definitely become a problem. For Kafka sources
> for
> >> > > example I don' think it would be problematic, but the timers it
> might
> >> > > be, yes. It definitely depends on the use case.
> >> > >
> >> > > In theory, we could also redistribute the list elements
> automatically,
> >> > > for example in a round robing fashion. The question is whether this
> >> > > will be enough in general.
> >> > >
> >> > > >
> >> > > > Option 2:
> >> > > > To be honest I don't completely get this approach, what do the
> >> indices
> >> > > mean
> >> > > > in the get set methods? What happens if the same index is used
> from
> >> > > > multiple operators?
> >> > > > This may also suffers in scalability like option 1 (but as I
said
> I
> >> > dont
> >> > > > get this completely :()
> >> > >
> >> > > Yes, I don't like it either. It's actually similar to Option 1 (from
> >> > > runtime perspective). I think the main question with Option 2 is
> >> > > whether we expose the API as an interface or a state class. If we
go
> >> > > for this kind of interface we could parameterize the restore
> behaviour
> >> > > via the descriptor (e.g. flag to merge/union etc.). That should be
> >> > > more extensible than providing interfaces.
> >> > >
> >> > > > I think another approach could be (might be similar what option
2
> is
> >> > > trying
> >> > > > to achieve) to provide a Set<T>  (or Map) like  abstraction
to
> keep
> >> the
> >> > > non
> >> > > > partitioned states. Users could add/remove things from it at
> their on
> >> > > will,
> >> > > > but the system would be free to redistribute the Sets between
the
> >> > > > operators. In practice this would mean for instance that the
Kafka
> >> > > sources
> >> > > > would store (partition, offset) tuples in the set but and every
> time
> >> in
> >> > > the
> >> > > > open method they would check what is assigned to them (the system
> is
> >> > free
> >> > > > to decide). This of course would only work well if we can assume
> that
> >> > > > distributing the states by equal numbers is desirable.
> >> > >
> >> > > I think the same point applies to redistributing the list
> >> > > automatically (what I meant with whether it is "general enough").
I
> >> > > think what you describe here could be the list w/o unioning it.
> >> > >
> >> > > >
> >> > > > Broadcast states:
> >> > > > This might be a good time to think about broadcast states.
> >> > > Non-partitioned
> >> > > > states that are the same at all subtasks, I think this comes
up
> in a
> >> > lot
> >> > > of
> >> > > > use-cases (I know at least one myself haha) and it is pretty
> straight
> >> > > > forward from a runtime perspective, the bigger question is the
> API.
> >> > >
> >> > > Can you explain this a little more?
> >> > >
> >> > > ========
> >> > >
> >> > > Another open question (not addressed in the FLIP yet) is how we
> treat
> >> > > operators that have both keyed and non-keyed state. The current API
> >> > > kind of moves this question to the user.
> >> > >
> >> >
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message