flink-dev mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: [DISCUSS] Behaviour of Streaming Sources
Date Fri, 08 May 2015 08:59:39 GMT
I think the problem with this void next() approach is exactly the way it
works:

"Using this interface, "next()" can loop internally as long
as tuples are available and return if there is (currently) no input."

We don't want the user to loop internally in next(), because then we have
almost the same problem as we have now with run(). We want to take snapshots
between two produced source elements, at roughly the same time at all the
sources, so we cannot afford to wait for some arbitrary user behaviour to
finish.
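
To make the requirement concrete, here is a minimal Java sketch, not the
actual Flink or Storm API. The names SingleEmitCollector, StepSource and
SourceTask are hypothetical, and the "snapshot every N elements" trigger is
only a stand-in for a real checkpoint signal. The collector rejects a second
element within one next() call, so control returns to the task after every
element and a snapshot can be taken between two elements.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical collector that accepts at most one element per next() call. */
final class SingleEmitCollector<T> {
    private T pending;
    private boolean filled;

    void collect(T element) {
        if (filled) {
            // A second emit in the same call means the source is looping internally.
            throw new IllegalStateException("only one element may be emitted per next() call");
        }
        pending = element;
        filled = true;
    }

    boolean hasElement() {
        return filled;
    }

    /** Hands the element to the task and makes the collector reusable. */
    T take() {
        T out = pending;
        pending = null;
        filled = false;
        return out;
    }
}

/** Hypothetical source contract: emit zero or one element, then return. */
interface StepSource<T> {
    void next(SingleEmitCollector<T> out);
}

/** Hypothetical task loop that regains control after every element. */
final class SourceTask {
    static <T> List<String> run(StepSource<T> source, int steps, int snapshotEvery) {
        List<String> log = new ArrayList<>();
        SingleEmitCollector<T> collector = new SingleEmitCollector<>();
        int emitted = 0;
        for (int i = 0; i < steps; i++) {
            source.next(collector);
            if (collector.hasElement()) {
                log.add("element:" + collector.take());
                emitted++;
                // Assumption: a real task would react to an external checkpoint
                // request here; a fixed interval only simulates that.
                if (emitted % snapshotEvery == 0) {
                    log.add("snapshot");
                }
            }
        }
        return log;
    }
}
```

The snapshot entries in the log always fall between two elements, which is
the property we are after; a source that tries to emit twice in one call
fails fast instead of delaying the snapshot.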


On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
mjsax@informatik.hu-berlin.de> wrote:

> Did you consider the Storm way to handle this?
>
> Storm offers a method "void next()" that uses a collector object to emit
> new tuples. Using this interface, "next()" can loop internally as long
> as tuples are available and return if there is (currently) no input.
> From what I have seen, people tend to emit a single tuple and leave next()
> immediately, because Storm calls next() in an infinite loop anyway.
> -> You could force the UDF to return each time by disallowing
> consecutive calls to Collector.out(...).
>
> If next() is called by the system and it returns, it is easy to check if
> the out(..) method of the collector object was called at least once. If
> no record was emitted, Storm "sleeps" for a while before calling
> next() again, to avoid busy waiting. The sleeping time is increased for
> consecutive "empty" next() calls and reset the first time next() emits
> records again.
>
> I like this interface, because it's very simple and would prefer it over
> an interface with many methods.
>
>
> -Matthias
>
>
> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> > Hi,
> > in the process of reworking the Streaming Operator model I'm also
> > reworking the sources in order to get rid of the loop in each source.
> > Right now, the interface for sources (SourceFunction) has one method:
> > run(). This is called when the source starts and can just output
> > elements at any time using the Collector interface. This does not give
> > the Task that runs the source a lot of control in suspending operation
> > for performing checkpoints or some such thing.
> >
> > I thought about changing the interface to this:
> >
> > interface SourceFunction<T>  {
> >   boolean reachedEnd();
> >   T next();
> > }
> >
> > This is similar to the batch API and also to what Stephan proposes in
> > his pull request. I think this will not work for streaming because
> > sources might not have new elements to emit at the moment but might
> > have something to emit in the future. This is problematic because
> > streaming topologies are usually running indefinitely. In that case,
> > the reachedEnd() and next() would have to be blocking (until a new
> > element arrives). This again does not give the task the power to
> > suspend operation at will.
> >
> > I propose a three function interface:
> >
> > interface SourceFunction<T> {
> >   boolean reachedEnd();
> >   boolean hasNext();
> >   T next();
> > }
> >
> > where the contract for the source is as follows:
> >  - reachedEnd() == true => stop the source
> >  - hasNext() == true => call next() to retrieve next element
> >  - hasNext() == false => call again at some later point
> >  - next() => retrieve next element, throw exception if no element
> >    available
> >
> > I thought about allowing next() to return NULL to signal that no
> > element is available at the moment. This will not work because a
> > source might want to return NULL as an element.
> >
> > What do you think? Any other ideas about solving this?
> >
> > Cheers,
> > Aljoscha
> >
>
>
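
For comparison, the three-method contract quoted above, combined with the
increasing sleep time Matthias describes, could be driven roughly like this.
This is a sketch under assumptions: PollingSource, QueueSource and
PollingDriver are hypothetical names, not Flink or Storm classes, and the
doubling backoff is only one possible policy.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

/** Hypothetical name for the proposed three-method source contract. */
interface PollingSource<T> {
    boolean reachedEnd();
    boolean hasNext();
    T next();
}

/** Sample source; a LinkedList is used because it permits null elements. */
final class QueueSource<T> implements PollingSource<T> {
    private final Queue<T> queue = new LinkedList<>();
    private boolean closed;

    void offer(T element) { queue.add(element); }
    void close() { closed = true; }

    public boolean reachedEnd() { return closed && queue.isEmpty(); }
    public boolean hasNext() { return !queue.isEmpty(); }

    public T next() {
        if (queue.isEmpty()) {
            throw new NoSuchElementException("no element available");
        }
        return queue.remove();
    }
}

/** Task-side loop: the task, not the source, decides when to wait. */
final class PollingDriver {
    static <T> List<T> drain(PollingSource<T> source, long initialSleepMs, long maxSleepMs) {
        List<T> out = new ArrayList<>();
        long sleepMs = initialSleepMs;
        while (!source.reachedEnd()) {
            if (source.hasNext()) {
                out.add(source.next());
                sleepMs = initialSleepMs; // reset the backoff after an element
                // The task holds control here and could take a snapshot.
            } else {
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return out; // stop early if the task is interrupted
                }
                sleepMs = Math.min(sleepMs * 2, maxSleepMs); // back off on empty polls
            }
        }
        return out;
    }
}
```

Because hasNext() carries the "nothing available right now" signal, next()
can return null as a regular element, and the task never blocks inside user
code for longer than a single call.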
