flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Streaming left outer join
Date Wed, 27 Jan 2016 10:03:55 GMT
I’m afraid there is currently no way to do what you want with the built-in window primitives.
Each of the slices of the sliding windows is essentially evaluated independently. Therefore,
there cannot be effects in one slice that influence processing of another slice.
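
To illustrate the point about independent slices, here is a minimal plain-Scala sketch (no Flink dependency) that lists the sliding windows a single element falls into. The object and method names are made up for this illustration, and it assumes non-negative millisecond timestamps and windows aligned to multiples of the slide:

```scala
// Plain Scala, no Flink dependency. Names are hypothetical, for illustration only.
object SlidingWindowAssignment {
  /** Start times (ms) of all windows of length `size`, sliding by `slide`,
    * whose range [start, start + size) contains the timestamp `ts`.
    * Assumes ts >= 0 and window starts aligned to multiples of `slide`. */
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - (ts % slide) // latest window start that is <= ts
    // Walk backwards one slide at a time while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  def main(args: Array[String]): Unit = {
    // 1-minute windows sliding every second, as in the quoted code.
    val starts = windowStarts(ts = 125500L, size = 60000L, slide = 1000L)
    println(starts.size) // 60
    println(starts.head) // 125000
    println(starts.last) // 66000
  }
}
```

With a 1-minute window and a 1-second slide, one element lands in 60 windows, and the window function runs once for each of them. That is why the same pair shows up on every slide.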

What you could do is switch to tumbling windows, then each element would only be in one window.
That probably won’t fit your use case anymore. The alternative I see to that is to implement
everything in a custom operator where you deal with window states and triggering on time yourself.
Let me know if you need some pointers about that one.
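
As a rough starting point, the logic such a custom operator would need can be sketched in plain Scala. This is not Flink API: `LeftOuterJoiner`, `Pending`, `onLeft`, `onRight`, `onTime` and `timeoutMs` are hypothetical names, timestamps are passed in by the caller, and state lives in ordinary in-memory maps. In a real operator the state and the time callback would have to come from whatever the runtime provides. The sketch also assumes a left element should be emitted exactly once, paired with the most recent unexpired right element if there is one:

```scala
import scala.collection.mutable

// Plain-Scala sketch of the join logic only. All names are hypothetical.
object OuterJoinSketch {
  final case class Pending[L](value: L, deadline: Long)

  class LeftOuterJoiner[K, L, R](timeoutMs: Long) {
    // Left elements still waiting for a match, newest first, per key.
    private val pendingLeft = mutable.Map.empty[K, List[Pending[L]]]
    // Most recent right element per key, with the time at which it expires.
    private val lastRight = mutable.Map.empty[K, (R, Long)]

    /** A left element arrives: match at once if a live right exists, else wait. */
    def onLeft(key: K, l: L, ts: Long): Seq[(L, Option[R])] =
      lastRight.get(key) match {
        case Some((r, expiry)) if expiry > ts => List((l, Some(r)))
        case _ =>
          pendingLeft(key) = Pending(l, ts + timeoutMs) :: pendingLeft.getOrElse(key, Nil)
          Nil
      }

    /** A right element arrives: release every left element waiting on this key. */
    def onRight(key: K, r: R, ts: Long): Seq[(L, Option[R])] = {
      lastRight(key) = (r, ts + timeoutMs)
      pendingLeft.remove(key).getOrElse(Nil).reverse.map { p =>
        // A left element already past its deadline gets the empty match.
        if (p.deadline > ts) (p.value, Some(r)) else (p.value, None)
      }
    }

    /** Time advances: emit the empty match for expired lefts, drop expired rights. */
    def onTime(now: Long): Seq[(L, Option[R])] = {
      val out = mutable.ListBuffer.empty[(L, Option[R])]
      for (key <- pendingLeft.keys.toList) {
        val (dead, alive) = pendingLeft(key).partition(_.deadline <= now)
        dead.reverse.foreach(p => out += ((p.value, None)))
        if (alive.isEmpty) pendingLeft.remove(key) else pendingLeft(key) = alive
      }
      for (key <- lastRight.keys.toList if lastRight(key)._2 <= now) lastRight.remove(key)
      out.toList
    }
  }

  def main(args: Array[String]): Unit = {
    val j = new LeftOuterJoiner[String, String, Int](timeoutMs = 60000L)
    println(j.onLeft("a", "foo1", 0L))    // List()  (foo1 waits)
    println(j.onRight("a", 7, 1000L))     // List((foo1,Some(7)))
    println(j.onLeft("b", "foo2", 2000L)) // List()  (foo2 waits)
    println(j.onTime(62000L))             // List((foo2,None))
  }
}
```

Because each left element is removed from the pending state as soon as it is emitted, it can never be emitted a second time, which is the property the sliding-window version lacks.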

> On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryzlov@gmail.com> wrote:
> Hello, 
> I'm trying to implement a left outer join of two Kafka streams within a sliding window.
> So far I have the following code:
> foos
>   .coGroup(bars)
>   .where(_.baz).equalTo(_.baz)
>   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
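>   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => {
>     // Materialize the right side: an Iterator can only be traversed once,
>     // so reusing `bs` for a second `f` would wrongly see it as empty.
>     val bsList = bs.toList
>     fs.foreach(f =>
>       if (bsList.isEmpty)
>         o.collect(FooBar(f, None))
>       else
>         bsList.foreach(b => o.collect(FooBar(f, Some(b))))
>     )
>   })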
> However, this results in the pair being emitted from every window slide, regardless of
> the match. The desired behaviour would be:
> * emit the match as soon as it's found, don't emit any more pairs for it,
> * otherwise, emit the empty match, when the left side element leaves the last of its
> What would be the idiomatic/efficient way to implement such behaviour? Is it possible
> at all with the coGroup/window mechanism, or is some other way necessary?
> Alex
