flink-dev mailing list archives

From Gábor Hermann <m...@gaborhermann.com>
Subject Re: [DISCUSS] FLIP-17 Side Inputs
Date Fri, 10 Mar 2017 16:25:09 GMT
Hi all,

Thanks Aljoscha for going forward with the side inputs and for the nice 

I'm also in favor of the implementation with N-ary input (3.) for the 
reasons Ventura explained. I'm strongly against managing side inputs at 
StreamTask level (2.), as it would create another abstraction for almost 
the same purposes as a TwoInputOperator. Making use of the second input 
of a 2-input operator (1.) could be useful for prototyping. I assume it 
would be easier to implement a minimal solution with that, but I'm not 
sure. If the N-ary input prototype is almost ready, then it's best to go 
with that.

For side input readiness, it would be better to wait for the side input 
to be completely ready. As Gyula has suggested, waiting only for the 
first record does not differ much from not waiting at all. I would also 
prefer user-defined readiness, but for the minimal solution we could fix 
readiness to mean a completely read side input and perhaps support only 
static side inputs first.

I understand that we should push a minimal viable solution forward. The 
current API and implementation proposal seems like a good start. 
However, long-term goals are also important, to avoid going in the wrong 
direction. As I have not participated in the discussion, let me also 
share some longer-term considerations in reply to the others. (Sorry for 
the length.)

How would side inputs help the users? For the simple, non-windowed cases 
with static input a CoFlatMap might be sufficient. The main input can be 
buffered while the side input is consumed and stored in the operator 
state. Thus, the user can decide inside the CoFlatMap UDF when to start 
consuming the stream input (e.g. when the side input is ready). Of 
course, this might be problematic to implement, so the side inputs API 
could help the user with this pattern.
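
To make the pattern concrete, here is a minimal sketch in plain Java. It is not Flink code: BufferingEnricher, onMain, onSide and onSideComplete are hypothetical names that only mimic the two callbacks of a CoFlatMap-style UDF plus an assumed "side input finished" signal, and the plain collections stand in for operator state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical stand-in for a CoFlatMap-style UDF; not a Flink class. */
class BufferingEnricher {
    // Stand-ins for operator state: the side data and the buffered main input.
    private final Map<String, String> sideData = new HashMap<>();
    private final Queue<String> bufferedMain = new ArrayDeque<>();
    private boolean sideReady = false;

    /** Main-input element: buffered until the side input is ready, enriched afterwards. */
    List<String> onMain(String key) {
        List<String> out = new ArrayList<>();
        if (sideReady) {
            out.add(enrich(key));
        } else {
            bufferedMain.add(key);
        }
        return out;
    }

    /** Side-input element: stored for later lookups. */
    void onSide(String key, String value) {
        sideData.put(key, value);
    }

    /** Assumed end-of-side-input signal: marks readiness and flushes the buffer in order. */
    List<String> onSideComplete() {
        sideReady = true;
        List<String> out = new ArrayList<>();
        while (!bufferedMain.isEmpty()) {
            out.add(enrich(bufferedMain.poll()));
        }
        return out;
    }

    private String enrich(String key) {
        return key + "=" + sideData.getOrDefault(key, "?");
    }
}
```

The awkward part is exactly what the sketch glosses over: something has to call onSideComplete(), and the buffer can grow without bound while the side input is still being read.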

First, marking the end of side input is not easy. Every side input 
should broadcast some kind of EOF to the consuming operator. If we 
generalize to non-static (slowly changing) inputs, then progress 
tracking messages should be broadcast periodically. This is reminiscent 
of the watermark time tracking for windows.

I agree with Gyula that we should have user-defined side input 
readiness. But couldn't we use windowing for this? It's not worth 
having two separate time tracking mechanisms (one for windows, one for 
side inputs). If windowing is not flexible enough to handle such 
cases, then what about exposing watermark tracking to the user? E.g. we 
could have an extra user-defined event handler in RichFunctions that is 
called when time progress is made. This would unify the two kinds of 
progress tracking. Of course, this approach requires more work, so it's 
not for the minimal viable solution.
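
A rough sketch of what such a handler could look like, in plain Java. Everything here is hypothetical: ProgressListener, ProgressTracker and StaticSideInputReadiness are made-up names, no such callback exists on RichFunction, and using Long.MAX_VALUE as the "input finished" marker is just a convention assumed for this example.

```java
import java.util.Arrays;

/** Hypothetical user-facing callback, invoked whenever overall progress advances. */
interface ProgressListener {
    void onProgress(long watermark);
}

/** Tracks progress per input channel and reports the minimum across all channels. */
class ProgressTracker {
    private final long[] channelProgress;
    private final ProgressListener listener;
    private long lastReported = Long.MIN_VALUE;

    ProgressTracker(int channels, ProgressListener listener) {
        this.channelProgress = new long[channels];
        Arrays.fill(this.channelProgress, Long.MIN_VALUE);
        this.listener = listener;
    }

    /** Records a progress message from one channel; progress never moves backwards. */
    void update(int channel, long progress) {
        channelProgress[channel] = Math.max(channelProgress[channel], progress);
        long min = Arrays.stream(channelProgress).min().getAsLong();
        if (min > lastReported) {
            lastReported = min;
            listener.onProgress(min);
        }
    }
}

/** User-defined readiness: a static side input is ready once every channel has finished. */
class StaticSideInputReadiness implements ProgressListener {
    boolean ready = false;

    @Override
    public void onProgress(long watermark) {
        // Assumed convention: Long.MAX_VALUE means "no more data on any channel".
        if (watermark == Long.MAX_VALUE) {
            ready = true;
        }
    }
}
```

With a handler like this, "wait for the whole static input" and "wait until the side input has caught up to time T" would be two implementations of the same callback.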

Second, exposing a buffer to the user helps a bit, but the users could 
buffer the data simply in an operator state. How would a buffer help 
more? Of course, the interface could have multiple implementations, such 
as a spilling buffer, and the user could choose. That helps the "waiting 

I agree with Wenlong's suggestion that blocking (or backpressure) must 
be an option. It seems crucial to avoid consuming a large part of the 
main input, which would take a lot of space. I suggest not exposing a 
buffer, but instead letting users control whether to read from each of 
the inputs. E.g. in the N-ary input operator UDF the user could 
control this per input: startConsuming(), stopConsuming(). Then it's the 
user's responsibility not to get into deadlocks, but the runtime handles 
the buffering. For reading static side input, the user could stop 
consuming the main input until she considers the side input ready.
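
A toy illustration of the idea in plain Java. Only the method names startConsuming() and stopConsuming() come from the suggestion above; ControlledInput, SideInputFirstOperator, sideInputFinished() and step() are hypothetical, and the in-memory queue merely stands in for whatever buffering or backpressure the runtime would apply.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical handle for one input of an N-ary operator. */
class ControlledInput {
    // Stand-in for runtime-managed buffering of records that are not yet consumed.
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean consuming = true;

    void startConsuming() { consuming = true; }
    void stopConsuming() { consuming = false; }

    /** Called by the (simulated) runtime when a record arrives. */
    void offer(String record) { pending.add(record); }

    /** Returns the next record, or null if the input is paused or empty. */
    String pollIfConsuming() { return consuming ? pending.poll() : null; }
}

/** Reads the side input first and only then lets the main input flow. */
class SideInputFirstOperator {
    final ControlledInput main = new ControlledInput();
    final ControlledInput side = new ControlledInput();
    final List<String> sideData = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    SideInputFirstOperator() {
        main.stopConsuming(); // the user decides: no main input until the side input is ready
    }

    /** The user's readiness decision, here triggered from outside. */
    void sideInputFinished() {
        main.startConsuming();
    }

    /** One iteration of a simulated processing loop. */
    void step() {
        String s = side.pollIfConsuming();
        if (s != null) {
            sideData.add(s);
        }
        String m = main.pollIfConsuming();
        if (m != null) {
            output.add(m + " joined with " + sideData.size() + " side records");
        }
    }
}
```

The UDF never sees a buffer here; it only flips the per-input switch, and where the unread records wait is left to the runtime.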

User-controlled backpressure would also help avoid deadlocks in 
stream loops.

I also agree with Wenlong's second point, that checkpointing should be 
considered, but I don't think it's really important for the prototype. 
If we maintain the side input in the state of the consuming operator, 
then checkpointing would not stop once the static side input is 
finished: the main input goes on, so the operator stays running. 
Incremental checkpointing could avoid re-checkpointing the static data 
at every checkpoint.


On 2017-03-09 16:59, Aljoscha Krettek wrote:

> Hi Jamie,
> actually the approach where the .withSideInput() comes before the user
> function is only required for implementation proposal #1, which I like
> the least. For the other two it can be after the user function, which is
> also what I prefer.
> Regarding semantics: yes, we simply wait for anything to be available.
> For GlobalWindows, i.e. side inputs on a normal function where we simply
> don't have windows, this means that we wait for anything. For the
> windowed case, which I'm proposing as a second step we will wait for
> side input in a window to be available that matches the main-input
> window. For the keyed case we wait for something on the same key to be
> available, for the broadcast case we wait for anything.
> Best,
> Aljoscha
> On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
>> Hi, I think the proposal looks good.  The only thing I wasn't clear on
>> was which API is actually being proposed.  The one where .withSideInput()
>> comes before the user function or after.  I would definitely prefer it
>> come after since that's the normal pattern in the Flink API.  I understood
>> that makes the implementation different (maybe harder) but I think it
>> helps keep the API uniform which is really good.
>> Overall I think the API looks good and yes there are some tricky
>> semantics here but in general if, when processing keyed main streams, we
>> always wait until there is a side-input available for that key we're off
>> to a great start and I think that was what you're suggesting in the
>> design doc.
>> -Jamie
>> On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>> Hi,
>>> these are all valuable suggestions and I think that we should implement
>>> them when the time is right. However, I would like to first get a
>>> minimal viable version of this feature into Flink and then expand on it.
>>> I think the last few tries of tackling this problem fizzled out because
>>> we got too deep into discussing special semantics and features. I think
>>> the most important thing to agree on right now is the basic API and the
>>> implementation plan. What do you think about that?
>>> Regarding your suggestions, I have in fact a branch [1] from May 2016
>>> where I implemented a prototype implementation. This has an n-ary
>>> operator and inputs can be either bounded or unbounded and the
>>> implementation actually waits for all bounded inputs to finish before
>>> starting to process the unbounded inputs.
>>> In general, I think blocking on an input is only possible while you're
>>> waiting for a bounded input to finish. If all inputs are unbounded you
>>> cannot block because you might run into deadlocks (in the processing
>>> graph, due to back pressure) and also because blocking will also block
>>> elements that might have a lower timestamp and might fall into a
>>> different window which is already ready for processing.
>>> Best,
>>> Aljoscha
>>> [1]
>>> https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
>>> On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
>>>> Hi Aljoscha, thank you for the proposal, it is great to hear about the
>>>> progress in side input.
>>>> Following is my point of view:
>>>> 1. I think there may be an option to block the processing of the main
>>>> input instead of buffering the data in state, because in production the
>>>> throughput of the main input is usually much larger, and buffering the
>>>> data before the side input may slow down the preparation of the side
>>>> input, since I/O and computing resources are always limited.
>>>> 2. Another issue may need to be discussed: how can we do checkpointing
>>>> with side input, because a static side input may finish soon after it
>>>> starts, which will stop the checkpointing.
>>>> 3. I agree with Gyula that users should be able to determine when a side
>>>> input is ready. Maybe we can take it one step further: could users
>>>> determine which input an operator with multiple inputs processes each
>>>> time?  It would be more flexible.
>>>> Best Regards!
>>>> Wenlong
>>>> On 7 March 2017 at 18:39, Ventura Del Monte <venturadelmonte@gmail.com>
>>>> wrote:
>>>>> Hi Aljoscha,
>>>>> Thank you for the proposal and for bringing up this discussion again.
>>>>> Regarding the implementation aspect, I would say the first way could
>>>>> be easier/faster to implement but it could add some overhead when
>>>>> dealing with multiple side inputs through the current 2-streams union
>>>>> transform. I tried the second option myself as it has less overhead
>>>>> but then the outcome was something close to a N-ary operator consuming
>>>>> first each side input while buffering the main one.
>>>>> Therefore, I would choose the third option as it is more generic
>>>>> and might help also in other scenarios, although its implementation
>>>>> requires more effort.
>>>>> I also agree with Gyula, I think the user should be allowed to define
>>>>> the condition that determines when a side input is ready, e.g., load
>>>>> the side input first, incrementally update the side input.
>>>>> Best,
>>>>> Ventura
>>>>> On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra <gyula.fora@gmail.com>
>>>>> wrote:
>>>>>> Hi Aljoscha,
>>>>>> Thank you for the nice proposal!
>>>>>> I think it would make sense to allow users to affect the readiness of
>>>>>> the side input. I think making it ready when the first element arrives
>>>>>> is only slightly better than making it always ready from a usability
>>>>>> perspective. For instance, if I am joining against a static data set I
>>>>>> want to wait for the whole set before making it ready. This could be
>>>>>> exposed as a user-defined condition that could also recognize bounded
>>>>>> inputs maybe.
>>>>>> Maybe we could also add an aggregating (merging) side input type, that
>>>>>> could work as a broadcast state.
>>>>>> What do you think?
>>>>>> Gyula
>>>>>> Aljoscha Krettek <aljoscha@apache.org> wrote (on Mon, Mar 6, 15:18):
>>>>>>> Hi Folks,
>>>>>>> I would like to finally agree on a plan for implementing side inputs in
>>>>>>> Flink. There has already been an attempt to come to consensus which
>>>>>>> resulted in two design documents. I tried to consolidate those and
>>>>>>> also added a section about implementation plans. This is the resulting
>>>>>>> FLIP:
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>>>>>>> In terms of semantics I tried to go with the minimal viable solution.
>>>>>>> The part that needs discussing is how we want to implement this. I
>>>>>>> outlined three possible implementation plans in the FLIP, but what it
>>>>>>> boils down to is that we need to introduce some way of getting several
>>>>>>> inputs into an operator/task.
>>>>>>> Please have a look at the doc and let us know what you think.
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
>> -- 
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> jamie@data-artisans.com
