flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: [DISCUSS] Change Streaming Operators to be Push-Only
Date Mon, 11 May 2015 16:29:25 GMT
Yep, I would say: Move ahead :-)

On Tue, May 5, 2015 at 4:48 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> So I gather I should go forward with this? If no one objects, I will
> open a Jira issue and work on this.
>
> On Tue, May 5, 2015 at 4:14 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
> > Yes, because the handling of punctuations depends on the operator: a
> > MapOperator can just forward them, while a windowed join or reduce can
> > only forward them after emitting the windows or results that the
> > punctuation completes.
> >
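The difference between the two kinds of operator can be illustrated with a small sketch. It assumes a hypothetical push-based interface (`Operator` with `processElement` and `processWatermark`, emitting through an `Output`); these names are illustrative only, not Flink API. It also assumes that a watermark `w` promises no further elements with a timestamp below `w`. The map-like operator forwards a watermark immediately, while a tumbling-window sum first emits every window the watermark completes and only then forwards it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical downstream handle: receives both records and watermarks.
interface Output {
    void emit(String record);
    void emitWatermark(long watermark);
}

// Hypothetical push-based operator interface (not Flink's actual API).
interface Operator {
    void processElement(long timestamp, long value, Output out);
    void processWatermark(long watermark, Output out);
}

// Stateless map: nothing is buffered, so a watermark can be forwarded right away.
class DoublingMap implements Operator {
    public void processElement(long timestamp, long value, Output out) {
        out.emit("map:" + (value * 2));
    }

    public void processWatermark(long watermark, Output out) {
        out.emitWatermark(watermark);
    }
}

// Tumbling-window sum: must flush completed windows before forwarding the watermark.
class TumblingWindowSum implements Operator {
    private final long size;
    // window start -> running sum, ordered so the oldest window comes first
    private final TreeMap<Long, Long> sums = new TreeMap<>();

    TumblingWindowSum(long size) { this.size = size; }

    public void processElement(long timestamp, long value, Output out) {
        long start = timestamp - Math.floorMod(timestamp, size);
        sums.merge(start, value, Long::sum);
    }

    public void processWatermark(long watermark, Output out) {
        // Window [start, start + size) is complete once the watermark reaches its end.
        while (!sums.isEmpty() && sums.firstKey() + size <= watermark) {
            Map.Entry<Long, Long> done = sums.pollFirstEntry();
            out.emit("window@" + done.getKey() + ":" + done.getValue());
        }
        // Only now is it safe to tell downstream operators about the watermark.
        out.emitWatermark(watermark);
    }
}

// Collects everything an operator emits, for inspection.
class RecordingOutput implements Output {
    final List<String> events = new ArrayList<>();
    public void emit(String record) { events.add(record); }
    public void emitWatermark(long watermark) { events.add("wm:" + watermark); }
}

class WatermarkDemo {
    // Feeds a fixed sequence of elements and watermarks into one operator.
    static List<String> run(Operator op) {
        RecordingOutput out = new RecordingOutput();
        op.processElement(1, 3, out);
        op.processElement(12, 4, out);
        op.processWatermark(10, out);
        op.processElement(15, 5, out);
        op.processWatermark(20, out);
        return out.events;
    }

    public static void main(String[] args) {
        System.out.println(run(new DoublingMap()));
        System.out.println(run(new TumblingWindowSum(10)));
    }
}
```

With windows of size 10, `WatermarkDemo.run(new TumblingWindowSum(10))` yields `[window@0:3, wm:10, window@10:9, wm:20]`: the window starting at 10 is only emitted once watermark 20 arrives, whereas the map forwards each watermark as soon as it sees it.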
> > On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <parisc@kth.se> wrote:
> >> By watermark handling I meant making punctuations explicit and
> >> forwarding/modifying them at the operator level. I think this is clear so
> >> far.
> >>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>>
> >>> There is no watermark handling yet. :D
> >>>
> >>> But this change would enable me to add it.
> >>>
> >>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <parisc@kth.se> wrote:
> >>>> I agree with Gyula on this one. Barriers should not be exposed to the
> >>>> operator; they are system events for state management. Apart from that,
> >>>> the watermark handling seems to be on the right track. I like it so far.
> >>>>
> >>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>>>>
> >>>>> I don't know; I just put it in the interface because other people are
> >>>>> working on the checkpointing/barrier mechanism, so there would need to
> >>>>> be some functionality for it there at some point.
> >>>>>
> >>>>> Or maybe it is not required in the operator and can be handled in the
> >>>>> StreamTask. Others might know this better than I do right now.
> >>>>>
> >>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> >>>>>> What would the processBarrier method do?
> >>>>>>
> >>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>>>>>
> >>>>>>> I'm using the terms punctuation and watermark interchangeably here
> >>>>>>> because, for practical purposes, they do the same thing. I'm not sure
> >>>>>>> what you meant by your comment about them.
> >>>>>>>
> >>>>>>> For the Operator interface, I'm thinking about something like this:
> >>>>>>>
> >>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
> >>>>>>>  public abstract void processElement(IN element);
> >>>>>>>  public abstract void processBarrier(...);
> >>>>>>>  public abstract void processPunctuation(...); // or: processLowWatermark(...)
> >>>>>>> }
> >>>>>>>
> >>>>>>> The operator would also have access to the TaskContext, the
> >>>>>>> ExecutionConfig, and the serializers. It would emit values using either
> >>>>>>> an emit() method or the Collector interface; I'm not sure about that yet.
> >>>>>>>
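The interface sketch above can be turned into compilable Java roughly as follows. This is a minimal sketch under several assumptions: the barrier method is left out (following the replies that barriers belong at the task level), the punctuation method is called `processWatermark`, results go through a hypothetical `StreamOutput` interface rather than a settled emit()/Collector choice, and the `F extends Function` bound is dropped so the code compiles without Flink on the classpath. `StreamMap` and `InterfaceDemo` are likewise hypothetical names; TaskContext, ExecutionConfig, and serializers are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical output handle that an operator pushes its results into.
interface StreamOutput<OUT> {
    void collect(OUT record);
    void emitWatermark(long watermark);
}

// Sketch of a one-input, push-based operator wrapping a user function F.
abstract class OneInputStreamOperator<IN, OUT, F> {
    protected final F userFunction;
    protected final StreamOutput<OUT> output;

    protected OneInputStreamOperator(F userFunction, StreamOutput<OUT> output) {
        this.userFunction = userFunction;
        this.output = output;
    }

    // Called once per input record by whoever drives the operator.
    public abstract void processElement(IN element);

    // Called when a low watermark arrives; by default it is forwarded unchanged.
    public void processWatermark(long watermark) {
        output.emitWatermark(watermark);
    }
}

// A map operator only needs to implement processElement.
class StreamMap<IN, OUT> extends OneInputStreamOperator<IN, OUT, Function<IN, OUT>> {
    StreamMap(Function<IN, OUT> mapper, StreamOutput<OUT> output) {
        super(mapper, output);
    }

    @Override
    public void processElement(IN element) {
        output.collect(userFunction.apply(element));
    }
}

class InterfaceDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        // Output that just records what the operator emits.
        StreamOutput<Integer> sink = new StreamOutput<Integer>() {
            public void collect(Integer record) { seen.add("record:" + record); }
            public void emitWatermark(long watermark) { seen.add("wm:" + watermark); }
        };
        StreamMap<String, Integer> lengths = new StreamMap<String, Integer>(String::length, sink);
        lengths.processElement("flink");
        lengths.processWatermark(42);
        lengths.processElement("ok");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the default `processWatermark` forwards the watermark unchanged, a stateless operator such as the map only implements `processElement`; `InterfaceDemo.run()` returns `[record:5, wm:42, record:2]`.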
> >>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyfora@apache.org> wrote:
> >>>>>>>> I think this is a good idea in general. I would try to minimize the
> >>>>>>>> methods we include and make the ones that we keep very concrete. For
> >>>>>>>> instance, I would not have the receiveBarrier method, as barriers are
> >>>>>>>> already handled at a totally different level. And instead of a
> >>>>>>>> punctuation method, I would directly add a method that works on
> >>>>>>>> watermarks.
> >>>>>>>>
> >>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> What do you mean by "losing iterations"?
> >>>>>>>>>
> >>>>>>>>> For the pros and cons:
> >>>>>>>>>
> >>>>>>>>> Cons: I can't think of any, since most of the operators are already
> >>>>>>>>> chainable and already behave like a Collector.
> >>>>>>>>>
> >>>>>>>>> Pros:
> >>>>>>>>> - A unified model for operators: chainable operators don't have to
> >>>>>>>>> worry about input iterators and the collect() interface.
> >>>>>>>>> - It enables features that we want in the future, such as barriers
> >>>>>>>>> and punctuations, which don't work with the simple Collector
> >>>>>>>>> interface.
> >>>>>>>>> - The while-loop moves out of the operators, so the Task (the thing
> >>>>>>>>> that runs operators) can control the flow of data better and deal
> >>>>>>>>> with things like barriers and punctuations. If we keep the main loop
> >>>>>>>>> inside each operator, then every operator has to manage input
> >>>>>>>>> readers and inline events manually.
> >>>>>>>>>
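Moving the while-loop out of the operators might look like the following minimal sketch: the task pulls events from an input iterator and dispatches them, pushing records and watermarks into the head of the chain while handling barriers itself (the option, raised in the replies, of dealing with barriers in the StreamTask rather than in the operator). The event classes, `ChainHead`, `StreamTaskLoop`, and `TaskLoopDemo` are hypothetical names, and the barrier branch only logs where a real task would trigger a checkpoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical event types that can arrive on a task's input.
abstract class StreamEvent {}

class DataRecord extends StreamEvent {
    final String value;
    DataRecord(String value) { this.value = value; }
}

class Watermark extends StreamEvent {
    final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
}

class Barrier extends StreamEvent {
    final long checkpointId;
    Barrier(long checkpointId) { this.checkpointId = checkpointId; }
}

// The head of an operator chain only ever sees records and watermarks.
interface ChainHead {
    void processElement(String value);
    void processWatermark(long watermark);
}

// The task, not the operator, owns the main loop over the input.
class StreamTaskLoop {
    private final List<String> log;

    StreamTaskLoop(List<String> log) { this.log = log; }

    void run(Iterator<StreamEvent> input, ChainHead head) {
        while (input.hasNext()) {
            StreamEvent event = input.next();
            if (event instanceof DataRecord) {
                head.processElement(((DataRecord) event).value);
            } else if (event instanceof Watermark) {
                head.processWatermark(((Watermark) event).timestamp);
            } else if (event instanceof Barrier) {
                // Barriers stay a system concern; this sketch only logs them.
                log.add("checkpoint:" + ((Barrier) event).checkpointId);
            }
        }
    }
}

class TaskLoopDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // A trivial chain head that records what it receives.
        ChainHead head = new ChainHead() {
            public void processElement(String value) { log.add("element:" + value); }
            public void processWatermark(long watermark) { log.add("watermark:" + watermark); }
        };
        List<StreamEvent> input = Arrays.<StreamEvent>asList(
                new DataRecord("a"), new Barrier(1), new Watermark(5), new DataRecord("b"));
        new StreamTaskLoop(log).run(input.iterator(), head);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Since the operator never touches the input iterator, dispatch logic for each kind of event lives in one place instead of in every operator's main loop.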
> >>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzoumas@apache.org> wrote:
> >>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some
> >>>>>>>>>> functionality by getting rid of iterations?
> >>>>>>>>>>
> >>>>>>>>>> Kostas
> >>>>>>>>>>
> >>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi folks,
> >>>>>>>>>>> while working on introducing source-assigned timestamps into
> >>>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967), I
> >>>>>>>>>>> thought about how the punctuations (low watermarks) can be pushed
> >>>>>>>>>>> through the system. The problem is that operators can get input in
> >>>>>>>>>>> two ways: 1. they read directly from input iterators, and 2. they
> >>>>>>>>>>> act as a Collector and get elements via collect() from the previous
> >>>>>>>>>>> operator in a chain.
> >>>>>>>>>>>
> >>>>>>>>>>> This makes it hard to push things that are not elements, such as
> >>>>>>>>>>> barriers and/or punctuations, through a chain.
> >>>>>>>>>>>
> >>>>>>>>>>> I propose to change all streaming operators to be push-based, with
> >>>>>>>>>>> a slightly improved interface: in addition to collect(), which I
> >>>>>>>>>>> would rename to receiveElement(), I would add receivePunctuation()
> >>>>>>>>>>> and receiveBarrier(). The first operator in the chain would get its
> >>>>>>>>>>> data from the outside invokable, which reads from the input
> >>>>>>>>>>> iterator and calls receiveElement() on the first operator in the
> >>>>>>>>>>> chain.
> >>>>>>>>>>>
> >>>>>>>>>>> What do you think? I would of course be willing to implement this
> >>>>>>>>>>> myself.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Aljoscha
> >>>>>>>>>>>
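The proposed push-based chain can be sketched as follows, assuming each operator implements the three methods named in the proposal (`receiveElement`, `receivePunctuation`, `receiveBarrier`) and holds a reference to the next operator. `ChainedOperator`, `ForwardingOperator`, `FilterOp`, `MapOp`, `SinkOp`, and `ChainDemo` are hypothetical names. The point is that punctuations and barriers travel through the chain on the same call path as elements, which a Collector exposing only collect() cannot offer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Every operator in a chain is push-based: it receives elements, punctuations and barriers.
interface ChainedOperator<T> {
    void receiveElement(T element);
    void receivePunctuation(long lowWatermark);
    void receiveBarrier(long checkpointId);
}

// Base class for operators that forward non-element events to the next operator unchanged.
abstract class ForwardingOperator<IN, OUT> implements ChainedOperator<IN> {
    protected final ChainedOperator<OUT> next;

    ForwardingOperator(ChainedOperator<OUT> next) { this.next = next; }

    public void receivePunctuation(long lowWatermark) { next.receivePunctuation(lowWatermark); }

    public void receiveBarrier(long checkpointId) { next.receiveBarrier(checkpointId); }
}

class FilterOp<T> extends ForwardingOperator<T, T> {
    private final Predicate<T> keep;

    FilterOp(Predicate<T> keep, ChainedOperator<T> next) {
        super(next);
        this.keep = keep;
    }

    public void receiveElement(T element) {
        if (keep.test(element)) next.receiveElement(element);
    }
}

class MapOp<IN, OUT> extends ForwardingOperator<IN, OUT> {
    private final Function<IN, OUT> fn;

    MapOp(Function<IN, OUT> fn, ChainedOperator<OUT> next) {
        super(next);
        this.fn = fn;
    }

    public void receiveElement(IN element) {
        next.receiveElement(fn.apply(element));
    }
}

// End of the chain; stands in for the task's output and records what arrives.
class SinkOp<T> implements ChainedOperator<T> {
    final List<String> seen = new ArrayList<>();

    public void receiveElement(T element) { seen.add("element:" + element); }
    public void receivePunctuation(long lowWatermark) { seen.add("punctuation:" + lowWatermark); }
    public void receiveBarrier(long checkpointId) { seen.add("barrier:" + checkpointId); }
}

class ChainDemo {
    static List<String> run() {
        SinkOp<Integer> sink = new SinkOp<>();
        // Chain: filter(non-empty strings) -> map(string length) -> sink
        ChainedOperator<String> head =
                new FilterOp<String>(s -> !s.isEmpty(), new MapOp<String, Integer>(String::length, sink));
        // The invokable would make these calls while reading its input; here they are made directly.
        head.receiveElement("ab");
        head.receivePunctuation(100);
        head.receiveElement("");
        head.receiveElement("xyz");
        head.receiveBarrier(7);
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the filter and map forward punctuations and barriers unchanged, so `ChainDemo.run()` returns `[element:2, punctuation:100, element:3, barrier:7]`. An operator that buffers data, such as a windowed reduce, would instead override `receivePunctuation` to emit its completed results before forwarding.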
> >>>>>>>>>
> >>>>>>>
> >>>>
> >>
>
