flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: [DISCUSS] Re-add record copy to chained operator calls
Date Wed, 20 May 2015 13:41:20 GMT
I would go for the Failsafe option as a default behaviour with a clearly
documented lightweight (no-copy) setting, but I think having a Vote on this
would be the proper way of settling this question.

On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> I think that in the long run (maybe not too long) we will have to
> change our stateful operators (windows, basically) to use managed
> memory and spill to disk. (Think jobs that have sliding windows over
> days or weeks) Then then the internal operators will take care of
> copying anyways. The problem Gyula mentioned we cannot tackle other
> than by defining how user code must behave.
>
> On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <sewen@apache.org> wrote:
> > It does not mean we have to behave the same way, it is just an indication
> > that well-defined behavior can allow you to mess things up.
> >
> > The question is now what is the default mode:
> >  - Failsafe/Heavy (always copy)
> >  - Performance/Lightweight (do not copy)
> >
> >
> > On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <sewen@apache.org> wrote:
> >
> >> This is something that we can clearly define as "should not be done".
> >> Systems do that.
> >> I think if you repeatedly emit (or mutate) the same object for example
> in
> >> Spark, you get an RDD with completely messed up contents.
> >>
> >> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyfora@apache.org> wrote:
> >>
> >>> If the preceding operator is emitting a mutated object, or does
> something
> >>> with the output object afterwards then its a problem.
> >>>
> >>> Emitting the same object is a special case of this.
> >>>
> >>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <sewen@apache.org>
> wrote:
> >>>
> >>> > The case you are making is if a preceding operator in a chain is
> >>> repeatedly
> >>> > emitting the same object, and the succeeding operator is gathering
> the
> >>> > objects, then it is a problem
> >>> >
> >>> > Or are there cases where the system itself repeatedly emits the same
> >>> > objects?
> >>> >
> >>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyfora@apache.org>
> wrote:
> >>> >
> >>> > > We are designing a system for stateful stream computations,
> assuming
> >>> long
> >>> > > standing operators that gather and store data as the stream evolves
> >>> > (unlike
> >>> > > in the dataset api). Many programs, like windowing, sampling etc
> hold
> >>> the
> >>> > > state in the form of past data. And without careful understanding
> of
> >>> the
> >>> > > runtime these programs will break or have unnecessary copies.
> >>> > >
> >>> > > This is why I think immutability should be the default so we can
> have
> >>> a
> >>> > > clear dataflow model with immutable streams.
> >>> > >
> >>> > > I see absolutely no reason why we cant have the non-copy version
> as an
> >>> > > optional setting for the users.
> >>> > >
> >>> > >
> >>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <parisc@kth.se>
> wrote:
> >>> > >
> >>> > > > @stephan I see your point. If we assume that operators do
not
> hold
> >>> > > > references in their state to any transmitted records it works
> fine.
> >>> We
> >>> > > > therefore need to make this clear to the users. I need to
check
> if
> >>> that
> >>> > > > would break semantics in SAMOA or other integrations as well
that
> >>> > assume
> >>> > > > immutability. For example in SAMOA there are often local
metric
> >>> objects
> >>> > > > that are being constantly mutated and simply forwarded
> periodically
> >>> to
> >>> > > > other (possibly chained) operators that need to evaluate
them.
> >>> > > >
> >>> > > > ________________________________________
> >>> > > > From: Gyula Fóra <gyfora@apache.org>
> >>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
> >>> > > > To: dev@flink.apache.org
> >>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained operator
> calls
> >>> > > >
> >>> > > > "Copy before putting it into a window buffer and any other
group
> >>> > buffer."
> >>> > > >
> >>> > > > Exactly my point. Any stateful operator should be able to
> implement
> >>> > > > something like this without having to worry about copying
the
> object
> >>> > (and
> >>> > > > at this point the user would need to know whether it comes
from
> the
> >>> > > network
> >>> > > > to avoid unnecessary copies), so I don't agree with leaving
the
> copy
> >>> > off.
> >>> > > >
> >>> > > > The user can of course specify that the operator is mutable
if he
> >>> wants
> >>> > > > (and he is worried about the performance), But I still think
the
> >>> > default
> >>> > > > behaviour should be immutable.
> >>> > > > We cannot force users to not hold object references and also
it
> is a
> >>> > > quite
> >>> > > > unnatural way of programming in a language like java.
> >>> > > >
> >>> > > >
> >>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <sewen@apache.org>
> >>> > wrote:
> >>> > > >
> >>> > > > > I am curious why the copying is actually needed.
> >>> > > > >
> >>> > > > > In the batch API, we chain and do not copy and it is
rather
> >>> > > predictable.
> >>> > > > >
> >>> > > > > The cornerpoints of that design is to follow these rules:
> >>> > > > >
> >>> > > > >  1) Objects read from the network or any buffer are
always new
> >>> > objects.
> >>> > > > > That comes naturally when they are deserialized as part
of that
> >>> (all
> >>> > > > > buffers store serialized)
> >>> > > > >
> >>> > > > >  2) After a function returned a record (or gives one
to the
> >>> > collector),
> >>> > > > it
> >>> > > > > if given to the chain of chained operators, but after
it is
> >>> through
> >>> > the
> >>> > > > > chain, no one else holds a reference to that object.
> >>> > > > >      For that, it is crucial that objects are not stored
by
> >>> > reference,
> >>> > > > but
> >>> > > > > either stored serialized, or a copy is stored.
> >>> > > > >
> >>> > > > > This is quite solid in the batch API. How about we follow
the
> same
> >>> > > > paradigm
> >>> > > > > in the streaming API. We would need to adjust the following:
> >>> > > > >
> >>> > > > > 1) Do not copy between operators (I think this is the
case
> right
> >>> now)
> >>> > > > >
> >>> > > > > 2) Copy before putting it into a window buffer and any
other
> group
> >>> > > > buffer.
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > >
> >>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <
> >>> > aljoscha@apache.org
> >>> > > >
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > Yes, in fact I anticipated this. There is one central
place
> >>> where
> >>> > we
> >>> > > > > > can insert a copy step, in OperatorCollector in
> OutputHandler.
> >>> > > > > >
> >>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone
<
> parisc@kth.se>
> >>> > > wrote:
> >>> > > > > > > I guess it was not intended ^^.
> >>> > > > > > >
> >>> > > > > > > Chaining should be transparent and not break
the
> >>> correct/expected
> >>> > > > > > behaviour.
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > Paris?
> >>> > > > > > >
> >>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi
<
> mbalassi@apache.org
> >>> >
> >>> > > > wrote:
> >>> > > > > > >
> >>> > > > > > > +1 for copying.
> >>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gyfora@apache.org>
> >>> > wrote:
> >>> > > > > > >
> >>> > > > > > > Hey,
> >>> > > > > > >
> >>> > > > > > > The latest streaming operator rework removed
the copying of
> >>> the
> >>> > > > outputs
> >>> > > > > > > before passing them to chained operators.
This is a major
> >>> break
> >>> > for
> >>> > > > the
> >>> > > > > > > previous operator semantics which guaranteed
immutability.
> >>> > > > > > >
> >>> > > > > > > I think this change leads to very indeterministic
program
> >>> > behaviour
> >>> > > > > from
> >>> > > > > > > the user's perspective as only non-chained
outputs/inputs
> >>> will be
> >>> > > > > > mutable.
> >>> > > > > > > If we allow this to happen, users will start
disabling
> >>> chaining
> >>> > to
> >>> > > > get
> >>> > > > > > > immutability which defeats the purpose. (chaining
should
> not
> >>> > affect
> >>> > > > > > program
> >>> > > > > > > behaviour just increase performance)
> >>> > > > > > >
> >>> > > > > > > In my opinion the default setting for each
operator should
> be
> >>> > > > > > immutability
> >>> > > > > > > and the user could override this manually
if he/she wants.
> >>> > > > > > >
> >>> > > > > > > What do you think?
> >>> > > > > > >
> >>> > > > > > > Regards,
> >>> > > > > > > Gyula
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message