flink-user mailing list archives

From Plamen Paskov <plamen.paskov@next-stream.com>
Subject Re: consecutive stream aggregations
Date Fri, 15 Dec 2017 15:56:04 GMT
In my case I have a lot of users with one session per user. What I'm 
thinking is to evenly distribute the users, accumulate per partition, and 
finally merge all the accumulators. The problem is that I don't know how 
to achieve this.
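
A minimal, untested sketch of that two-stage idea, combined with Ufuk's suggestion quoted below: pre-aggregate a (sum, count) partial per user in parallel, then merge the partials in a small all-window. It assumes the Event POJO and the sessions stream from the code quoted further down; the window sizes are illustrative, and for brevity it averages over all incoming rows rather than first deduplicating to one row per session.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.time.Time;

sessions
        .keyBy("userId")                                   // spreads users across parallel subtasks
        .timeWindow(Time.seconds(60), Time.seconds(30))
        .aggregate(new AggregateFunction<Event, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> createAccumulator() {
                return Tuple2.of(0L, 0L);                  // (sum of lengths, count)
            }

            @Override
            public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> acc) {
                return Tuple2.of(acc.f0 + value.length, acc.f1 + 1);
            }

            @Override
            public Tuple2<Long, Long> getResult(Tuple2<Long, Long> acc) {
                return acc;                                // emit the partial so it stays mergeable
            }

            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        })
        // global merge: runs with parallelism 1, but only sees one small record per user and window
        .timeWindowAll(Time.seconds(60), Time.seconds(30))
        .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        })
        .map(new MapFunction<Tuple2<Long, Long>, Double>() {
            @Override
            public Double map(Tuple2<Long, Long> merged) {
                return (double) merged.f0 / merged.f1;     // global average session length
            }
        })
        .print();

The heavy lifting stays parallel; the single-node all-window only merges one pre-aggregated pair per user per window.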


On 15.12.2017 17:52, Ufuk Celebi wrote:
> You can first aggregate the length per user and emit it downstream.
> Then you do the all window and average all lengths. Does that make
> sense?
>
> On Fri, Dec 15, 2017 at 4:48 PM, Plamen Paskov
> <plamen.paskov@next-stream.com> wrote:
>> I think I got your point.
>> What happens now: in order to use aggregate() I need a window, but the
>> window requires keyBy() if I want to parallelize the data. In my case it
>> will not work, because if I use keyBy("userId") then the average
>> will be calculated per userId, but I want the average across all users. What
>> would be the solution in this case?
>>
>> Thanks
>>
>>
>> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>> Hey Plamen,
>>>
>>> I think what you are looking for is the AggregateFunction. This you
>>> can use on keyed streams. The Javadoc [1] contains an example for your
>>> use case (averaging).
>>>
>>> – Ufuk
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
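
For reference, a sketch in the spirit of that Javadoc example, adapted to the Event type used elsewhere in this thread (the class name AverageAggregate is illustrative, not taken from the linked file):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// accumulator: (sum of session lengths, number of elements)
public static class AverageAggregate
        implements AggregateFunction<Event, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + value.length, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

On a keyed window this yields one average per key, e.g. sessions.keyBy("userId").timeWindow(Time.seconds(60)).aggregate(new AverageAggregate()).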
>>>
>>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>>> <plamen.paskov@next-stream.com> wrote:
>>>> Hi,
>>>>
>>>> I'm trying to calculate the running average of session length, and I want to
>>>> trigger the computation at a regular interval, let's say every 2 minutes. I'm
>>>> trying to do it like this:
>>>>
>>>> package flink;
>>>>
>>>> import lombok.AllArgsConstructor;
>>>> import lombok.NoArgsConstructor;
>>>> import lombok.ToString;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> import java.sql.Timestamp;
>>>> import java.time.Instant;
>>>> import java.time.LocalDateTime;
>>>> import java.util.TimeZone;
>>>>
>>>>
>>>> public class StreamingJob {
>>>>       public static void main(String[] args) throws Exception {
>>>>           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>>           SingleOutputStreamOperator<Event> sessions = env
>>>>                   .socketTextStream("localhost", 9000, "\n")
>>>>                   .map(new MapFunction<String, Event>() {
>>>>                       @Override
>>>>                       public Event map(String value) throws Exception {
>>>>                           String[] row = value.split(",");
>>>>                           return new Event(Long.valueOf(row[0]), row[1],
>>>> Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>>                       }
>>>>                   })
>>>>                   .assignTimestampsAndWatermarks(new
>>>> BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>>                       @Override
>>>>                       public long extractTimestamp(Event element) {
>>>>                           return element.timestamp;
>>>>                       }
>>>>                   })
>>>>                   .keyBy("userId", "sessionId")
>>>>                   .maxBy("length");
>>>>
>>>>
>>>>           sessions
>>>>                   .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>>                   .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>>                       @Override
>>>>                       public void apply(TimeWindow window, Iterable<Event>
>>>> values, Collector<Avg> out) throws Exception {
>>>>                           long sum = 0;
>>>>                           int count = 0;
>>>>
>>>>                           for (Event event : values) {
>>>>                               sum += event.length;
>>>>                               count++;
>>>>                           }
>>>>
>>>>                           double avg = (double) sum / count; // cast to avoid integer division
>>>>                           LocalDateTime windowStart =
>>>> LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getStart()),
>>>> TimeZone.getDefault().toZoneId());
>>>>                           LocalDateTime windowEnd =
>>>> LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getEnd()),
>>>> TimeZone.getDefault().toZoneId());
>>>>                           out.collect(new Avg(avg, windowStart.toString(),
>>>> windowEnd.toString()));
>>>>                       }
>>>>                   });
>>>>
>>>>           env.execute();
>>>>       }
>>>>
>>>>       @AllArgsConstructor
>>>>       @NoArgsConstructor
>>>>       @ToString
>>>>       public static class Avg {
>>>>           public double length;
>>>>           public String windowStart;
>>>>           public String windowEnd;
>>>>       }
>>>>
>>>>       @AllArgsConstructor
>>>>       @NoArgsConstructor
>>>>       @ToString
>>>>       public static class Event {
>>>>           public long userId;
>>>>           public String sessionId;
>>>>           public long length;
>>>>           public long timestamp;
>>>>       }
>>>> }
>>>>
>>>> First I want to extract the last session event for every user-session,
>>>> because it contains the total session length. Then I want to calculate the
>>>> average session length based on the data from the
>>>> previous operation (i.e. based on the sessions variable).
>>>>
>>>> Example:
>>>>
>>>> 1,s1,100,2017-12-13 11:58:01
>>>> 1,s1,150,2017-12-13 11:58:02
>>>> 1,s1,160,2017-12-13 11:58:03
>>>> 2,s1,100,2017-12-13 11:58:04
>>>>
>>>> sessions variable should contain those rows:
>>>> 1,s1,160,2017-12-13 11:58:03
>>>> 2,s1,100,2017-12-13 11:58:04
>>>>
>>>> but it emits a running max per key for every incoming event, rather than
>>>> a single final row per session.
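
One possible fix, sketched under the assumption that events names the parsed, timestamped DataStream<Event> produced by the map and assignTimestampsAndWatermarks steps above: move the maxBy into a keyed window, so each (userId, sessionId) group emits a single winning row per window instead of a running max per arriving event.

// `events` is assumed to be the parsed, timestamped DataStream<Event> from above
SingleOutputStreamOperator<Event> lastPerSession = events
        .keyBy("userId", "sessionId")
        .timeWindow(Time.seconds(60))
        // one max-length row per session and window, emitted when the watermark
        // passes the window end, instead of an update per incoming event
        .maxBy("length");

Because the keyed window buffers until the watermark passes, each session contributes a single row that a downstream all-window average can consume.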
>>>>
>>>> Questions:
>>>> - how can I collect the data for all groups into the sessions variable?
>>>> - is there another way to achieve this functionality? With my
>>>> implementation the average will be computed on a single node, because
>>>> sessions is of type SingleOutputStreamOperator<Event>
>>>> - can I use ContinuousEventTimeTrigger to trigger at regular intervals?
>>>> (see the sketch at the end of this message)
>>>>
>>>> Thanks
>>
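
On the last question: yes, a trigger can be attached to the all-window. A minimal, untested sketch with ContinuousEventTimeTrigger, reusing the imports and the Avg type from the code above; note that this trigger fires without purging, so each firing recomputes the average over everything buffered in the window so far, and the 10-minute window size here is illustrative.

import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

sessions
        .timeWindowAll(Time.minutes(10))
        // fire the window function every 2 minutes of event time as the watermark advances
        .trigger(ContinuousEventTimeTrigger.of(Time.minutes(2)))
        .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Event> values, Collector<Avg> out) {
                long sum = 0;
                int count = 0;
                for (Event event : values) {
                    sum += event.length;
                    count++;
                }
                // same averaging as above; window bounds kept as raw epoch millis here
                out.collect(new Avg((double) sum / count,
                        String.valueOf(window.getStart()), String.valueOf(window.getEnd())));
            }
        });

Swapping the trigger changes when results are emitted, not what the window contains.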

