flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [DISCUSS] Change Streaming Operators to be Push-Only
Date Mon, 11 May 2015 18:14:33 GMT
There is already a Jira and a Pull Request:
https://github.com/apache/flink/pull/659

On Mon, May 11, 2015 at 6:29 PM, Stephan Ewen <sewen@apache.org> wrote:
> Yep, I would say: Move ahead :-)
>
> On Tue, May 5, 2015 at 4:48 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> So I gather I should go forward with this? If no-one objects I will
>> open a Jira and work on this.
>>
>> On Tue, May 5, 2015 at 4:14 PM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>> > Yes, because the handling of punctuations depends on the operator: A
>> > MapOperator can just forward them while a windowed join or reduce can
>> > only forward them after emitting the correct windows or results.
>> >
>> > On Tue, May 5, 2015 at 3:58 PM, Paris Carbone <parisc@kth.se> wrote:
>> >> By watermark handling I meant making punctuations explicit and
>> >> forwarding/modifying them at the operator level. I think this is clear so
>> >> far.
>> >>
>> >>> On 05 May 2015, at 15:41, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> >>>
>> >>> There is no watermark handling yet. :D
>> >>>
>> >>> But this would enable me to do this.
>> >>>
>> >>> On Tue, May 5, 2015 at 3:39 PM, Paris Carbone <parisc@kth.se> wrote:
>> >>>> I agree with Gyula on this one. Barriers should better not be exposed
>> >>>> to the operator. They are system events for state management. Apart from
>> >>>> that, watermark handling seems to be on the right track, I like it so far.
>> >>>>
>> >>>>> On 05 May 2015, at 15:26, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> >>>>>
>> >>>>> I don't know, I just put that there because other people are working
>> >>>>> on the checkpointing/barrier thing. So there would need to be some
>> >>>>> functionality there at some point.
>> >>>>>
>> >>>>> Or maybe it is not required there and can be handled in the
>> >>>>> StreamTask. Others might know this better than I do right now.
>> >>>>>
>> >>>>> On Tue, May 5, 2015 at 3:24 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
>> >>>>>> What would the processBarrier method do?
>> >>>>>>
>> >>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> >>>>>>
>> >>>>>>> I'm using the terms punctuation and watermark interchangeably here
>> >>>>>>> because for practical purposes they do the same thing. I'm not sure
>> >>>>>>> what you meant by your comment about those.
>> >>>>>>>
>> >>>>>>> For the Operator interface I'm thinking about something like this:
>> >>>>>>>
>> >>>>>>> abstract class OneInputStreamOperator<IN, OUT, F extends Function> {
>> >>>>>>>   public abstract void processElement(IN element);
>> >>>>>>>   public abstract void processBarrier(...);
>> >>>>>>>   public abstract void processPunctuation/lowWatermark(...);
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> The operator also has access to the TaskContext and ExecutionConfig
>> >>>>>>> and Serializers. The operator would emit values using an emit() method
>> >>>>>>> or the Collector interface, not sure about that yet.
>> >>>>>>>
>> >>>>>>> On Tue, May 5, 2015 at 3:12 PM, Gyula Fóra <gyfora@apache.org> wrote:
>> >>>>>>>> I think this is a good idea in general. I would try to minimize the
>> >>>>>>>> methods we include and make the ones that we keep very concrete. For
>> >>>>>>>> instance, I would not have the receive barrier method as that is handled
>> >>>>>>>> on a totally different level already. And instead of punctuation I would
>> >>>>>>>> directly add a method to work on watermarks.
>> >>>>>>>>
>> >>>>>>>> On Tuesday, May 5, 2015, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> >>>>>>>>
>> >>>>>>>>> What do you mean by "losing iterations"?
>> >>>>>>>>>
>> >>>>>>>>> For the pros and cons:
>> >>>>>>>>>
>> >>>>>>>>> Cons: I can't think of any, since most of the operators are chainable
>> >>>>>>>>> already and already behave like a collector.
>> >>>>>>>>>
>> >>>>>>>>> Pros:
>> >>>>>>>>> - Unified model for operators, chainable operators don't have to
>> >>>>>>>>> worry about input iterators and the collect interface.
>> >>>>>>>>> - Enables features that we want in the future, such as barriers and
>> >>>>>>>>> punctuations because they don't work with the simple Collector
>> >>>>>>>>> interface.
>> >>>>>>>>> - The while-loop is moved outside of the operators, now the Task (the
>> >>>>>>>>> thing that runs Operators) can control the flow of data better and
>> >>>>>>>>> deal with stuff like barriers and punctuations. If we want to keep the
>> >>>>>>>>> main-loop inside each operator, then they all have to manage input
>> >>>>>>>>> readers and inline events manually.
>> >>>>>>>>>
>> >>>>>>>>> On Tue, May 5, 2015 at 2:41 PM, Kostas Tzoumas <ktzoumas@apache.org> wrote:
>> >>>>>>>>>> Can you give us a rough idea of the pros and cons? Do we lose some
>> >>>>>>>>>> functionality by getting rid of iterations?
>> >>>>>>>>>>
>> >>>>>>>>>> Kostas
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, May 5, 2015 at 1:37 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Folks,
>> >>>>>>>>>>> while working on introducing source-assigned timestamps into
>> >>>>>>>>>>> streaming (https://issues.apache.org/jira/browse/FLINK-1967) I
>> >>>>>>>>>>> thought about how the punctuations (low watermarks) can be pushed
>> >>>>>>>>>>> through the system. The problem is that operators can have two ways
>> >>>>>>>>>>> of getting input: 1. They read directly from input iterators, and
>> >>>>>>>>>>> 2. They act as a Collector and get elements via collect() from the
>> >>>>>>>>>>> previous operator in a chain.
>> >>>>>>>>>>>
>> >>>>>>>>>>> This makes it hard to push things through a chain that are not
>> >>>>>>>>>>> elements, such as barriers and/or punctuations.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I propose to change all streaming operators to be push based, with a
>> >>>>>>>>>>> slightly improved interface: In addition to collect(), which I would
>> >>>>>>>>>>> call receiveElement(), I would add receivePunctuation() and
>> >>>>>>>>>>> receiveBarrier(). The first operator in the chain would also get data
>> >>>>>>>>>>> from the outside invokable that reads from the input iterator and
>> >>>>>>>>>>> calls receiveElement() for the first operator in a chain.
>> >>>>>>>>>>>
>> >>>>>>>>>>> What do you think? I would of course be willing to implement this
>> >>>>>>>>>>> myself.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Cheers,
>> >>>>>>>>>>> Aljoscha
>> >>>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>
>> >>
>>
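
A minimal sketch of the push-only model discussed in this thread, not the
code from the pull request. It assumes a hypothetical two-method Operator
interface (processElement and processPunctuation); every class name below is
made up for illustration and none of it is Flink's actual API. Barriers are
left out, following the suggestion in the thread that they be handled outside
the operator. The loop lives in run(), which stands in for the Task, and the
two operators show the difference described above: a map forwards a
punctuation right away, while a reduce emits its pending result first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PushOnlySketch {

    /** Hypothetical push interface: all input arrives via method calls. */
    interface Operator<IN> {
        void processElement(IN element);
        void processPunctuation(long timestamp);
    }

    /** Stateless map: doubles each element, forwards punctuations unchanged. */
    static class DoubleMap implements Operator<Integer> {
        private final Operator<Integer> next;
        DoubleMap(Operator<Integer> next) { this.next = next; }
        public void processElement(Integer element) { next.processElement(element * 2); }
        public void processPunctuation(long timestamp) { next.processPunctuation(timestamp); }
    }

    /** Stateful reduce: emits its pending sum before forwarding the punctuation. */
    static class SumUntilPunctuation implements Operator<Integer> {
        private final Operator<Integer> next;
        private int sum = 0;
        SumUntilPunctuation(Operator<Integer> next) { this.next = next; }
        public void processElement(Integer element) { sum += element; }
        public void processPunctuation(long timestamp) {
            next.processElement(sum);           // result first
            sum = 0;
            next.processPunctuation(timestamp); // then the punctuation
        }
    }

    /** End of the chain: records everything it receives. */
    static class LogSink implements Operator<Integer> {
        final List<String> log = new ArrayList<>();
        public void processElement(Integer element) { log.add("element " + element); }
        public void processPunctuation(long timestamp) { log.add("punctuation " + timestamp); }
    }

    /**
     * Stand-in for the Task: it owns the loop and pushes into the head of the
     * chain. In the input, a Long is a punctuation and an Integer is an element.
     */
    static List<String> run(List<Object> input) {
        LogSink sink = new LogSink();
        Operator<Integer> head = new DoubleMap(new SumUntilPunctuation(sink));
        for (Object record : input) {
            if (record instanceof Long) {
                head.processPunctuation((Long) record);
            } else {
                head.processElement((Integer) record);
            }
        }
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.<Object>asList(1, 2, 10L, 7, 20L)));
    }
}
```

Running main prints [element 6, punctuation 10, element 14, punctuation 20].
No operator touches an input iterator, so non-element events travel through
the chain the same way elements do.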
