flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Re-add record copy to chained operator calls
Date Wed, 20 May 2015 13:52:21 GMT
We should maybe run some benchmarks and see what the overhead of
always running a copy between chained operators actually is.

On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <sewen@apache.org> wrote:
> A vote is the last resort. Consensus through discussion is much nicer. And
> I think we are making progress.
>
> We went for the lightweight version in the batch API, because
>  - there are few cases that are affected (only functions with side effect
> state)
>  - you can always switch lightweight -> failsafe in the future (only
> hardens guarantees), but not the other way around (dropping guarantees)
> without breaking existing code.
>
> If you are strong on that point, I do not want to be a blocker for this. I
> only ask to make a well informed decision, as this behavior is as much part
> of the API as the classname of the DataStream.
>
>
> On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
>
>> I would go for the Failsafe option as a default behaviour with a clearly
>> documented lightweight (no-copy) setting, but I think having a Vote on this
>> would be the proper way of settling this question.
>>
>> On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>> > I think that in the long run (maybe not too long) we will have to
>> > change our stateful operators (windows, basically) to use managed
>> > memory and spill to disk. (Think jobs that have sliding windows over
>> > days or weeks) Then the internal operators will take care of
>> > copying anyways. The problem Gyula mentioned we cannot tackle other
>> > than by defining how user code must behave.
>> >
>> > On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <sewen@apache.org> wrote:
>> > > It does not mean we have to behave the same way, it is just an indication
>> > > that well-defined behavior can allow you to mess things up.
>> > >
>> > > The question is now what is the default mode:
>> > >  - Failsafe/Heavy (always copy)
>> > >  - Performance/Lightweight (do not copy)
>> > >
>> > >
>> > > On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <sewen@apache.org> wrote:
>> > >
>> > >> This is something that we can clearly define as "should not be done".
>> > >> Systems do that.
>> > >> I think if you repeatedly emit (or mutate) the same object, for example in
>> > >> Spark, you get an RDD with completely messed up contents.
>> > >>
>> > >> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyfora@apache.org> wrote:
>> > >>
>> > >>> If the preceding operator is emitting a mutated object, or does something
>> > >>> with the output object afterwards, then it's a problem.
>> > >>>
>> > >>> Emitting the same object is a special case of this.
>> > >>>
>> > >>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <sewen@apache.org> wrote:
>> > >>>
>> > >>> > The case you are making is if a preceding operator in a chain is repeatedly
>> > >>> > emitting the same object, and the succeeding operator is gathering the
>> > >>> > objects, then it is a problem
>> > >>> >
>> > >>> > Or are there cases where the system itself repeatedly emits the same
>> > >>> > objects?
>> > >>> >
>> > >>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyfora@apache.org> wrote:
>> > >>> >
>> > >>> > > We are designing a system for stateful stream computations, assuming
>> > >>> > > long-standing operators that gather and store data as the stream evolves
>> > >>> > > (unlike in the dataset api). Many programs, like windowing, sampling etc.
>> > >>> > > hold the state in the form of past data. And without careful understanding
>> > >>> > > of the runtime these programs will break or have unnecessary copies.
>> > >>> > >
>> > >>> > > This is why I think immutability should be the default so we can have a
>> > >>> > > clear dataflow model with immutable streams.
>> > >>> > >
>> > >>> > > I see absolutely no reason why we can't have the non-copy version as an
>> > >>> > > optional setting for the users.
>> > >>> > >
>> > >>> > >
>> > >>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <parisc@kth.se> wrote:
>> > >>> > >
>> > >>> > > > @stephan I see your point. If we assume that operators do not hold
>> > >>> > > > references in their state to any transmitted records it works fine. We
>> > >>> > > > therefore need to make this clear to the users. I need to check if that
>> > >>> > > > would break semantics in SAMOA or other integrations as well that assume
>> > >>> > > > immutability. For example in SAMOA there are often local metric objects
>> > >>> > > > that are being constantly mutated and simply forwarded periodically to
>> > >>> > > > other (possibly chained) operators that need to evaluate them.
>> > >>> > > >
>> > >>> > > > ________________________________________
>> > >>> > > > From: Gyula Fóra <gyfora@apache.org>
>> > >>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
>> > >>> > > > To: dev@flink.apache.org
>> > >>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
>> > >>> > > >
>> > >>> > > > "Copy before putting it into a window buffer and any other group buffer."
>> > >>> > > >
>> > >>> > > > Exactly my point. Any stateful operator should be able to implement
>> > >>> > > > something like this without having to worry about copying the object
>> > >>> > > > (and at this point the user would need to know whether it comes from the
>> > >>> > > > network to avoid unnecessary copies), so I don't agree with leaving the
>> > >>> > > > copy off.
>> > >>> > > >
>> > >>> > > > The user can of course specify that the operator is mutable if he wants
>> > >>> > > > (and he is worried about the performance), but I still think the default
>> > >>> > > > behaviour should be immutable.
>> > >>> > > > We cannot force users to not hold object references, and it is also
>> > >>> > > > quite an unnatural way of programming in a language like Java.
>> > >>> > > >
>> > >>> > > >
>> > >>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <sewen@apache.org> wrote:
>> > >>> > > >
>> > >>> > > > > I am curious why the copying is actually needed.
>> > >>> > > > >
>> > >>> > > > > In the batch API, we chain and do not copy and it is rather predictable.
>> > >>> > > > >
>> > >>> > > > > The cornerstones of that design are the following rules:
>> > >>> > > > >
>> > >>> > > > >  1) Objects read from the network or any buffer are always new objects.
>> > >>> > > > > That comes naturally when they are deserialized as part of that (all
>> > >>> > > > > buffers store serialized data)
>> > >>> > > > >
>> > >>> > > > >  2) After a function returns a record (or gives one to the collector),
>> > >>> > > > > it is given to the chain of chained operators, but after it is through
>> > >>> > > > > the chain, no one else holds a reference to that object.
>> > >>> > > > >      For that, it is crucial that objects are not stored by reference,
>> > >>> > > > > but either stored serialized, or a copy is stored.
>> > >>> > > > >
>> > >>> > > > > This is quite solid in the batch API. How about we follow the same
>> > >>> > > > > paradigm in the streaming API? We would need to adjust the following:
>> > >>> > > > >
>> > >>> > > > > 1) Do not copy between operators (I think this is the case right now)
>> > >>> > > > >
>> > >>> > > > > 2) Copy before putting it into a window buffer and any other group
>> > >>> > > > > buffer.
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> > >>> > > > >
>> > >>> > > > > > Yes, in fact I anticipated this. There is one central place where we
>> > >>> > > > > > can insert a copy step, in OperatorCollector in OutputHandler.
>> > >>> > > > > >
>> > >>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <parisc@kth.se> wrote:
>> > >>> > > > > > > I guess it was not intended ^^.
>> > >>> > > > > > >
>> > >>> > > > > > > Chaining should be transparent and not break the correct/expected
>> > >>> > > > > > > behaviour.
>> > >>> > > > > > >
>> > >>> > > > > > >
>> > >>> > > > > > > Paris?
>> > >>> > > > > > >
>> > >>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <mbalassi@apache.org> wrote:
>> > >>> > > > > > >
>> > >>> > > > > > > +1 for copying.
>> > >>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gyfora@apache.org> wrote:
>> > >>> > > > > > >
>> > >>> > > > > > > Hey,
>> > >>> > > > > > >
>> > >>> > > > > > > The latest streaming operator rework removed the copying of the
>> > >>> > > > > > > outputs before passing them to chained operators. This is a major
>> > >>> > > > > > > break for the previous operator semantics which guaranteed
>> > >>> > > > > > > immutability.
>> > >>> > > > > > >
>> > >>> > > > > > > I think this change leads to very nondeterministic program
>> > >>> > > > > > > behaviour from the user's perspective as only non-chained
>> > >>> > > > > > > outputs/inputs will be immutable. If we allow this to happen,
>> > >>> > > > > > > users will start disabling chaining to get immutability which
>> > >>> > > > > > > defeats the purpose. (chaining should not affect program
>> > >>> > > > > > > behaviour, just increase performance)
>> > >>> > > > > > >
>> > >>> > > > > > > In my opinion the default setting for each operator should be
>> > >>> > > > > > > immutability and the user could override this manually if he/she
>> > >>> > > > > > > wants.
>> > >>> > > > > > >
>> > >>> > > > > > > What do you think?
>> > >>> > > > > > >
>> > >>> > > > > > > Regards,
>> > >>> > > > > > > Gyula
>> > >>> > > > > > >
>> > >>> > > > > > >
>> > >>> > > > > >
>> > >>> > > > >
>> > >>> > > >
>> > >>> > >
>> > >>> >
>> > >>>
>> > >>
>> > >>
>> >
>>
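
The aliasing problem discussed in this thread can be illustrated with a
minimal plain-Java sketch. It does not use any Flink API: Event,
emitReusing and run are hypothetical names invented for this
illustration, and the copy at the handover only stands in for the copy
step that could be inserted between chained operators. The upstream
"operator" reuses one output object, and the downstream "operator"
buffers records by reference, as a naive window might.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ChainedCopySketch {

    /** A mutable record type, standing in for a user-defined POJO (hypothetical). */
    static final class Event {
        int value;

        Event(int value) {
            this.value = value;
        }

        Event copy() {
            return new Event(value);
        }
    }

    /** Upstream operator that mutates and re-emits the same object for every element. */
    static void emitReusing(int n, Consumer<Event> downstream) {
        Event reused = new Event(0);
        for (int i = 1; i <= n; i++) {
            reused.value = i;
            downstream.accept(reused);
        }
    }

    /**
     * Runs a two-operator "chain". The downstream operator stores records by
     * reference. If copyBetweenOperators is true, each record is copied at the
     * handover (the failsafe mode); otherwise it is passed as-is (lightweight).
     */
    static List<Integer> run(boolean copyBetweenOperators) {
        List<Event> window = new ArrayList<>();
        Consumer<Event> buffering = window::add;

        Consumer<Event> handover;
        if (copyBetweenOperators) {
            handover = e -> buffering.accept(e.copy());
        } else {
            handover = buffering;
        }

        emitReusing(3, handover);

        // Read the buffered values only after the upstream operator has finished.
        List<Integer> values = new ArrayList<>();
        for (Event e : window) {
            values.add(e.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println("no copy: " + run(false)); // prints "no copy: [3, 3, 3]"
        System.out.println("copy:    " + run(true));  // prints "copy:    [1, 2, 3]"
    }
}
```

Moving the copy() call from the handover into the buffering step would
correspond to the lightweight variant, where only operators that store
records pay for the copy.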
