flink-user mailing list archives

From Piotr Nowojski <pi...@data-artisans.com>
Subject Re: periodic trigger
Date Fri, 22 Dec 2017 10:10:05 GMT
Ok, I think now I understand your problem. 

Wouldn’t it be enough to change the last global window to something like this:

lastUserSession
        .timeWindowAll(Time.seconds(10))
        .aggregate(new AverageSessionLengthAcrossAllUsers())
        .print();

(As a side note, you may want to use ContinuousEventTimeTrigger in the first window.) This
way, the global window aggregates only the latest “preview results” of the 60-second user
windows (emitted every 10 seconds by the first aggregation) and calculates the average
session length from them.
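
To illustrate the difference, here is a minimal plain-Java sketch (no Flink involved). The Preview type, the sample data, and the helper names are hypothetical, and the sketch assumes each preview is assigned to the global window by the time at which it was emitted. Two sessions emit a preview every 10 seconds; a 60-second global window averages all 12 previews, while a 10-second global window sees only the latest preview of each session.

```java
import java.util.ArrayList;
import java.util.List;

class PreviewAverageSketch {

    /** One "preview result" emitted by the first (per-session) window. Hypothetical type. */
    static final class Preview {
        final String sessionId;
        final long emittedAtSec; // time of the early firing, in seconds
        final long length;       // longest session length seen so far

        Preview(String sessionId, long emittedAtSec, long length) {
            this.sessionId = sessionId;
            this.emittedAtSec = emittedAtSec;
            this.length = length;
        }
    }

    /** Averages the lengths of all previews emitted in the interval (startSec, endSec]. */
    static double averageInWindow(List<Preview> previews, long startSec, long endSec) {
        long sum = 0;
        long count = 0;
        for (Preview p : previews) {
            if (p.emittedAtSec > startSec && p.emittedAtSec <= endSec) {
                sum += p.length;
                count++;
            }
        }
        return count == 0 ? Double.NaN : (double) sum / count;
    }

    /** Made-up data: session "a" grows by 10 per firing, session "b" stays at 5. */
    static List<Preview> samplePreviews() {
        List<Preview> previews = new ArrayList<>();
        for (long t = 10; t <= 60; t += 10) {
            previews.add(new Preview("a", t, t));
            previews.add(new Preview("b", t, 5));
        }
        return previews;
    }

    public static void main(String[] args) {
        List<Preview> previews = samplePreviews();
        // All 12 previews are averaged, so older previews skew the result.
        System.out.println("60 s global window: " + averageInWindow(previews, 0, 60));
        // Only the latest preview of each session is averaged.
        System.out.println("10 s global window: " + averageInWindow(previews, 50, 60));
    }
}
```

With this sample data the 60-second window yields 20.0, while the 10-second window yields 32.5, which matches the average of the two final session lengths (60 and 5).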

Piotrek

> On 21 Dec 2017, at 15:18, Plamen Paskov <plamen.paskov@next-stream.com> wrote:
> 
> Imagine a case where I want to run a computation every X seconds for a 1-day window. I
> want to calculate the average session length for the current day every X seconds. Is there an easy
> way to achieve that?
> 
> On 21.12.2017 16:06, Piotr Nowojski wrote:
>> Hi,
>> 
>> You defined a tumbling window (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows)
>> of 60 seconds, triggered every 10 seconds. This means that each input element can be processed/averaged
>> up to 6 times (there is no other way if you trigger each window multiple times).
>> 
>> I am not sure what you are trying to achieve, but please refer to the documentation
>> about the different window types (tumbling, sliding, session); maybe it will clarify things for
>> you:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
>> 
>> If you want to avoid duplicated processing, use either a tumbling window with the default
>> trigger (triggering at the end of the window), or use session windows.
>> 
>> Piotrek
>> 
>> 
>>> On 21 Dec 2017, at 13:29, Plamen Paskov <plamen.paskov@next-stream.com> wrote:
>>> 
>>> Hi guys,
>>> I have the following code:
>>> 
>>> SingleOutputStreamOperator<Event> lastUserSession = env
>>>         .socketTextStream("localhost", 9000, "\n")
>>>         .map(new MapFunction<String, Event>() {
>>>             @Override
>>>             public Event map(String value) throws Exception {
>>>                 String[] row = value.split(",");
>>>                 return new Event(Long.valueOf(row[0]), row[1], Long.valueOf(row[2]),
>>>                         Timestamp.valueOf(row[3]).getTime());
>>>             }
>>>         })
>>>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>             @Override
>>>             public long extractTimestamp(Event element) {
>>>                 return element.timestamp;
>>>             }
>>>         })
>>>         .keyBy("userId", "sessionId")
>>>         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>>         .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
>>>         .maxBy("length", false);
>>> 
>>> lastUserSession
>>>         .timeWindowAll(Time.seconds(60))
>>>         .aggregate(new AverageSessionLengthAcrossAllUsers())
>>>         .print();
>>> 
>>> What I'm trying to achieve is to calculate the average session length every 10
>>> seconds. The problem is that since the window length is 60 seconds and a computation is triggered
>>> every 10 seconds, I will receive duplicate events in my average calculation method,
>>> so the average will not be correct. If I move ContinuousProcessingTimeTrigger down before
>>> AverageSessionLengthAcrossAllUsers(), then it's not triggering every 10 seconds.
>>> Any other suggestions on how to work around this?
>>> 
>>> Thanks
>> 
> 

