flink-dev mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Rework of streaming iteration API
Date Tue, 07 Jul 2015 14:16:31 GMT
@Kostas:
I believe this new API is equivalent in expressivity to the current one.
We can define nested loops now as well.
And in general I don't see nested loops as much worse than simple loops.

Gyula Fóra <gyula.fora@gmail.com> wrote (on Tue, Jul 7, 2015, 16:14):

> Sorry Stephan, I meant it slightly differently, but I see your point:
>
> DataStream source = ...
> SingleInputOperator mapper = source.map(...)
> mapper.addInput()
>
> So addInput would be a method of the operator, not the stream.
>
> Aljoscha Krettek <aljoscha@apache.org> wrote (on Tue, Jul 7, 2015, 16:12):
>
>> Yes, I think this would be good. I was just about to open an issue for
>> changing the Streaming Iteration API. :D
>>
>> Then we should also make the implementation very straightforward and
>> simple. Right now, the implementation of the iterations is all over the
>> place.
>>
>> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra <gyfora@apache.org> wrote:
>>
>> > Hey,
>> >
>> > Along with the suggested changes to the streaming API structure, I think we
>> > should also rework the "iteration" API. Currently the iteration API tries
>> > to mimic the syntax of the batch API, while the runtime behaviour is quite
>> > different.
>> >
>> > What we create instead of iterations is really just cyclic streams (loops
>> > in the streaming job), so the API should make this behaviour intuitive.
>> >
>> > I suggest removing the explicit iterate call and instead adding a method to
>> > the StreamOperators that allows connecting feedback inputs (creating loops).
>> > It would look like this:
>> >
>> > A mapper that does nothing but iterate over some filtered input:
>> >
>> > *Current API:*
>> > DataStream source = ..
>> > IterativeDataStream it = source.iterate()
>> > DataStream mapper = it.map(noOpMapper)
>> > DataStream feedback = mapper.filter(...)
>> > it.closeWith(feedback)
>> >
>> > *Suggested API:*
>> > DataStream source = ..
>> > DataStream mapper = source.map(noOpMapper)
>> > DataStream feedback = mapper.filter(...)
>> > mapper.addInput(feedback)
>> >
>> > The suggested approach would let us define inputs to operators after they
>> > are created and implicitly union them with the normal input. I think this
>> > is a much clearer approach than what we have now.
>> >
>> > What do you think?
>> >
>> > Gyula
>> >
>>
>
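
The "add an input after creation, implicitly unioned with the normal input" idea from the thread can be illustrated with a toy graph model. This is only a sketch under stated assumptions: Node, addInput and run are hypothetical names written for this illustration, not Flink classes, and the mapper decrements each record (rather than being a no-op as in the proposal) so that the toy loop terminates on its own.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model only: Node, addInput and run are hypothetical, not Flink's API.
final class FeedbackLoopSketch {

    /** An operator node: applies fn to each record; a null result drops the record. */
    static final class Node {
        private final Function<Integer, Integer> fn;
        private final List<Node> outputs = new ArrayList<>();
        final List<Integer> emitted = new ArrayList<>(); // records this node produced

        Node(Function<Integer, Integer> fn) { this.fn = fn; }

        /** Creates a downstream operator fed by this node. */
        Node map(Function<Integer, Integer> f) {
            Node next = new Node(f);
            outputs.add(next);
            return next;
        }

        /** Creates a downstream operator that keeps only matching records. */
        Node filter(Predicate<Integer> p) {
            return map(x -> p.test(x) ? x : null);
        }

        /** Connects one more upstream node after creation; all inputs are unioned. */
        void addInput(Node upstream) {
            upstream.outputs.add(this);
        }
    }

    /** A record waiting to be processed by a particular node. */
    private static final class Pending {
        final Node target;
        final Integer value;
        Pending(Node target, Integer value) { this.target = target; this.value = value; }
    }

    /** Pushes records through the graph with a FIFO queue until none remain. */
    static void run(Node entry, List<Integer> records) {
        Deque<Pending> queue = new ArrayDeque<>();
        for (Integer r : records) {
            queue.add(new Pending(entry, r));
        }
        while (!queue.isEmpty()) {
            Pending p = queue.poll();
            Integer result = p.target.fn.apply(p.value);
            if (result == null) {
                continue; // record was filtered out
            }
            p.target.emitted.add(result);
            for (Node out : p.target.outputs) {
                queue.add(new Pending(out, result));
            }
        }
    }

    public static void main(String[] args) {
        Node source = new Node(x -> x);
        Node mapper = source.map(x -> x - 1);      // decrement so the loop ends
        Node feedback = mapper.filter(x -> x > 0); // only positive records loop back
        mapper.addInput(feedback);                 // closes the cycle after creation
        run(source, Arrays.asList(3));
        System.out.println(mapper.emitted);        // prints [2, 1, 0]
    }
}
```

In this model the loop is just an extra edge in the graph, which matches the "cyclic streams" description above; a real streaming runtime would additionally need some termination or timeout policy for the feedback edge, which this sketch sidesteps by letting the filter eventually drop every record.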
