flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: [DISCUSS] Re-add record copy to chained operator calls
Date Wed, 20 May 2015 14:18:20 GMT
I think it is fair to say that everything that Flink has in its core
provides immutability. The mutability effect comes only if the user starts
mutating objects across functions.

The overhead will depend vastly on whether you are sending smaller records
or large records.

I see you are very keen on the failsafe variant. That is fine, I'd say
let's go ahead.

Then let us introduce a switch. The switch needs to work on copies for user
functions only. Until the window buffers are serialized, we need to keep
the copies there.
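Below is a minimal sketch of the switch described above. It assumes one reading of it: the mode only decides whether records handed to chained user functions are copied, while a window buffer that still holds objects (rather than serialized bytes) copies on insert regardless of the mode. `CopyMode`, `ChainedOutput`, `WindowBuffer` and `Item` are hypothetical names for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names throughout; this is not Flink's API.
final class CopySwitchDemo {
    static List<String> payloads(List<Item> items) {
        List<String> out = new ArrayList<>();
        for (Item i : items) out.add(i.payload);
        return out;
    }

    // An upstream operator reuses one Item; the user function keeps references to its inputs.
    static String run(CopyMode mode) {
        List<Item> keptByUser = new ArrayList<>();
        WindowBuffer window = new WindowBuffer();
        ChainedOutput toUser = new ChainedOutput(mode, keptByUser::add);
        Item reused = new Item("");
        for (String s : new String[] {"a", "b", "c"}) {
            reused.payload = s;
            toUser.accept(reused);
            window.add(reused);
        }
        return "user=" + payloads(keptByUser) + " window=" + payloads(window.contents);
    }

    public static void main(String[] args) {
        System.out.println(run(CopyMode.LIGHTWEIGHT)); // user=[c, c, c] window=[a, b, c]
        System.out.println(run(CopyMode.FAILSAFE));    // user=[a, b, c] window=[a, b, c]
    }
}

// The two default modes discussed in the thread.
enum CopyMode { FAILSAFE, LIGHTWEIGHT }

// A mutable record type standing in for a user POJO.
final class Item {
    String payload;
    Item(String payload) { this.payload = payload; }
    Item copy() { return new Item(payload); }
}

// Hands a record to a chained user function; copies it first only in FAILSAFE mode.
final class ChainedOutput implements Consumer<Item> {
    private final CopyMode mode;
    private final Consumer<Item> userFunction;

    ChainedOutput(CopyMode mode, Consumer<Item> userFunction) {
        this.mode = mode;
        this.userFunction = userFunction;
    }

    @Override
    public void accept(Item record) {
        userFunction.accept(mode == CopyMode.FAILSAFE ? record.copy() : record);
    }
}

// Object-based window buffer: always copies on insert, independent of the switch.
final class WindowBuffer {
    final List<Item> contents = new ArrayList<>();
    void add(Item record) { contents.add(record.copy()); }
}
```

With `LIGHTWEIGHT`, a user function that holds on to its inputs sees only the last value, while the window buffer stays correct; `FAILSAFE` protects both at the cost of one extra copy per chained hand-over.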



On Wed, May 20, 2015 at 3:55 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:

> I know it is nicer to have no-copy from a performance perspective, but a
> dataflow system with no immutability guarantee is something very hard to
> describe.
>
> Systems like Storm and Google Dataflow have immutability guarantees, I
> think for the same reason: to provide very clear, easy-to-use semantics.
>
> On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <sewen@apache.org> wrote:
>
> > A vote is the last resort. Consensus through discussion is much nicer.
> > And I think we are making progress.
> >
> > We went for the lightweight version in the batch API, because
> >  - there are few cases that are affected (only functions with side-effect
> >    state)
> >  - you can always switch lightweight -> failsafe in the future (that only
> >    hardens guarantees), but not the other way around (dropping guarantees)
> >    without breaking existing code.
> >
> > If you feel strongly about that point, I do not want to be a blocker for
> > this. I only ask that we make a well-informed decision, as this behavior is
> > as much part of the API as the class name of the DataStream.
> >
> >
> > On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> >
> > > I would go for the Failsafe option as the default behaviour, with a clearly
> > > documented lightweight (no-copy) setting, but I think having a vote on this
> > > would be the proper way of settling this question.
> > >
> > > On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > >
> > > > I think that in the long run (maybe not too long) we will have to
> > > > change our stateful operators (windows, basically) to use managed
> > > > memory and spill to disk. (Think of jobs that have sliding windows over
> > > > days or weeks.) Then the internal operators will take care of
> > > > copying anyway. The problem Gyula mentioned cannot be tackled other
> > > > than by defining how user code must behave.
> > > >
> > > > On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <sewen@apache.org> wrote:
> > > > > It does not mean we have to behave the same way; it is just an
> > > > > indication that well-defined behavior can still allow you to mess
> > > > > things up.
> > > > >
> > > > > The question now is what the default mode should be:
> > > > >  - Failsafe/Heavy (always copy)
> > > > >  - Performance/Lightweight (do not copy)
> > > > >
> > > > >
> > > > > On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <sewen@apache.org> wrote:
> > > > >
> > > > >> This is something that we can clearly define as "should not be done".
> > > > >> Systems do that.
> > > > >> I think if you repeatedly emit (or mutate) the same object in Spark, for
> > > > >> example, you get an RDD with completely messed-up contents.
> > > > >>
> > > > >> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyfora@apache.org> wrote:
> > > > >>
> > > > >>> If the preceding operator is emitting a mutated object, or does
> > > > >>> something with the output object afterwards, then it's a problem.
> > > > >>>
> > > > >>> Emitting the same object is a special case of this.
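The special case named above can be reproduced in plain Java. This sketch assumes a chained hand-over is a direct method call that passes the reference, as described in this thread; `ReuseDemo` and `Counter` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ReuseDemo {
    // Upstream operator that reuses one output object and hands it to the
    // chained downstream operator by reference (no copy).
    static void emitReusing(int n, Consumer<Counter> downstream) {
        Counter reused = new Counter(0);
        for (int i = 1; i <= n; i++) {
            reused.value = i;          // mutate the same instance
            downstream.accept(reused); // chained call: same reference every time
        }
    }

    // Downstream operator that gathers its inputs, e.g. into a window.
    static List<Integer> gather(int n, boolean copyOnHandOver) {
        List<Counter> buffer = new ArrayList<>();
        emitReusing(n, c -> buffer.add(copyOnHandOver ? c.copy() : c));
        List<Integer> values = new ArrayList<>();
        for (Counter c : buffer) values.add(c.value);
        return values;
    }

    public static void main(String[] args) {
        System.out.println(gather(3, false)); // [3, 3, 3]: one object stored three times
        System.out.println(gather(3, true));  // [1, 2, 3]: one copy per record
    }
}

// Hypothetical mutable record type.
final class Counter {
    int value;
    Counter(int value) { this.value = value; }
    Counter copy() { return new Counter(value); }
}
```

Without a copy, the gathering operator ends up with three references to a single object that holds only the last value.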
> > > > >>>
> > > > >>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <sewen@apache.org> wrote:
> > > > >>>
> > > > >>> > The case you are making is: if a preceding operator in a chain is
> > > > >>> > repeatedly emitting the same object, and the succeeding operator is
> > > > >>> > gathering the objects, then it is a problem.
> > > > >>> >
> > > > >>> > Or are there cases where the system itself repeatedly emits the same
> > > > >>> > objects?
> > > > >>> >
> > > > >>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyfora@apache.org> wrote:
> > > > >>> >
> > > > >>> > > We are designing a system for stateful stream computations, assuming
> > > > >>> > > long-standing operators that gather and store data as the stream
> > > > >>> > > evolves (unlike in the DataSet API). Many programs, like windowing,
> > > > >>> > > sampling, etc., hold their state in the form of past data. Without a
> > > > >>> > > careful understanding of the runtime, these programs will break or
> > > > >>> > > make unnecessary copies.
> > > > >>> > >
> > > > >>> > > This is why I think immutability should be the default, so we can have
> > > > >>> > > a clear dataflow model with immutable streams.
> > > > >>> > >
> > > > >>> > > I see absolutely no reason why we can't have the non-copy version as an
> > > > >>> > > optional setting for the users.
> > > > >>> > >
> > > > >>> > >
> > > > >>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <parisc@kth.se> wrote:
> > > > >>> > >
> > > > >>> > > > @stephan I see your point. If we assume that operators do not hold
> > > > >>> > > > references in their state to any transmitted records, it works fine.
> > > > >>> > > > We therefore need to make this clear to the users. I need to check
> > > > >>> > > > whether that would also break semantics in SAMOA or other
> > > > >>> > > > integrations that assume immutability. For example, in SAMOA there
> > > > >>> > > > are often local metric objects that are constantly being mutated and
> > > > >>> > > > simply forwarded periodically to other (possibly chained) operators
> > > > >>> > > > that need to evaluate them.
> > > > >>> > > >
> > > > >>> > > > ________________________________________
> > > > >>> > > > From: Gyula Fóra <gyfora@apache.org>
> > > > >>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
> > > > >>> > > > To: dev@flink.apache.org
> > > > >>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
> > > > >>> > > >
> > > > >>> > > > "Copy before putting it into a window buffer and any other group buffer."
> > > > >>> > > >
> > > > >>> > > > Exactly my point. Any stateful operator should be able to implement
> > > > >>> > > > something like this without having to worry about copying the object
> > > > >>> > > > (and at this point the user would need to know whether it comes from
> > > > >>> > > > the network to avoid unnecessary copies), so I don't agree with
> > > > >>> > > > leaving the copy off.
> > > > >>> > > >
> > > > >>> > > > The user can of course specify that the operator is mutable if he wants
> > > > >>> > > > (and is worried about performance), but I still think the default
> > > > >>> > > > behaviour should be immutable.
> > > > >>> > > > We cannot force users not to hold object references, and it is also
> > > > >>> > > > quite an unnatural way of programming in a language like Java.
> > > > >>> > > >
> > > > >>> > > >
> > > > >>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <sewen@apache.org> wrote:
> > > > >>> > > >
> > > > >>> > > > > I am curious why the copying is actually needed.
> > > > >>> > > > >
> > > > >>> > > > > In the batch API, we chain and do not copy, and it is rather
> > > > >>> > > > > predictable.
> > > > >>> > > > >
> > > > >>> > > > > The cornerstones of that design are the following rules:
> > > > >>> > > > >
> > > > >>> > > > >  1) Objects read from the network or any buffer are always new
> > > > >>> > > > >     objects. That comes naturally when they are deserialized as part
> > > > >>> > > > >     of that (all buffers store serialized data).
> > > > >>> > > > >
> > > > >>> > > > >  2) After a function returns a record (or gives one to the
> > > > >>> > > > >     collector), it is given to the chain of chained operators, but
> > > > >>> > > > >     after it has gone through the chain, no one else holds a
> > > > >>> > > > >     reference to that object.
> > > > >>> > > > >     For that, it is crucial that objects are not stored by
> > > > >>> > > > >     reference, but are either stored serialized, or a copy is
> > > > >>> > > > >     stored.
> > > > >>> > > > >
> > > > >>> > > > > This is quite solid in the batch API. How about we follow the same
> > > > >>> > > > > paradigm in the streaming API? We would need to adjust the following:
> > > > >>> > > > >
> > > > >>> > > > > 1) Do not copy between operators (I think this is the case right now).
> > > > >>> > > > >
> > > > >>> > > > > 2) Copy before putting it into a window buffer and any other group
> > > > >>> > > > > buffer.
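Rule 1 and the "stored serialized" option of rule 2 can be illustrated with a buffer that keeps only bytes. The sketch uses plain `java.io` serialization purely as a stand-in for Flink's own serializers (an assumption for brevity); `SerializingBuffer` and `Reading` are hypothetical names. Every read deserializes into a fresh object, so later mutations of the inserted instance cannot leak into the buffer.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

final class SerializingBufferDemo {
    static List<Integer> run() {
        SerializingBuffer<Reading> window = new SerializingBuffer<>();
        Reading reused = new Reading(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i;
            window.add(reused);   // a serialized snapshot is stored, not the reference
        }
        reused.value = 99;        // later mutation does not reach the buffer
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < window.size(); i++) {
            values.add(window.get(i).value); // each get() returns a new object
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3]
    }
}

// Hypothetical record type; java.io serialization stands in for Flink's serializers.
final class Reading implements Serializable {
    private static final long serialVersionUID = 1L;
    int value;
    Reading(int value) { this.value = value; }
}

// Buffer that never holds object references: bytes in, fresh objects out.
final class SerializingBuffer<T extends Serializable> {
    private final List<byte[]> entries = new ArrayList<>();

    void add(T record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        entries.add(bytes.toByteArray()); // stream was closed (and flushed) above
    }

    @SuppressWarnings("unchecked")
    T get(int index) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(entries.get(index)))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    int size() { return entries.size(); }
}
```

An object-based buffer would reach the same result by storing `copy()` results instead; the serialized form gets the copy for free as a side effect of deserialization.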
> > > > >>> > > > >
> > > > >>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > >>> > > > >
> > > > >>> > > > > > Yes, in fact I anticipated this. There is one central place where
> > > > >>> > > > > > we can insert a copy step: in OperatorCollector in OutputHandler.
> > > > >>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <parisc@kth.se> wrote:
> > > > >>> > > > > > > I guess it was not intended ^^.
> > > > >>> > > > > > >
> > > > >>> > > > > > > Chaining should be transparent and not break the correct/expected
> > > > >>> > > > > > > behaviour.
> > > > >>> > > > > > >
> > > > >>> > > > > > >
> > > > >>> > > > > > > Paris?
> > > > >>> > > > > > >
> > > > >>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <mbalassi@apache.org> wrote:
> > > > >>> > > > > > >
> > > > >>> > > > > > > +1 for copying.
> > > > >>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gyfora@apache.org> wrote:
> > > > >>> > > > > > >
> > > > >>> > > > > > > Hey,
> > > > >>> > > > > > >
> > > > >>> > > > > > > The latest streaming operator rework removed the copying of the
> > > > >>> > > > > > > outputs before passing them to chained operators. This is a major
> > > > >>> > > > > > > break from the previous operator semantics, which guaranteed
> > > > >>> > > > > > > immutability.
> > > > >>> > > > > > >
> > > > >>> > > > > > > I think this change leads to very nondeterministic program behaviour
> > > > >>> > > > > > > from the user's perspective, as only non-chained outputs/inputs will
> > > > >>> > > > > > > be mutable. If we allow this to happen, users will start disabling
> > > > >>> > > > > > > chaining to get immutability, which defeats the purpose. (Chaining
> > > > >>> > > > > > > should not affect program behaviour, just increase performance.)
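The inconsistency described above can be sketched by modelling the two hand-over paths: a chained call passes the reference, while a network hop serializes and deserializes. This is an illustration under assumptions (plain `java.io` serialization instead of Flink's serializers; `ChainingDemo` and `Event` are hypothetical names). The upstream operator keeps its last emitted record as state, the downstream function mutates its input in place, and the upstream state changes only when the two are chained.

```java
import java.io.*;
import java.util.function.UnaryOperator;

final class ChainingDemo {
    // Chained hand-over: the same reference is passed along.
    static final UnaryOperator<Event> CHAINED = e -> e;

    // Network hand-over: serialize, then deserialize into a new object.
    static final UnaryOperator<Event> NETWORK = e -> {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(e);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Event) in.readObject();
            }
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
    };

    // Upstream keeps its last emitted record as state; downstream doubles the
    // amount of its input in place. Returns what the upstream state holds afterwards.
    static int upstreamStateAfter(UnaryOperator<Event> handOver) {
        Event lastEmitted = new Event(10);
        Event received = handOver.apply(lastEmitted);
        received.amount *= 2;
        return lastEmitted.amount;
    }

    public static void main(String[] args) {
        System.out.println(upstreamStateAfter(CHAINED)); // 20: upstream state was changed
        System.out.println(upstreamStateAfter(NETWORK)); // 10: upstream state is intact
    }
}

// Hypothetical serializable record type.
final class Event implements Serializable {
    private static final long serialVersionUID = 1L;
    int amount;
    Event(int amount) { this.amount = amount; }
}
```

The same program logic yields different results depending only on whether the two operators were chained, which is the behaviour the proposal wants to rule out by default.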
> > > > >>> > > > > > >
> > > > >>> > > > > > > In my opinion, the default setting for each operator should be
> > > > >>> > > > > > > immutability, and the user could override this manually if he/she
> > > > >>> > > > > > > wants.
> > > > >>> > > > > > >
> > > > >>> > > > > > > What do you think?
> > > > >>> > > > > > >
> > > > >>> > > > > > > Regards,
> > > > >>> > > > > > > Gyula
> > > > >>> > > > > > >
> > > > >>> > > > > > >
> > > > >>> > > > > >
> > > > >>> > > > >
> > > > >>> > > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >>
> > > > >>
> > > >
> > >
> >
>
