flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Last event of each window belongs to the next window - Wrong
Date Mon, 07 Nov 2016 14:42:41 GMT
Hi Samir,

The windowing API in Flink works in the following way: first, an incoming
element is assigned to a window. This is defined by the window clause, where
you use GlobalWindows.create(). Thus, all elements with the same sourceId are
assigned to the same window. Next, the element is passed to the Trigger, which
decides whether the window should be evaluated. At this point, however, the
element is already part of the window. That's why the last element of each of
your windows has a different user ID.
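
To make this order concrete, here is a small plain-Java simulation. It is not Flink code and does not use Flink's API. For each element of one key, it first adds the element to the window and only then consults the trigger. The trigger logic is a hypothetical reconstruction of MySessionTrigger: it remembers the user that opened the session, fires and purges when a different user arrives, and then resets its state. The Event class, sampleEvents() and simulate() are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation, NOT Flink code: it mimics the order in which a
// GlobalWindows + PurgingTrigger pipeline handles each element of one key.
// Step 1: the element is added to the window. Step 2: the trigger is asked.
class TriggerOrderDemo {

    // Hypothetical event type with only the fields this example needs.
    static final class Event {
        final long id;
        final int user;

        Event(long id, int user) {
            this.id = id;
            this.user = user;
        }
    }

    // The events from the thread (all sourceId=1), in arrival order.
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event(25398102L, 14), new Event(25398185L, 14),
                new Event(25398210L, 14), new Event(25398235L, 3149),
                new Event(25398236L, 71), new Event(25398239L, 71),
                new Event(25398265L, 71), new Event(25398310L, 14),
                new Event(25398320L, 14));
    }

    // Returns the ids contained in each fired window.
    static List<List<Long>> simulate(List<Event> events) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> window = new ArrayList<>(); // contents of the global window
        Integer sessionUser = null;            // hypothetical trigger state

        for (Event e : events) {
            // Step 1 (window assignment): the element always joins the window.
            window.add(e.id);

            // Step 2 (trigger): it only sees the element after it was added.
            if (sessionUser == null) {
                sessionUser = e.user;          // first element of a session
            } else if (e.user != sessionUser) {
                // FIRE_AND_PURGE: the emitted window already contains e.
                fired.add(new ArrayList<>(window));
                window.clear();
                sessionUser = null;            // reset trigger state
            }
        }
        // Illustration only: emit whatever is left when the input ends.
        if (!window.isEmpty()) {
            fired.add(new ArrayList<>(window));
        }
        return fired;
    }

    public static void main(String[] args) {
        for (List<Long> w : simulate(sampleEvents())) {
            System.out.println(w);
        }
    }
}
```

Running main prints [25398102, 25398185, 25398210, 25398235], [25398236, 25398239, 25398265, 25398310] and [25398320]. These are the same three sessions as in your output: each fired window ends with the element that made the trigger fire. The last group appears only because the sketch flushes the window at the end of the input.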

What you could try instead is a MergingWindowAssigner, which creates windows
whose elements all have the same ID. With it, all elements with the same ID
are assigned to the same session window (for this, the stream needs to be
keyed by that ID). The session windows are then triggered by, for example,
event time. That's the recommended way to create session windows with Flink.
Here is some documentation on session windows [1].
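
The following plain-Java sketch (again not Flink code) illustrates the idea behind a merging session-window assigner. Per key, every element first gets its own window [ts, ts + gap), and overlapping windows are then merged into one session. The 30-second gap, the long[] event encoding and the names SessionMergeDemo, sessionize() and sampleEvents() are assumptions made for this example. In the DataStream API this would roughly correspond to keying by the user field and using .window(EventTimeSessionWindows.withGap(Time.seconds(30))), but please check the exact usage against the session-window documentation [1].

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT Flink code: it sketches the idea behind a
// merging session-window assigner. Every element first gets its own window
// [ts, ts + gap); windows of the same key that overlap are merged into one.
class SessionMergeDemo {

    // A session window [start, end) and the ids of the elements it holds.
    static final class Session {
        final long start;
        final long end;
        final List<Long> ids;

        Session(long start, long end, List<Long> ids) {
            this.start = start;
            this.end = end;
            this.ids = ids;
        }
    }

    // Hypothetical encoding of the thread's events: {id, user, seconds since 00:00:00}.
    static long[][] sampleEvents() {
        return new long[][] {
            {25398102L, 14, 56}, {25398185L, 14, 66}, {25398210L, 14, 76},
            {25398235L, 3149, 84}, {25398236L, 71, 85}, {25398239L, 71, 86},
            {25398265L, 71, 96}, {25398310L, 14, 136}, {25398320L, 14, 146}
        };
    }

    // Groups events into per-user sessions (the user plays the role of the key).
    static Map<Long, List<Session>> sessionize(long[][] events, long gapSeconds) {
        Map<Long, List<Session>> byUser = new LinkedHashMap<>();
        for (long[] e : events) {
            long id = e[0];
            long user = e[1];
            long ts = e[2];

            // The element's own window, before any merging.
            long start = ts;
            long end = ts + gapSeconds;
            List<Long> ids = new ArrayList<>();

            List<Session> kept = new ArrayList<>();
            for (Session s : byUser.getOrDefault(user, new ArrayList<>())) {
                if (s.start < end && start < s.end) {
                    // Overlap: absorb the existing session into the new one.
                    start = Math.min(start, s.start);
                    end = Math.max(end, s.end);
                    ids.addAll(s.ids);
                } else {
                    kept.add(s);
                }
            }
            ids.add(id);
            kept.add(new Session(start, end, ids));
            byUser.put(user, kept);
        }
        return byUser;
    }

    public static void main(String[] args) {
        Map<Long, List<Session>> sessions = sessionize(sampleEvents(), 30);
        for (Map.Entry<Long, List<Session>> entry : sessions.entrySet()) {
            for (Session s : entry.getValue()) {
                System.out.println("user=" + entry.getKey() + " [" + s.start
                        + ", " + s.end + ") ids=" + s.ids);
            }
        }
    }
}
```

With a 30-second gap, user 14 gets two sessions (ids 25398102 to 25398210, and ids 25398310 to 25398320), while users 3149 and 71 get one session each. Because sessions are formed per user, the later user-14 events start a new session only because of the 60-second pause, not because the user ID changed.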

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#session-windows

Cheers,
Till

On Sun, Nov 6, 2016 at 12:11 PM, Samir Abdou <abdou.samir.mailing@gmail.com>
wrote:

> I am using Flink 1.2-SNAPSHOT. My data looks like the following:
>
>    - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>    - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>    - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>    - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>    - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>    - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>    - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>    - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>    - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>
> I am running the following code to create windows based on user IDs:
>
>     stream.flatMap(new LogsParser())
>             .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
>             .keyBy("sourceId")
>             .window(GlobalWindows.create())
>             .trigger(PurgingTrigger.of(new MySessionTrigger()))
>             .apply(new SessionWindowFunction())
>             .print();
>
> MySessionTrigger looks at each received event and checks the user ID in
> order to trigger the window when the user ID changes. The
> SessionWindowFunction just creates a session out of the window.
>
> Here are the sessions created:
>
>    1. Session:
>       - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>       - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>       - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>       - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>    2. Session:
>       - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>       - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>       - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>       - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>    3. Session:
>       - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>
> The problem, as you can see, is that in every session the last event
> actually belongs to the next window. The decision to trigger the window
> comes too late, because the last event is already in the window.
>
> How can I trigger the window without including that last event in the
> window?
>
> Thanks for your help.
>
