flink-dev mailing list archives

From Gábor Hermann <m...@gaborhermann.com>
Subject Re: [DISCUSS] FLIP-17 Side Inputs
Date Fri, 17 Mar 2017 20:28:25 GMT
Thanks for demonstrating the windowed side-input case. I completely
agree that handling windowed side input separately would only
complicate the implementation. The triggering mechanism of the upstream
window could define when the windowed input is ready.

I would gladly contribute to one of the low-level requirements. If
there's a reasonably well-defined JIRA issue, I'm happy to start working
on it.


On 2017-03-17 16:03, Aljoscha Krettek wrote:
> Yes, I agree! The implementation details we have talked about so far are
> only visible at the operator level. A user function that uses the
> (future) side-input API would not be aware of whether buffering or
> blocking is used. It would simply know that it is invoked and that the
> side input is ready.
> I'll also quickly try to elaborate on my comment about why I think
> windowing/triggering in the side-input operator itself is not necessary.
> I created a figure (http://imgur.com/a/aAlw7). It is enough for the
> side-input operator simply to consider the side input for a given window
> as ready once it has seen some data for that window. The WindowOperator
> that is upstream of the side input takes care of windowing/triggering.
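A minimal sketch of the readiness rule just described, in plain Java rather than Flink's operator API. The class WindowedSideInputBuffer and its methods are hypothetical, as is the choice that a later upstream firing simply replaces a window's side value. Main-input elements are buffered per window until any side-input element for the same window arrives; windowing and triggering are assumed to happen upstream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical class, not part of Flink. W = window id, M = main element,
// S = side-input value, R = result of the user function.
class WindowedSideInputBuffer<W, M, S, R> {
    private final Map<W, S> sideByWindow = new HashMap<>();
    private final Map<W, List<M>> pendingMain = new HashMap<>();
    private final BiFunction<M, S, R> userFn;

    WindowedSideInputBuffer(BiFunction<M, S, R> userFn) {
        this.userFn = userFn;
    }

    // Main element: apply the user function if side input for this window
    // has been seen, otherwise buffer the element under its window.
    List<R> onMain(W window, M element) {
        List<R> out = new ArrayList<>();
        S side = sideByWindow.get(window);
        if (side != null) {
            out.add(userFn.apply(element, side));
        } else {
            pendingMain.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
        }
        return out;
    }

    // Side element, e.g. the output of an upstream window firing. Seeing any
    // data marks the window ready; later firings replace the stored value.
    List<R> onSide(W window, S value) {
        sideByWindow.put(window, value);
        List<R> out = new ArrayList<>();
        List<M> buffered = pendingMain.remove(window);
        if (buffered != null) {
            for (M element : buffered) {
                out.add(userFn.apply(element, value));
            }
        }
        return out;
    }
}
```

Because readiness here is only "has this window seen side data?", the operator needs no trigger logic of its own, which matches the argument above that triggering can stay upstream.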
> I'll create Jira issues for implementing the low-level requirements for
> side inputs (n-ary operator, broadcast state and buffering) and update
> this thread. If anyone is interested in working on one of those, we
> might have a chance of getting this ready for Flink 1.3. Time is a bit
> tight for me because I'm going on vacation for 1.5 weeks starting
> Wednesday next week, and after that we have Flink Forward.
> Best,
> Aljoscha
> On Thu, Mar 16, 2017, at 23:52, Gábor Hermann wrote:
>> Regarding the CoFlatMap workaround,
>> - For keyed streams, do you suggest that having a per-key buffer stored
>> as keyed state would have a large memory overhead? That must be true,
>> although a workaround could be partitioning the data and using a
>> non-keyed stream. Of course that seems hacky, as we have a keyed stream
>> abstraction, so I agree with you.
>> - I agree that keeping a broadcast side input in the operator state is
>> not optimal. That's a good point I had not thought about. Once we have
>> a separate abstraction for broadcast state, we can optimize, e.g., its
>> checkpointing (avoiding checkpointing it at every operator).
>> Regarding blocking/backpressuring inputs, it would be useful not only
>> for static side inputs but also for periodically updated (i.e. slowly
>> changing) ones. E.g. when a machine learning model is updated and
>> loaded every hour, it makes sense to prioritize loading the model on
>> the side input. But I see the limitations of the underlying runtime.
>> Exposing a buffer could be useful for now. However, the *API* for
>> blocking could even be implemented by simply buffering. So the buffering
>> could be hidden from the user, and later maybe optimized to not only
>> buffer but also apply backpressure. What do you think? Again, for the
>> prototype, exposing the buffer should be fine IMHO. The API and
>> implementation for blocking inputs could be a separate issue, but let's
>> not forget about it.
>> Cheers,
>> Gabor
>> On 2017-03-15 16:14, Aljoscha Krettek wrote:
>>> Hi,
>>> thanks for your input! :-)
>>> Regarding 1)
>>> I don't see the benefit of integrating windowing into the side-input
>>> logic. Windowing can happen upstream, and whenever that emits new data
>>> the operator will notice because there is new input. Having windowing
>>> inside the side input of an operator as well would just make the
>>> implementation more complex without adding benefit, IMHO.
>>> Regarding 2)
>>> That's a very good observation! I think we are fine, though, because
>>> checkpoint barriers never "overtake" elements. It's only elements that
>>> can overtake checkpoint barriers. If the broadcast state on different
>>> parallel instances differs in a checkpoint then it only differs because
>>> some parallel instances have reflected changes in their state from
>>> elements that they shouldn't have "seen" yet in the exactly-once mode.
>>> If we pick the state of an arbitrary instance as the de-facto state we
>>> don't break guarantees any more than turning on at-least-once mode does.
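The restore rule from "Regarding 2)" can be sketched in a few lines, assuming each parallel instance checkpoints a full copy of the broadcast state. BroadcastStateRestore and restore() are hypothetical names, and taking the first snapshot is one arbitrary choice of the kind the paragraph allows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Flink API. Every parallel instance checkpoints
// its own full copy of the broadcast state. Copies may differ slightly,
// because some instances may already reflect elements they should not have
// "seen" yet under exactly-once. On restore, one copy is picked arbitrarily
// (here the first) and every new instance receives an independent copy of it.
final class BroadcastStateRestore {
    private BroadcastStateRestore() {}

    static <K, V> List<Map<K, V>> restore(List<Map<K, V>> snapshots, int newParallelism) {
        if (snapshots.isEmpty()) {
            throw new IllegalArgumentException("no broadcast state snapshots");
        }
        Map<K, V> chosen = snapshots.get(0); // arbitrary but deterministic choice
        List<Map<K, V>> restored = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new HashMap<>(chosen)); // independent copy per instance
        }
        return restored;
    }
}
```

Since every instance ends up with identical state, the new parallelism does not need to match the old one.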
>>> Regarding 3)
>>> We need the special buffer support for keyed operations because there we
>>> need to make sure that data is restored on the correct operator that is
>>> responsible for the key of the data while also allowing us to iterate
>>> over all the buffered data (for when we are ready to process the data).
>>> This iteration over elements is not possible when simply storing data in
>>> keyed state.
>>> What do you think?
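To make "Regarding 3)" concrete, here is a minimal plain-Java sketch. KeyGroupedBuffer, its methods, and the simplified key-group function are hypothetical. The sketch only illustrates why the buffer has to be partitioned by key group (so a restore can route data to the instance owning the key) and still be iterable as a whole (so everything can be processed once the side input is ready).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink's implementation: a keyed input buffer that
// groups elements by key group. Grouping lets a restore hand each key group
// to the instance that owns it, while drainAll() still iterates over every
// buffered element, which per-key keyed state does not allow.
// Simplification: Flink's key-group assignment also scrambles hashCode()
// with a murmur hash; plain floorMod is used here.
class KeyGroupedBuffer<K, V> {
    private final int maxParallelism;
    private final TreeMap<Integer, List<Map.Entry<K, V>>> byKeyGroup = new TreeMap<>();

    KeyGroupedBuffer(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    int keyGroupOf(K key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    void add(K key, V value) {
        byKeyGroup.computeIfAbsent(keyGroupOf(key), g -> new ArrayList<>())
                .add(Map.entry(key, value));
    }

    // The part of a snapshot that belongs to key groups [from, to], e.g. the
    // range assigned to one parallel instance after rescaling.
    KeyGroupedBuffer<K, V> restrictTo(int from, int to) {
        KeyGroupedBuffer<K, V> part = new KeyGroupedBuffer<>(maxParallelism);
        for (Map.Entry<Integer, List<Map.Entry<K, V>>> e
                : byKeyGroup.subMap(from, true, to, true).entrySet()) {
            part.byKeyGroup.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        return part;
    }

    // Iterate over all buffered elements across keys, e.g. once the side
    // input is ready, and empty the buffer.
    List<Map.Entry<K, V>> drainAll() {
        List<Map.Entry<K, V>> all = new ArrayList<>();
        for (List<Map.Entry<K, V>> group : byKeyGroup.values()) {
            all.addAll(group);
        }
        byKeyGroup.clear();
        return all;
    }

    int size() {
        int n = 0;
        for (List<Map.Entry<K, V>> group : byKeyGroup.values()) {
            n += group.size();
        }
        return n;
    }
}
```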
>>> On Wed, Mar 15, 2017, at 09:07, wenlong.lwl wrote:
>>>> Hi Aljoscha, I just went through your prototype. I like the design of
>>>> the SideInputReader, which makes it flexible to determine when we can
>>>> get the side input.
>>>> I agree that side inputs are API sugar on top of the three components
>>>> (n-ary inputs, broadcast state and input buffering). Following are some
>>>> more thoughts about the three components:
>>>> 1. Taking both the N-ary input operator and the windowing/trigger
>>>> mechanism into consideration, I think the N-ary input operator may need
>>>> to support some inputs (side inputs) being windowed while the others
>>>> (main input) are normal streams. For static/slowly evolving data we need
>>>> global windows, and for window-based join data we need time windows or
>>>> custom windows.
>>>> The window function on the side input can be used to collect or merge
>>>> the data to generate the value of the side input (a single value or a
>>>> list/map).
>>>> Once a side-input window is triggered, the SideInputReader will report
>>>> the value as available, and if a window is triggered more than once,
>>>> the value of the side input will be updated; maybe the SideInputReader
>>>> needs an interface to notify the user that something changed. Besides,
>>>> I prefer the option of making every input of the N-ary input operator
>>>> equal, because a user may need one side input to depend on another
>>>> side input.
>>>> 2. Regarding broadcast state, my concern is how we can merge the values
>>>> of the state from different subtasks. If the job is running in
>>>> at-least-once mode, the values of the broadcast state returned by
>>>> different subtasks will differ. Is there already a design for broadcast
>>>> state?
>>>> 3. Regarding input buffering, I think if we use the window/trigger
>>>> mechanism, the buffered data can be stored in window state, which is
>>>> mostly like what we currently do in KeyedWindow and AllWindow. We may
>>>> need to allow a custom merge strategy for all window state data, since
>>>> for side inputs we may need to choose data according to the broadcast
>>>> state strategy, while in normal windows we can just redistribute the
>>>> window state data.
>>>> What do you think?
>>>> Best Regards!
>>>> Wenlong
>>>> On 14 March 2017 at 01:41, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>> Ha! This is turning out to be quite the discussion. :-) Also, thanks,
>>>>> Kenn, for chiming in with the Beam perspective!
>>>>> I'll try and address some stuff.
>>>>> It seems we have some consensus on using an N-ary operator to implement
>>>>> side inputs. I see two ways forward there:
>>>>>    - Have a "pure" N-ary operator that has zero inputs by default and
>>>>>    where all N inputs are equal: this would exist side-by-side with the
>>>>>    current one-input and two-input operators.
>>>>>    - Extend the existing operators with more inputs: the main input(s)
>>>>>    would be considered different from the N other inputs internally.
>>>>>    With this, we would not have to rewrite existing operators and could
>>>>>    simply have side inputs as an add-on.
>>>>> There weren't any (many?) comments on using broadcast state for side
>>>>> inputs. I think there is not much to agree on there because it seems
>>>>> pretty straightforward to me that we need this.
>>>>> About buffering: I think we need this as a Flink service because right
>>>>> now it is not (easily) possible to buffer keyed input. For keyed input
>>>>> we need to checkpoint the input buffers with the key-grouped state.
>>>>> Otherwise the data would not be distributed to the correct operator when
>>>>> restoring. This is explained in more detail in the FLIP.
>>>>> If we have these three components (n-ary inputs, broadcast state and
>>>>> input buffering) then side inputs are mostly API sugar on top. I even
>>>>> believe that it might be enough to simply provide these and then users
>>>>> have a very flexible system that allows them to implement different
>>>>> side-input variants. I'm suggesting this because I see there are a lot
>>>>> of different opinions and because the "field" of determining a side
>>>>> input to be finished is still quite open.
>>>>> Now, regarding Gabor's comments, which, I think, pretty nicely summed up
>>>>> the ongoing discussion and added some new stuff:
>>>>> About the CoFlatMap for the simple case: I think this is almost
>>>>> possible, except for the buffering in case of a keyed input stream.
>>>>> Also, the side input is not easy to store because we need broadcast
>>>>> state for that (depending, of course, on whether the input(s) are keyed
>>>>> or not). I think with the above-mentioned additions this case would be
>>>>> possible without explicit support for side inputs in the API.
>>>>> Re 1)
>>>>> I would prefer to use windowing/triggers for determining side-input
>>>>> readiness. There are already enough messages flying around the
>>>>> system, and introducing yet more doesn't seem too desirable to me right
>>>>> now. We should, of course, revisit this once we have the basic
>>>>> components in place.
>>>>> Re 2)
>>>>> See my comments about buffering in a keyed operator above. Regarding
>>>>> blocking, this is currently not possible because all inputs are consumed
>>>>> by one thread. This could, of course, change in the future, but it is a
>>>>> feature (limitation?) of the current implementation. In general, I think
>>>>> blocking an input is only ever feasible while waiting for some bounded
>>>>> inputs to be fully consumed, e.g. when you have some initial data to
>>>>> load from a static data set.
>>>>> Re 3)
>>>>> Agreed, I think that we should keep the side-input in the (yet to be
>>>>> introduced) broadcast state. Again, once we have the basics in place, we
>>>>> can investigate further optimisations here, such as not checkpointing
>>>>> side-input data from a static data set because we know that we can
>>>>> easily rebuild it.
>>>>> What do you think?
>>>>> On Fri, Mar 10, 2017, at 20:44, Kenneth Knowles wrote:
>>>>>> Hi all,
>>>>>> I thought I would briefly join this thread to mention some side input
>>>>>> lessons from Apache Beam. My knowledge of Flink is not deep enough,
>>>>>> technically or philosophically, to make any specific recommendations.
>>>>>> I might just be repeating things that the docs and threads cover, but
>>>>>> I hope it is helpful anyhow.
>>>>>> Side Input Visibility / matching: Beam started with a coupling between
>>>>>> the windowing on a stream and the way that windows are mapped between
>>>>>> main input and side input. This is actually not needed, and we'll be
>>>>>> making the mapping explicit (with sensible defaults). In particular, the
>>>>>> mapping determines when you can garbage collect: when you know that no
>>>>>> main element will ever map to a particular window again (so opaque
>>>>>> mappings need some metadata).
>>>>>> Side Input Readiness: There is an unpleasant asymmetry between waiting
>>>>>> for the first triggering of a side input but not waiting for any later
>>>>>> triggering. This manifests strongly when a user actually wants to [...]
>>>>>> something about the relationship to side input update latency and
>>>>>> input processing. This echoes some of the concern here about
>>>>>> user-defined control over readiness. IMO this is a rather open area.
>>>>>> Default values for singleton side inputs: A special case of side input
>>>>>> readiness that is also related to windowing. By far the most useful
>>>>>> singleton side input is the result of a global reduction with an
>>>>>> associative & commutative operator. A lot of these operators also have
>>>>>> an identity element. It is nice for this identity element (known a
>>>>>> priori) to be "always available" on the side input, for every window,
>>>>>> if it is expected to be something that is continually updated. But if
>>>>>> the configuration is such that it is a one-time triggering of bounded
>>>>>> data, that behavior is not right. Related: after some amount of time we
>>>>>> conclude that no input will ever be received for a window, and the side
>>>>>> input becomes ready.
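Kenn's point about identity elements can be illustrated with a small plain-Java sketch. ReducingSingletonSideInput, the alwaysAvailable switch, and declareNoMoreInput() are hypothetical names, not Beam or Flink APIs; the two modes correspond to the continually updated case and the one-time bounded case described above.

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch (not a Beam or Flink API): a singleton side input that
// holds the running result of a global reduction with an associative,
// commutative operator whose identity element is known a priori.
class ReducingSingletonSideInput<T> {
    private final BinaryOperator<T> combine;
    private final boolean alwaysAvailable; // continually updated case
    private T value;                       // starts at the identity element
    private boolean ready = false;

    ReducingSingletonSideInput(BinaryOperator<T> combine, T identity, boolean alwaysAvailable) {
        this.combine = combine;
        this.alwaysAvailable = alwaysAvailable;
        this.value = identity;
    }

    void add(T element) {
        value = combine.apply(value, element);
        ready = true;
    }

    // Called once we conclude that no input will ever arrive (e.g. the window
    // is complete); the side input then becomes ready with its current value.
    void declareNoMoreInput() {
        ready = true;
    }

    boolean isReady() {
        return alwaysAvailable || ready;
    }

    T get() {
        if (!isReady()) {
            throw new IllegalStateException("side input not ready yet");
        }
        return value;
    }
}
```

With alwaysAvailable set, a sum side input answers 0 before any data arrives; without it, a reader must wait for data or for the "no more input" decision.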
>>>>>> Map Side Inputs with triggers: When new data arrives for a key in [...]
>>>>>> there's no way to know which value should "win", so you basically
>>>>>> can't use map side inputs with triggers.
>>>>>> These are just some quick thoughts at a very high level.
>>>>>> Kenn
>>>>>> On Thu, Mar 9, 2017 at 7:59 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>> wrote:
>>>>>>> Hi Jamie,
>>>>>>> actually the approach where the .withSideInput() comes before the user
>>>>>>> function is only required for implementation proposal #1, which I like
>>>>>>> the least. For the other two it can be after the user function, which
>>>>>>> is also what I prefer.
>>>>>>> Regarding semantics: yes, we simply wait for anything to be available.
>>>>>>> For GlobalWindows, i.e. side inputs on a normal function where we
>>>>>>> simply don't have windows, this means that we wait for anything. For
>>>>>>> the windowed case, which I'm proposing as a second step, we will wait
>>>>>>> for side input in a window to be available that matches the main-input
>>>>>>> window. For the keyed case we wait for something on the same key to be
>>>>>>> available; for the broadcast case we wait for anything.
>>>>>>> Best,
>>>>>>> Aljoscha
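For the non-windowed cases above, the waiting rule reduces to a small predicate. A minimal sketch with hypothetical names, using plain Java collections in place of operator state:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the non-windowed readiness rules: in the keyed case
// a main element for key k waits until some side-input element with key k
// has arrived; in the broadcast case it waits until any side-input element
// has arrived.
class SideInputReadiness<K> {
    enum Mode { KEYED, BROADCAST }

    private final Mode mode;
    private final Set<K> keysWithSideInput = new HashSet<>();
    private boolean anySideInput = false;

    SideInputReadiness(Mode mode) {
        this.mode = mode;
    }

    void onSideElement(K key) {
        anySideInput = true;
        if (mode == Mode.KEYED) {
            keysWithSideInput.add(key);
        }
    }

    boolean isReadyFor(K mainKey) {
        return mode == Mode.KEYED ? keysWithSideInput.contains(mainKey) : anySideInput;
    }
}
```

Main elements for which isReadyFor() is false would be buffered, which in the keyed case brings back the keyed-buffer requirement discussed elsewhere in the thread.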
>>>>>>> On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
>>>>>>>> Hi, I think the proposal looks good. The only thing I wasn't clear on
>>>>>>>> was which API is actually being proposed: the one where
>>>>>>>> .withSideInput() comes before the user function or after. I would
>>>>>>>> definitely prefer it to come after since that's the normal pattern in
>>>>>>>> the Flink API. I understood that this makes the implementation
>>>>>>>> different (maybe harder), but I think it would keep the API uniform,
>>>>>>>> which is really good.
>>>>>>>> Overall I think the API looks good, and yes, there are some semantics
>>>>>>>> here, but in general if, when processing keyed main streams, we always
>>>>>>>> wait until there is a side input available for that key, we're off to
>>>>>>>> a great start, and I think that was what you're suggesting in the
>>>>>>>> design doc.
>>>>>>>> -Jamie
>>>>>>>> On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>> wrote:
>>>>>>>>> Hi,
>>>>>>>>> these are all valuable suggestions and I think that we should
>>>>>>>>> implement them when the time is right. However, I would like to first
>>>>>>>>> get a minimal viable version of this feature into Flink and then
>>>>>>>>> expand on it.
>>>>>>>>> I think the last few tries of tackling this problem fizzled because
>>>>>>>>> we got too deep into discussing special semantics and features. I
>>>>>>>>> think the most important thing to agree on right now is the basic API
>>>>>>>>> and the implementation plan. What do you think about that?
>>>>>>>>> Regarding your suggestions, I do in fact have a branch [1] from May
>>>>>>>>> 2016 where I implemented a prototype. It has an n-ary operator whose
>>>>>>>>> inputs can be either bounded or unbounded, and the implementation
>>>>>>>>> actually waits for all bounded inputs to finish before starting to
>>>>>>>>> process the unbounded inputs.
>>>>>>>>> In general, I think blocking on an input is only possible if you're
>>>>>>>>> waiting for a bounded input to finish. If all inputs are unbounded
>>>>>>>>> you cannot block, because you might run into deadlocks (in the
>>>>>>>>> processing graph, due to back pressure), and also because blocking
>>>>>>>>> would also block elements that might have a lower timestamp and
>>>>>>>>> might fall into a different window which is already ready for
>>>>>>>>> processing.
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>> [1]
>>>>>>>>> https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
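The "bounded inputs first" behaviour of the prototype can be sketched as a single-threaded loop. This is not the code in the linked branch: BoundedFirstScheduler is hypothetical, iterators stand in for input channels, and the round-robin order and step limit for unbounded inputs are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: an n-ary operator first consumes every bounded input
// to the end, and only then starts on the unbounded inputs (polled
// round-robin here, with a step limit standing in for "runs forever").
class BoundedFirstScheduler<T> {
    static final class Input<T> {
        final Iterator<T> elements;
        final boolean bounded;

        Input(Iterator<T> elements, boolean bounded) {
            this.elements = elements;
            this.bounded = bounded;
        }
    }

    private final List<Input<T>> inputs = new ArrayList<>();

    void addInput(Iterator<T> elements, boolean bounded) {
        inputs.add(new Input<>(elements, bounded));
    }

    void run(Consumer<T> process, int maxUnboundedSteps) {
        // Phase 1: drain all bounded inputs completely (e.g. a static side input).
        for (Input<T> in : inputs) {
            if (in.bounded) {
                while (in.elements.hasNext()) {
                    process.accept(in.elements.next());
                }
            }
        }
        // Phase 2: round-robin over the unbounded inputs until none has data
        // or the step limit is reached.
        int steps = 0;
        boolean progressed = true;
        while (steps < maxUnboundedSteps && progressed) {
            progressed = false;
            for (Input<T> in : inputs) {
                if (!in.bounded && in.elements.hasNext() && steps < maxUnboundedSteps) {
                    process.accept(in.elements.next());
                    steps++;
                    progressed = true;
                }
            }
        }
    }
}
```

Phase 1 only terminates because its inputs are bounded, which is why the thread restricts blocking to bounded inputs.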
>>>>>>>>> On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
>>>>>>>>>> Hi Aljoscha, thank you for the proposal; it is great to hear about the
>>>>>>>>>> progress on side inputs.
>>>>>>>>>> The following is my point of view:
>>>>>>>>>> 1. I think there could be an option to block the processing of the
>>>>>>>>>> main input instead of buffering the data in state, because in
>>>>>>>>>> production the throughput of the main input is usually much larger,
>>>>>>>>>> and buffering main-input data before the side input is ready may
>>>>>>>>>> slow down the preparation of the side input, since I/O and
>>>>>>>>>> computing resources are always limited.
>>>>>>>>>> 2. Another issue that may need to be discussed: how can we do
>>>>>>>>>> checkpointing with side inputs? A static side input may finish soon
>>>>>>>>>> after it is started, which will stop the checkpointing.
>>>>>>>>>> 3. I agree with Gyula that the user should be able to define when a
>>>>>>>>>> side input is ready. Maybe we can go one step further and let users
>>>>>>>>>> determine which input an operator with multiple inputs processes
>>>>>>>>>> each time? It would be more flexible.
>>>>>>>>>> Best Regards!
>>>>>>>>>> Wenlong
>>>>>>>>>> On 7 March 2017 at 18:39, Ventura Del Monte <venturadelmonte@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>> Thank you for the proposal and for bringing up this discussion again.
>>>>>>>>>>> Regarding the implementation aspect, I would say the first way could
>>>>>>>>>>> be easier/faster to implement, but it could add some overhead when
>>>>>>>>>>> dealing with multiple side inputs through the current 2-streams union
>>>>>>>>>>> transform. I tried the second option myself as it has less overhead,
>>>>>>>>>>> but the outcome was something close to an N-ary operator consuming
>>>>>>>>>>> each side input first while buffering the main input.
>>>>>>>>>>> Therefore, I would choose the third option as it is more generic and
>>>>>>>>>>> might also help in other scenarios, although its implementation
>>>>>>>>>>> requires more effort.
>>>>>>>>>>> I also agree with Gyula: I think the user should be allowed to define
>>>>>>>>>>> the condition that determines when a side input is ready, e.g., load
>>>>>>>>>>> the side input first, or incrementally update the side input.
>>>>>>>>>>> Best,
>>>>>>>>>>> Ventura
>>>>>>>>>>> On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra <gyula.fora@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>> Thank you for the nice proposal!
>>>>>>>>>>>> I think it would make sense to allow users to affect the readiness
>>>>>>>>>>>> of the side input. From a usability perspective, making it ready
>>>>>>>>>>>> when the first element arrives is only slightly better than making
>>>>>>>>>>>> it always ready. For instance, if I am joining against a static data
>>>>>>>>>>>> set, I want to wait for the whole set before making it ready. This
>>>>>>>>>>>> could be exposed as a user-defined condition that could maybe also
>>>>>>>>>>>> recognize bounded inputs.
>>>>>>>>>>>> Maybe we could also add an aggregating (merging) side input type
>>>>>>>>>>>> that could work as a broadcast state.
>>>>>>>>>>>> What do you think?
>>>>>>>>>>>> Gyula
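Gyula's suggestion of a user-defined readiness condition might look roughly like this. SideInputTracker and its two predicates are hypothetical; in particular, how an operator learns that a bounded input has ended (onEndOfInput()) is an assumption, not something the thread specifies.

```java
import java.util.function.BiPredicate;

// Hypothetical sketch, not a Flink API: side-input readiness as a
// user-defined condition over (elementsSeen, endOfInputReached).
class SideInputTracker {
    // Ready as soon as the first element arrives.
    static final BiPredicate<Long, Boolean> FIRST_ELEMENT = (seen, ended) -> seen > 0;
    // Ready only once a bounded input (e.g. a static data set) is fully read.
    static final BiPredicate<Long, Boolean> WHOLE_BOUNDED_INPUT = (seen, ended) -> ended;

    private final BiPredicate<Long, Boolean> condition;
    private long elementsSeen = 0;
    private boolean endOfInput = false;

    SideInputTracker(BiPredicate<Long, Boolean> condition) {
        this.condition = condition;
    }

    void onElement() {
        elementsSeen++;
    }

    void onEndOfInput() {
        endOfInput = true;
    }

    boolean isReady() {
        return condition.test(elementsSeen, endOfInput);
    }
}
```

Users could plug in other conditions (a minimum element count, a marker record) without the operator needing to know which one is in use.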
>>>>>>>>>>>> On Mon, Mar 6, 2017, at 15:18, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi Folks,
>>>>>>>>>>>>> I would like to finally agree on a plan for implementing side inputs
>>>>>>>>>>>>> in Flink. There has already been an attempt to come to consensus [1],
>>>>>>>>>>>>> which resulted in two design documents. I tried to consolidate those
>>>>>>>>>>>>> two and also added a section about implementation plans. This is the
>>>>>>>>>>>>> resulting FLIP:
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>>>>>>>>>>>>> In terms of semantics I tried to go with the minimal viable solution.
>>>>>>>>>>>>> The part that needs discussing is how we want to implement this. I
>>>>>>>>>>>>> outlined three possible implementation plans in the FLIP, but what it
>>>>>>>>>>>>> boils down to is that we need to introduce some way of getting
>>>>>>>>>>>>> several inputs into an operator/task.
>>>>>>>>>>>>> Please have a look at the doc and let us know what you think.
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
>>>>>>>> --
>>>>>>>> Jamie Grier
>>>>>>>> data Artisans, Director of Applications Engineering
>>>>>>>> @jamiegrier <https://twitter.com/jamiegrier>
>>>>>>>> jamie@data-artisans.com
