flink-user mailing list archives

From Yassine MARZOUGUI <y.marzou...@mindlytix.com>
Subject Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed
Date Mon, 05 Dec 2016 10:33:07 GMT
I forgot to mention: the watermark extractor is the one included in the Flink
API (AscendingTimestampExtractor).
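For readers following the thread, here is a plain-Java model of what an ascending-timestamp extractor is generally expected to do. It assumes timestamps arrive in non-decreasing order and derives a watermark that trails the highest timestamp seen so far by 1 ms. `AscendingWatermarkSketch` and its methods are hypothetical names for illustration, not the source of Flink's `AscendingTimestampExtractor`, and the 1 ms offset is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (not Flink code) of an ascending-timestamp
 * watermark assigner: the watermark trails the highest timestamp seen by 1 ms.
 */
public class AscendingWatermarkSketch {

    // Highest timestamp seen so far; MIN_VALUE means "no record yet".
    private long currentTimestamp = Long.MIN_VALUE;

    /** Called once per record; returns the record's event-time timestamp. */
    public long extractTimestamp(long recordTimestamp) {
        if (recordTimestamp >= currentTimestamp) {
            currentTimestamp = recordTimestamp;
        } else {
            // Out-of-order record: this sketch only reports it and keeps the
            // watermark where it is (assumption, not Flink's exact handling).
            System.err.println("Ascending order violated: " + recordTimestamp
                    + " < " + currentTimestamp);
        }
        return recordTimestamp;
    }

    /** Called periodically; returns the watermark to emit downstream. */
    public long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    /** Replays timestamps and collects the watermark observed after each record. */
    public static List<Long> replay(long... timestamps) {
        AscendingWatermarkSketch assigner = new AscendingWatermarkSketch();
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            assigner.extractTimestamp(ts);
            watermarks.add(assigner.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // The out-of-order 1_500 does not move the watermark back.
        System.out.println(replay(1_000L, 2_000L, 1_500L, 3_000L)); // [999, 1999, 1999, 2999]
    }
}
```

If the watermark stays at `Long.MIN_VALUE` (for example because no watermark reaches the window operator at all), event-time windows downstream have nothing to trigger on.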

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:

> Hi Robert,
>
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the latest master compiled at
> the time of the question. Here is the watermark assignment:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
>     @Override
>     public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
>         // Use the first tuple field (f0) as the event-time timestamp
>         return tuple3.f0;
>     }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetzger@apache.org>:
>
>> Hi Yassine,
>> are you sure your watermark extractor is the same in both
>> versions? It sounds a bit like the watermarks for the 1.2 code are not
>> generated correctly.
>>
>> Regards,
>> Robert
>>
>>
>> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
>> y.marzougui@mindlytix.com> wrote:
>>
>>> Hi all,
>>>
>>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
>>> boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing
>>> in memory and the window results are not emitted until the whole stream is
>>> processed. Is this temporary behaviour caused by ongoing development in
>>> 1.2-SNAPSHOT, or a bug?
>>>
>>> I am using code similar to the following:
>>>
>>> env.setParallelism(1);
>>>
>>> DataStream<T> sessions = env
>>>     .readTextFile()
>>>     .flatMap()
>>>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>>>     .keyBy(1)
>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>>>     .apply().setParallelism(32);
>>>
>>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>
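Robert's hypothesis can be illustrated with a small single-key simulation of event-time session windows. It is a simplified model, not Flink's windowing code, and it rests on these assumptions: timestamps arrive in ascending order, a session spans `[first timestamp, last timestamp + gap)`, it fires once the watermark reaches `end - 1`, and a bounded input ends with a final watermark of `Long.MAX_VALUE`. `SessionFiringSketch` and `simulate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical single-key model (not Flink code) of event-time session windows.
 * A session covers [start, end) with end = last timestamp + gap and fires once
 * the watermark reaches end - 1.
 */
public class SessionFiringSketch {

    /**
     * Replays ascending timestamps with the watermark seen after each record.
     * Returns the cumulative number of fired sessions after each record, plus
     * one last entry after an end-of-input watermark of Long.MAX_VALUE.
     */
    public static int[] simulate(long[] timestamps, long[] watermarks, long gap) {
        List<long[]> open = new ArrayList<>(); // each open session is {start, end}
        int[] fired = new int[timestamps.length + 1];
        int total = 0;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            long[] last = open.isEmpty() ? null : open.get(open.size() - 1);
            if (last != null && ts < last[1]) {
                last[1] = Math.max(last[1], ts + gap); // within the gap: extend the session
            } else {
                open.add(new long[] {ts, ts + gap});   // gap exceeded: start a new session
            }
            total += fireReady(open, watermarks[i]);
            fired[i] = total;
        }
        total += fireReady(open, Long.MAX_VALUE);      // assumed final watermark at end of input
        fired[timestamps.length] = total;
        return fired;
    }

    /** Removes and counts open sessions whose max timestamp (end - 1) is <= watermark. */
    private static int fireReady(List<long[]> open, long watermark) {
        int count = 0;
        for (Iterator<long[]> it = open.iterator(); it.hasNext(); ) {
            if (it.next()[1] - 1 <= watermark) {
                it.remove();
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long gap = 5 * 60_000L; // 5 minutes, as in the job above
        long[] ts = {0L, 60_000L, 600_000L, 660_000L, 1_200_000L};
        long[] advancing = new long[ts.length];
        long[] stuck = new long[ts.length];
        for (int i = 0; i < ts.length; i++) {
            advancing[i] = ts[i] - 1;  // ascending-timestamp style watermark
            stuck[i] = Long.MIN_VALUE; // a watermark that never advances
        }
        System.out.println(Arrays.toString(simulate(ts, advancing, gap))); // [0, 0, 1, 1, 2, 3]
        System.out.println(Arrays.toString(simulate(ts, stuck, gap)));     // [0, 0, 0, 0, 0, 3]
    }
}
```

With an advancing watermark, the first two sessions fire while records are still arriving. With a watermark stuck at `Long.MIN_VALUE`, all three fire only at end of input, which matches the reported symptom. The sketch does not identify the actual cause in 1.2-SNAPSHOT; it only shows why missing or stalled watermarks would produce this behaviour.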
