flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: [DISCUSS] Re-add record copy to chained operator calls
Date Wed, 20 May 2015 13:30:58 GMT
It does not mean we have to behave the same way, it is just an indication
that well-defined behavior can allow you to mess things up.

The question is now what is the default mode:
 - Failsafe/Heavy (always copy)
 - Performance/Lightweight (do not copy)


On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <sewen@apache.org> wrote:

> This is something that we can clearly define as "should not be done".
> Systems do that.
> I think if you repeatedly emit (or mutate) the same object for example in
> Spark, you get an RDD with completely messed up contents.
>
> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyfora@apache.org> wrote:
>
>> If the preceding operator is emitting a mutated object, or does something
>> with the output object afterwards then its a problem.
>>
>> Emitting the same object is a special case of this.
>>
>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>> > The case you are making is if a preceding operator in a chain is
>> repeatedly
>> > emitting the same object, and the succeeding operator is gathering the
>> > objects, then it is a problem
>> >
>> > Or are there cases where the system itself repeatedly emits the same
>> > objects?
>> >
>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyfora@apache.org> wrote:
>> >
>> > > We are designing a system for stateful stream computations, assuming
>> long
>> > > standing operators that gather and store data as the stream evolves
>> > (unlike
>> > > in the dataset api). Many programs, like windowing, sampling etc hold
>> the
>> > > state in the form of past data. And without careful understanding of
>> the
>> > > runtime these programs will break or have unnecessary copies.
>> > >
>> > > This is why I think immutability should be the default so we can have
>> a
>> > > clear dataflow model with immutable streams.
>> > >
>> > > I see absolutely no reason why we cant have the non-copy version as an
>> > > optional setting for the users.
>> > >
>> > >
>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <parisc@kth.se> wrote:
>> > >
>> > > > @stephan I see your point. If we assume that operators do not hold
>> > > > references in their state to any transmitted records it works fine.
>> We
>> > > > therefore need to make this clear to the users. I need to check if
>> that
>> > > > would break semantics in SAMOA or other integrations as well that
>> > assume
>> > > > immutability. For example in SAMOA there are often local metric
>> objects
>> > > > that are being constantly mutated and simply forwarded periodically
>> to
>> > > > other (possibly chained) operators that need to evaluate them.
>> > > >
>> > > > ________________________________________
>> > > > From: Gyula Fóra <gyfora@apache.org>
>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
>> > > > To: dev@flink.apache.org
>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
>> > > >
>> > > > "Copy before putting it into a window buffer and any other group
>> > buffer."
>> > > >
>> > > > Exactly my point. Any stateful operator should be able to implement
>> > > > something like this without having to worry about copying the object
>> > (and
>> > > > at this point the user would need to know whether it comes from the
>> > > network
>> > > > to avoid unnecessary copies), so I don't agree with leaving the copy
>> > off.
>> > > >
>> > > > The user can of course specify that the operator is mutable if he
>> wants
>> > > > (and he is worried about the performance), But I still think the
>> > default
>> > > > behaviour should be immutable.
>> > > > We cannot force users to not hold object references and also it is
a
>> > > quite
>> > > > unnatural way of programming in a language like java.
>> > > >
>> > > >
>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <sewen@apache.org>
>> > wrote:
>> > > >
>> > > > > I am curious why the copying is actually needed.
>> > > > >
>> > > > > In the batch API, we chain and do not copy and it is rather
>> > > predictable.
>> > > > >
>> > > > > The cornerpoints of that design is to follow these rules:
>> > > > >
>> > > > >  1) Objects read from the network or any buffer are always new
>> > objects.
>> > > > > That comes naturally when they are deserialized as part of that
>> (all
>> > > > > buffers store serialized)
>> > > > >
>> > > > >  2) After a function returned a record (or gives one to the
>> > collector),
>> > > > it
>> > > > > if given to the chain of chained operators, but after it is
>> through
>> > the
>> > > > > chain, no one else holds a reference to that object.
>> > > > >      For that, it is crucial that objects are not stored by
>> > reference,
>> > > > but
>> > > > > either stored serialized, or a copy is stored.
>> > > > >
>> > > > > This is quite solid in the batch API. How about we follow the
same
>> > > > paradigm
>> > > > > in the streaming API. We would need to adjust the following:
>> > > > >
>> > > > > 1) Do not copy between operators (I think this is the case right
>> now)
>> > > > >
>> > > > > 2) Copy before putting it into a window buffer and any other
group
>> > > > buffer.
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <
>> > aljoscha@apache.org
>> > > >
>> > > > > wrote:
>> > > > >
>> > > > > > Yes, in fact I anticipated this. There is one central place
>> where
>> > we
>> > > > > > can insert a copy step, in OperatorCollector in OutputHandler.
>> > > > > >
>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <parisc@kth.se>
>> > > wrote:
>> > > > > > > I guess it was not intended ^^.
>> > > > > > >
>> > > > > > > Chaining should be transparent and not break the
>> correct/expected
>> > > > > > behaviour.
>> > > > > > >
>> > > > > > >
>> > > > > > > Paris?
>> > > > > > >
>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <mbalassi@apache.org
>> >
>> > > > wrote:
>> > > > > > >
>> > > > > > > +1 for copying.
>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gyfora@apache.org>
>> > wrote:
>> > > > > > >
>> > > > > > > Hey,
>> > > > > > >
>> > > > > > > The latest streaming operator rework removed the copying
of
>> the
>> > > > outputs
>> > > > > > > before passing them to chained operators. This is a
major
>> break
>> > for
>> > > > the
>> > > > > > > previous operator semantics which guaranteed immutability.
>> > > > > > >
>> > > > > > > I think this change leads to very indeterministic program
>> > behaviour
>> > > > > from
>> > > > > > > the user's perspective as only non-chained outputs/inputs
>> will be
>> > > > > > mutable.
>> > > > > > > If we allow this to happen, users will start disabling
>> chaining
>> > to
>> > > > get
>> > > > > > > immutability which defeats the purpose. (chaining should
not
>> > affect
>> > > > > > program
>> > > > > > > behaviour just increase performance)
>> > > > > > >
>> > > > > > > In my opinion the default setting for each operator
should be
>> > > > > > immutability
>> > > > > > > and the user could override this manually if he/she
wants.
>> > > > > > >
>> > > > > > > What do you think?
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Gyula
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message