flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: [DISCUSS] Behaviour of Streaming Sources
Date Mon, 11 May 2015 08:37:54 GMT
I would not go into this direction. Returning lists is messy I think. I
would stick with hasNext and Next returning a single element

On Mon, May 11, 2015 at 10:20 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> We could also change next() to return List<T> and say that the method
> must not sit and wait but simply return stuff that is available
> without waiting while also being able to not return anything for the
> moment.
>
> On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax
> <mjsax@informatik.hu-berlin.de> wrote:
> > You are right. That is why I pointed out this already:
> >
> >> -> You could force the UDF to return each time, be disallowing
> >>>> consecutive calls to Collector.out(...).
> >
> > The Storm design would avoid the "NULL-Problem" Aljoscha mentioned, too.
> >
> >
> > -Matthias
> >
> >
> >
> > On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> >> I think the problem with this void next() approach is exactly the way it
> >> works:
> >>
> >> "Using this interface, "next()" can loop internally as long
> >> as tuples are available and return if there is (currently) no input."
> >>
> >> We dont want the user to loop internally in the next because then we
> have
> >> almost the same problem as now with the run(). We want to do snapshots
> >> between 2 produced source elements, roughly the same time at all the
> >> sources so we cannot afford waiting for some random user behaviour to
> >> finish.
> >>
> >>
> >> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> >> mjsax@informatik.hu-berlin.de> wrote:
> >>
> >>> Did you consider the Storm way to handle this?
> >>>
> >>> Storm offers a method "void next()" that uses a collector object to
> emit
> >>> new tuples. Using this interface, "next()" can loop internally as long
> >>> as tuples are available and return if there is (currently) no input.
> >>> What I have seen, people tend to emit a single tuple an leave next()
> >>> immediately, because Storm call next() in an infinite loop anyway.
> >>> -> You could force the UDF to return each time, be disallowing
> >>> consecutive calls to Collector.out(...).
> >>>
> >>> If next() is called by the system and it returns, it is easy to check
> if
> >>> the out(..) method of the collector object was called at least once. If
> >>> the recored was emitted, Storm "sleeps" for a while before calling
> >>> next() again, to avoid busy waiting. The sleeping time is increased for
> >>> consecutive "empty" next() calls and reset the first time next() emits
> >>> records again.
> >>>
> >>> I like this interface, because it's very simple and would prefer it
> over
> >>> an interface with many methods.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> >>>> Hi,
> >>>> in the process of reworking the Streaming Operator model I'm also
> >>>> reworking the sources in order to get rid of the loop in each source.
> >>>> Right now, the interface for sources (SourceFunction) has one method:
> >>>> run(). This is called when the source starts and can just output
> >>>> elements at any time using the Collector interface. This does not give
> >>>> the Task that runs the source a lot of control in suspending operation
> >>>> for performing checkpoints or some such thing.
> >>>>
> >>>> I thought about changing the interface to this:
> >>>>
> >>>> interface SourceFunction<T>  {
> >>>>   boolean reachedEnd();
> >>>>   T next();
> >>>> }
> >>>>
> >>>> This is similar to the batch API and also to what Stephan proposes in
> >>>> his pull request. I think this will not work for streaming because
> >>>> sources might not have new elements to emit at the moment but might
> >>>> have something to emit in the future. This is problematic because
> >>>> streaming topologies are usually running indefinitely. In that case,
> >>>> the reachedEnd() and next() would have to be blocking (until a new
> >>>> element arrives). This again does not give the task the power to
> >>>> suspend operation at will.
> >>>>
> >>>> I propose a three function interface:
> >>>>
> >>>> interface SourceFunction<T> {
> >>>>   boolean reachedEnd():
> >>>>   boolean hasNext():
> >>>>   T next();
> >>>> }
> >>>>
> >>>> where the contract for the source is as follows:
> >>>>  - reachedEnd() == true => stop the source
> >>>>  - hasNext() == true => call next() to retrieve next element
> >>>>  - hasNext() == false => call again at some later point
> >>>>  - next() => retrieve next element, throw exception if no element
> >>> available
> >>>>
> >>>> I thought about allowing next() to return NULL to signal that no
> >>>> element is available at the moment. This will not work because a
> >>>> source might want to return NULL as an element.
> >>>>
> >>>> What do you think? Any other ideas about solving this?
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>
> >>>
> >>>
> >>
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message