flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Streaming left outer join
Date Wed, 27 Jan 2016 20:53:05 GMT
Hi!

I think this pull request may be implementing what you are looking for:
https://github.com/apache/flink/pull/1527

Stephan


On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <alex.gryzlov@gmail.com>
wrote:

> Hello Aljoscha,
>
> Indeed, it seems like I'd need a custom operator. I imagine this involves
> implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could
> you provide those pointers please?
>
> Alex
>
> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi,
>> I’m afraid there is currently no way to do what you want with the
>> built-in window primitives. Each of the slices of the sliding windows is
>> essentially evaluated independently. Therefore, there cannot be effects in
>> one slice that influence processing of another slice.
>>
>> What you could do is switch to tumbling windows, then each element would
>> only be in one window. That probably won’t fit your use case anymore. The
>> alternative I see to that is to implement everything in a custom operator
>> where you deal with window states and triggering on time yourself. Let me
>> know if you need some pointers about that one.
>>
>> Cheers,
>> Aljoscha
>> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryzlov@gmail.com>
>> wrote:
>> >
>> > Hello,
>> >
>> > I'm trying to implement a left outer join of two Kafka streams within a
>> sliding window. So far I have the following code:
>> >
>> > foos
>> >   .coGroup(bars)
>> >   .where(_.baz).equalTo(_.baz)
>> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
>> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => {
>> >     // Materialize the right side once: an Iterator can only be traversed
>> >     // a single time, so it would be exhausted after the first left element.
>> >     val matches = bs.toList
>> >     fs.foreach(f =>
>> >       if (matches.isEmpty)
>> >         o.collect(FooBar(f, None))
>> >       else
>> >         matches.foreach(b => o.collect(FooBar(f, Some(b))))
>> >     )
>> >   })
>> >
>> > However, this results in the pair being emitted from every window
>> slide, regardless of the match. The desired behaviour would be:
>> > * emit the match as soon as it's found, and don't emit any more pairs
>> for it,
>> > * otherwise, emit the empty match, when the left side element leaves
>> the last of its windows
>> >
>> > What would be the idiomatic/efficient way to implement such behaviour?
>> Is it possible at all with the coGroup/window mechanism, or is some other
>> approach necessary?
>> >
>> > Alex
>>
>>
>
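
To make the behaviour requested in the thread concrete, here is a minimal plain-Scala sketch of the state a custom operator might keep. It is not Flink code and not the implementation from the linked pull request. The class name LeftOuterJoinSketch, its methods onLeft, onRight and onTime, the id fields, and the emit callback are all hypothetical names chosen for illustration. It assumes a left element should be dropped once it is older than the window size, and that a matched left element may still pair with later right elements but never produces an empty match.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Foo, Bar and FooBar types used in the thread.
case class Foo(baz: String, id: Int)
case class Bar(baz: String, id: Int)
case class FooBar(foo: Foo, bar: Option[Bar])

// Plain-Scala model of the join state; a real operator would keep this in managed state.
class LeftOuterJoinSketch(windowMillis: Long, emit: FooBar => Unit) {
  // A buffered left element, when it expires, and whether it has matched yet.
  private class PendingFoo(val foo: Foo, val expiresAt: Long, var matched: Boolean)

  private val lefts = mutable.Map.empty[String, mutable.ListBuffer[PendingFoo]]
  private val rights = mutable.Map.empty[String, mutable.ListBuffer[(Bar, Long)]]

  // Left element arrives: pair it with buffered right elements right away, then buffer it.
  def onLeft(foo: Foo, timestamp: Long): Unit = {
    val candidates = rights.getOrElse(foo.baz, mutable.ListBuffer.empty[(Bar, Long)])
    candidates.foreach { case (bar, _) => emit(FooBar(foo, Some(bar))) }
    val pending = new PendingFoo(foo, timestamp + windowMillis, candidates.nonEmpty)
    lefts.getOrElseUpdate(foo.baz, mutable.ListBuffer.empty[PendingFoo]) += pending
  }

  // Right element arrives: emit each pair exactly once and mark the left side as matched.
  def onRight(bar: Bar, timestamp: Long): Unit = {
    lefts.getOrElse(bar.baz, mutable.ListBuffer.empty[PendingFoo]).foreach { p =>
      emit(FooBar(p.foo, Some(bar)))
      p.matched = true
    }
    val buffer = rights.getOrElseUpdate(bar.baz, mutable.ListBuffer.empty[(Bar, Long)])
    buffer += ((bar, timestamp + windowMillis))
  }

  // Time advances: expired left elements that never matched yield the empty match.
  // Emptied per-key buffers are left in the maps to keep the sketch short.
  def onTime(now: Long): Unit = {
    lefts.values.foreach { buffer =>
      val (expired, live) = buffer.partition(_.expiresAt <= now)
      expired.filterNot(_.matched).foreach(p => emit(FooBar(p.foo, None)))
      buffer.clear()
      buffer ++= live
    }
    rights.values.foreach { buffer =>
      val live = buffer.filter(_._2 > now)
      buffer.clear()
      buffer ++= live
    }
  }
}
```

A custom operator along the lines discussed above could call onLeft and onRight from its two element callbacks and onTime from whatever time or watermark callback it uses. The sliding interval does not appear in the sketch because each pair is emitted at most once rather than once per slide.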
