flink-dev mailing list archives

From Kenneth Knowles <k...@apache.org>
Subject Re: [DISCUSS] FLIP-17 Side Inputs
Date Fri, 10 Mar 2017 19:44:59 GMT
Hi all,

I thought I would briefly join this thread to mention some side input
lessons from Apache Beam. My knowledge of Flink is not deep enough,
technically or philosophically, to make any specific recommendations. And I
might just be repeating things that the docs and threads cover, but I hope
it might be helpful anyhow.

Side Input Visibility / matching: Beam started with a coupling between the
windowing on a stream and the way that windows are mapped between main
input and side input. This is actually not needed and we'll be making the
mapping explicit (with sensible defaults). In particular, the mapping
determines when you can garbage collect, when you know that no main input
element will ever map to a particular window again (so opaque mappings need
some metadata).
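
To illustrate the point about explicit mappings and garbage collection, here is a minimal sketch in Python. It is not Beam or Flink code: `Window`, `fixed_window`, `map_to_side_window`, and `can_garbage_collect` are hypothetical names, and the sketch assumes fixed windows, integer timestamps, and no allowed lateness.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    start: int  # inclusive
    end: int    # exclusive


def fixed_window(timestamp: int, size: int) -> Window:
    """Assign a timestamp to the fixed window of the given size that contains it."""
    start = timestamp - (timestamp % size)
    return Window(start, start + size)


def map_to_side_window(main: Window, side_size: int) -> Window:
    """Explicit mapping (one possible default): pick the side-input window
    that contains the last timestamp of the main-input window."""
    return fixed_window(main.end - 1, side_size)


def can_garbage_collect(side: Window, main_watermark: int) -> bool:
    """Under the mapping above, every main window that maps to `side` ends
    at or before side.end. Once the main-input watermark reaches side.end
    (assumption: no allowed lateness), no further main element can map to
    `side`, so its contents can be dropped."""
    return main_watermark >= side.end
```

An opaque user-supplied mapping would not allow `can_garbage_collect` to be derived this way, which is why such a mapping would need to carry extra metadata.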

Side Input Readiness: There is an unpleasant asymmetry between waiting for
the first triggering of a side input but not waiting for any later
triggering. This manifests strongly when a user actually wants to know
something about the relationship between side input update latency and main
input processing. This echoes some of the concern here about user-defined
control over readiness. IMO this is a rather open area.
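
The asymmetry can be made concrete with a small sketch, again in Python with hypothetical names (`SideInputGate` is not a Beam or Flink class). It assumes a single unwindowed side input: main elements are buffered until the first side input firing, but after that they are processed against whatever value is current and never wait for a later firing.

```python
class SideInputGate:
    """Buffers main-input elements until the side input has fired once."""

    def __init__(self):
        self._has_side = False
        self._side = None
        self._buffer = []

    def on_side(self, value):
        """Record a new side input firing and flush any buffered main elements."""
        self._side = value
        self._has_side = True
        flushed = [(element, value) for element in self._buffer]
        self._buffer.clear()
        return flushed

    def on_main(self, element):
        """Process a main element, or buffer it if no side input has fired yet."""
        if not self._has_side:
            self._buffer.append(element)
            return []
        # After the first firing we never wait again, even if a newer
        # side input value is about to arrive.
        return [(element, self._side)]
```

With this gate, an element that arrives just before the second firing is paired with the first value, and there is no way for the user to express "wait for the update".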

Default values for singleton side inputs: A special case of side input
readiness that is related also to windowing. By far the most useful
singleton side input is the result of a global reduction with an
associative and commutative operator. A lot of these operators also have an
identity element. It is nice for this identity element (known a priori) to
be "always available" on the side input, for every window, if it is
expected to be something that is continually updated. But if the
configuration is such that it is a one-time triggering of bounded data,
that behavior is not right. Related, after some amount of time we conclude
that no input will ever be received for a window, and the side input
becomes ready.
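
A rough sketch of the two behaviors, in Python with hypothetical names (`SingletonSideInput` and its methods are illustrative only, not an existing API). It assumes the reduction is given as a binary `combine` function with a known `identity`, and that `close` is called once the system concludes a window will receive no more input.

```python
import operator


class SingletonSideInput:
    """Singleton side input backed by a global reduction with an identity."""

    def __init__(self, combine, identity, continually_updated):
        self.combine = combine
        self.identity = identity
        self.continually_updated = continually_updated
        self._values = {}     # window -> combined value of fired panes
        self._closed = set()  # windows known to receive no more input

    def add(self, window, value):
        """Fold one fired pane of the upstream reduction into the window's value."""
        current = self._values.get(window, self.identity)
        self._values[window] = self.combine(current, value)

    def close(self, window):
        """Mark a window as final: no input will ever arrive for it."""
        self._closed.add(window)

    def is_ready(self, window):
        if self.continually_updated:
            # The identity is "always available", for every window.
            return True
        # One-time triggering: wait for a firing, or for the window to close.
        return window in self._values or window in self._closed

    def get(self, window):
        if not self.is_ready(window):
            raise RuntimeError("side input not ready for window %r" % (window,))
        return self._values.get(window, self.identity)
```

For a sum, `SingletonSideInput(operator.add, 0, continually_updated=True)` would expose 0 for any window before data arrives, while the same input with `continually_updated=False` stays unready until a firing or `close`.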

Map Side Inputs with triggers: When new data arrives for a key in Beam,
there's no way to know which value should "win", so you basically just
can't use map side inputs with triggers.
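
As a sketch of why this is a problem (Python, hypothetical names, not Beam's implementation): if each trigger firing delivers a pane of key/value pairs and the view is built by merging panes, a key that shows up in more than one pane has no defined winner, so the only safe option in this sketch is to reject it.

```python
def build_map_view(panes):
    """Merge fired panes of (key, value) pairs into a single map view.

    Raises ValueError if a key appears more than once, because without
    extra information (e.g. an ordering) there is no rule for which
    value should win.
    """
    view = {}
    for pane in panes:
        for key, value in pane:
            if key in view:
                raise ValueError("duplicate key in map side input: %r" % (key,))
            view[key] = value
    return view
```

A single firing such as `[[("a", 1), ("b", 2)]]` builds fine, whereas a second pane that updates `"a"` is rejected.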

These are just some quick thoughts at a very high level.

Kenn

On Thu, Mar 9, 2017 at 7:59 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi Jamie,
> actually the approach where the .withSideInput() comes before the user
> function is only required for implementation proposal #1, which I like
> the least. For the other two it can be after the user function, which is
> also what I prefer.
>
> Regarding semantics: yes, we simply wait for anything to be available.
> For GlobalWindows, i.e. side inputs on a normal function where we simply
> don't have windows, this means that we wait for anything. For the
> windowed case, which I'm proposing as a second step, we will wait for a
> side input in a window to be available that matches the main-input
> window. For the keyed case we wait for something on the same key to be
> available, for the broadcast case we wait for anything.
>
> Best,
> Aljoscha
>
> On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
> > Hi, I think the proposal looks good. The only thing I wasn't clear on was
> > which API is actually being proposed: the one where .withSideInput() comes
> > before the user function, or after. I would definitely prefer it come after,
> > since that's the normal pattern in the Flink API. I understood that makes
> > the implementation different (maybe harder), but I think it helps keep the
> > API uniform, which is really good.
> >
> > Overall I think the API looks good, and yes, there are some tricky semantics
> > here, but in general if, when processing keyed main streams, we always wait
> > until there is a side input available for that key, we're off to a great
> > start, and I think that was what you're suggesting in the design doc.
> >
> > -Jamie
> >
> >
> > On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek <aljoscha@apache.org>
> > wrote:
> >
> > > Hi,
> > > these are all valuable suggestions and I think that we should implement
> > > them when the time is right. However, I would like to first get a
> > > minimal viable version of this feature into Flink and then expand on it.
> > > I think the last few tries of tackling this problem fizzled out because
> > > we got too deep into discussing special semantics and features. I think
> > > the most important thing to agree on right now is the basic API and the
> > > implementation plan. What do you think about that?
> > >
> > > Regarding your suggestions, I have in fact a branch [1] from May 2016
> > > where I implemented a prototype implementation. This has an n-ary
> > > operator and inputs can be either bounded or unbounded and the
> > > implementation actually waits for all bounded inputs to finish before
> > > starting to process the unbounded inputs.
> > >
> > > In general, I think blocking on an input is only possible while you're
> > > waiting for a bounded input to finish. If all inputs are unbounded you
> > > cannot block because you might run into deadlocks (in the processing
> > > graph, due to back pressure) and also because blocking will also block
> > > elements that might have a lower timestamp and might fall into a
> > > different window which is already ready for processing.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1]
> > > https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
> > >
> > > On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> > > > Hi Aljoscha, thank you for the proposal, it is great to hear about the
> > > > progress in side input.
> > > >
> > > > Following is my point of view:
> > > > 1. I think there could be an option to block processing of the main
> > > > input instead of buffering its data in state. In production, the
> > > > throughput of the main input is usually much larger, and buffering
> > > > that data before the side input is ready may slow down preparation of
> > > > the side input, since I/O and computing resources are always limited.
> > > > 2. Another issue that may need to be discussed: how can we do
> > > > checkpointing with side inputs? A static side input may finish soon
> > > > after it starts, which will stop checkpointing.
> > > > 3. I agree with Gyula that the user should be able to determine when a
> > > > side input is ready. Maybe we can go one step further: could users
> > > > decide which input an operator with multiple inputs processes each
> > > > time? It would be more flexible.
> > > >
> > > >
> > > > Best Regards!
> > > > Wenlong
> > > >
> > > > On 7 March 2017 at 18:39, Ventura Del Monte <venturadelmonte@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > Thank you for the proposal and for bringing this discussion up again.
> > > > >
> > > > > Regarding the implementation aspect, I would say the first way could
> > > > > be easier/faster to implement, but it could add some overhead when
> > > > > dealing with multiple side inputs through the current 2-streams union
> > > > > transform. I tried the second option myself, as it has less overhead,
> > > > > but the outcome was something close to an N-ary operator that consumes
> > > > > each side input first while buffering the main one.
> > > > > Therefore, I would choose the third option, as it is more generic
> > > > > and might also help in other scenarios, although its implementation
> > > > > requires more effort.
> > > > > I also agree with Gyula: I think the user should be allowed to define
> > > > > the condition that determines when a side input is ready, e.g., load
> > > > > the side input first, or incrementally update the side input.
> > > > >
> > > > > Best,
> > > > > Ventura
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > This message, for the D. Lgs n. 196/2003 (Privacy Code), may
> contain
> > > > > confidential and/or privileged information. If you are not the
> > > addressee or
> > > > > authorized to receive this for the addressee, you must not use,
> copy,
> > > > > disclose or take any action based on this message or any
> information
> > > > > herein. If you have received this message in error, please advise
> the
> > > > > sender immediately by reply e-mail and delete this message. Thank
> you
> > > for
> > > > > your cooperation.
> > > > >
> > > > > On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra <gyula.fora@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > Thank you for the nice proposal!
> > > > > >
> > > > > > I think it would make sense to allow users to affect the readiness of
> > > > > > the side input. I think making it ready when the first element arrives
> > > > > > is only slightly better than making it always ready, from a usability
> > > > > > perspective. For instance, if I am joining against a static data set I
> > > > > > want to wait for the whole set before making it ready. This could be
> > > > > > exposed as a user-defined condition that could maybe also recognize
> > > > > > bounded inputs.
> > > > > >
> > > > > > Maybe we could also add an aggregating (merging) side input type that
> > > > > > could work as a broadcast state.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Gyula
> > > > > >
> > > > > > On Mon, Mar 6, 2017 at 15:18, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Folks,
> > > > > > >
> > > > > > > I would like to finally agree on a plan for implementing side inputs
> > > > > > > in Flink. There has already been an attempt to come to consensus [1],
> > > > > > > which resulted in two design documents. I tried to consolidate those
> > > > > > > two and also added a section about implementation plans. This is the
> > > > > > > resulting FLIP:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> > > > > > >
> > > > > > >
> > > > > > > In terms of semantics I tried to go with the minimal viable solution.
> > > > > > > The part that needs discussing is how we want to implement this. I
> > > > > > > outlined three possible implementation plans in the FLIP, but what it
> > > > > > > boils down to is that we need to introduce some way of getting several
> > > > > > > inputs into an operator/task.
> > > > > > >
> > > > > > >
> > > > > > > Please have a look at the doc and let us know what you think.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Aljoscha
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > > https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> >
> >
> > --
> >
> > Jamie Grier
> > data Artisans, Director of Applications Engineering
> > @jamiegrier <https://twitter.com/jamiegrier>
> > jamie@data-artisans.com
>
