flink-user mailing list archives

From Ahmad Hassan <ahmad.has...@gmail.com>
Subject Re: Fwd: Incremental aggregation using Fold and failure recovery
Date Thu, 29 Jun 2017 14:47:26 GMT
Thanks a lot, Gordon!

On 29 June 2017 at 13:39, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:

> Sorry, one typo.
>
> public AverageAccumulator merge(WindowStats a, WindowStats b) {
>
> should be:
>
> public WindowStats merge(WindowStats a, WindowStats b) {
>
> On 29 June 2017 at 8:22:34 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org)
> wrote:
>
> I see. Then yes, a fold operation would be more efficient here.
>
>
> > However, can you give an idea of how to use AggregateFunction in the latest
> > Flink to replace the following fold function?
>
> Sure! The documentation for 1.3 is still lagging behind a bit for some of
> the new features, but the Javadoc for `AggregateFunction` should be fairly
> self-explanatory.
>
> As a quick sketch, here’s what you would do to achieve the same thing:
>
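> import java.util.Map;
>
> import org.apache.flink.api.common.functions.AggregateFunction;
>
> // Event stands for your input type; ProductMetric is your per-product metric.
> public class WindowStatsAggregator
>         implements AggregateFunction<Event, WindowStats, Map<String, ProductMetric>> {
>
>     @Override
>     public WindowStats createAccumulator() {
>         return new WindowStats();
>     }
>
>     @Override
>     public WindowStats merge(WindowStats a, WindowStats b) {
>         // merge b's unique-products map into a, then reuse a as the result
>         return a;
>     }
>
>     @Override
>     public void add(Event value, WindowStats acc) {
>         // update your unique-products map (Flink 1.3 signature)
>     }
>
>     @Override
>     public Map<String, ProductMetric> getResult(WindowStats acc) {
>         return acc.getMap();
>     }
> }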
>
>
> As you can see, the `AggregateFunction` is more generic, and should
> subsume whatever you were previously doing with fold.
> Your previous `WindowStats` class is basically the state accumulator, and
> you need to implement how to update it, merge two accumulators, and
> retrieve the final accumulated result.
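To make the accumulator contract concrete, here is a small plain-Java sketch that runs without Flink. The `Aggregator` interface is a hypothetical local stand-in whose method shapes follow the Flink 1.3 `AggregateFunction` described above, and this `WindowStats` is a simplified, hypothetical version that only counts events per product ID. It checks that feeding all events into one accumulator gives the same result as merging two partial accumulators.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the accumulator contract: create, add, merge, getResult.
// Aggregator is a local stand-in shaped like Flink 1.3's AggregateFunction;
// it is NOT the Flink type.
public class AccumulatorContractDemo {

    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC acc);
        ACC merge(ACC a, ACC b);
        OUT getResult(ACC acc);
    }

    // Hypothetical accumulator: event count per product ID.
    static class WindowStats {
        final Map<String, Long> counts = new HashMap<>();
    }

    static class ProductCounter implements Aggregator<String, WindowStats, Map<String, Long>> {
        public WindowStats createAccumulator() { return new WindowStats(); }

        public void add(String productId, WindowStats acc) {
            // Increment the count for this product, starting at 1 if absent.
            acc.counts.merge(productId, 1L, Long::sum);
        }

        public WindowStats merge(WindowStats a, WindowStats b) {
            // Fold b's counts into a and reuse a as the merged accumulator.
            b.counts.forEach((id, n) -> a.counts.merge(id, n, Long::sum));
            return a;
        }

        public Map<String, Long> getResult(WindowStats acc) { return acc.counts; }
    }

    public static void main(String[] args) {
        ProductCounter agg = new ProductCounter();
        String[] events = {"p1", "p2", "p1", "p3", "p1"};

        // One accumulator fed with every event...
        WindowStats all = agg.createAccumulator();
        for (String e : events) agg.add(e, all);

        // ...versus two partial accumulators merged afterwards.
        WindowStats left = agg.createAccumulator();
        WindowStats right = agg.createAccumulator();
        for (int i = 0; i < events.length; i++) agg.add(events[i], i < 2 ? left : right);
        WindowStats merged = agg.merge(left, right);

        System.out.println(agg.getResult(all).equals(agg.getResult(merged))); // true
        System.out.println(agg.getResult(all).get("p1"));                     // 3
    }
}
```

Returning `a` from `merge` after folding `b` into it avoids allocating a third accumulator; that is a choice made for this sketch, not something the interface requires.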
>
> For more info, I would point to the class Javadocs of `AggregateFunction`.
>
> Best,
> Gordon
>
> On 29 June 2017 at 8:06:25 PM, Ahmad Hassan (ahmad.hassan@gmail.com)
> wrote:
>
> Hi Gordon,
>
> Thanks for the details. I am using fold to process events and maintain
> statistics for each product ID within a WindowStats instance. Fold is much
> more efficient here, because there can be millions of events but fewer than
> 50k unique products. If I used a generic window function instead, it would be
> less efficient, because the window function would receive a collection of
> millions of events, and Flink replicates each event into every sliding window
> it belongs to.
>
> However, can you give an idea of how to use AggregateFunction in the latest
> Flink to replace the following fold function?
>
> final DataStream<WindowStats> eventStream = inputStream
>     .keyBy(TENANT, CATEGORY)
>     .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5)))
>     .fold(new WindowStats(), new ProductAggregationMapper(),
>           new ProductAggregationWindowFunction());
>
> Thanks!
>
> On 29 June 2017 at 12:57, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>
>> Hi Ahmad,
>>
>> Yes, that is correct. The aggregated fold value (i.e. your WindowStats
>> instance) will be checkpointed by Flink as managed state, and restored from
>> the last complete checkpoint in case of failures.
>> One comment on using the fold function: if what you’re essentially doing
>> in the fold is just collecting the elements of the windows per key and
>> performing the actual aggregation in the window function, then you don't
>> need the fold.
>> A generic window function should suit that case. See [1].
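The checkpoint-and-restore behaviour Gordon describes can be illustrated with a toy model in plain Java. This is a deliberate simplification, not how Flink is implemented: a "checkpoint" copies the per-product counts together with the source position, and a simulated failure restores that copy and replays the source from the recorded position (which is what a replayable source such as Kafka makes possible). All names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Flink code): checkpoint = copy of state + source offset;
// recovery = restore the copy and rewind the source to that offset.
public class CheckpointRestoreToy {

    static Map<String, Long> run(List<String> events, int checkpointEvery, int failAt) {
        Map<String, Long> state = new HashMap<>();
        Map<String, Long> snapshotState = new HashMap<>();
        int snapshotOffset = 0;
        boolean failed = false;

        int offset = 0;
        while (offset < events.size()) {
            if (offset == failAt && !failed) {
                // Simulated crash: drop live state, restore the last checkpoint.
                failed = true;
                state = new HashMap<>(snapshotState);
                offset = snapshotOffset;
                continue;
            }
            // Incremental update of the per-product count.
            state.merge(events.get(offset), 1L, Long::sum);
            offset++;
            if (offset % checkpointEvery == 0) {
                // Checkpoint: copy the state and remember the source position.
                snapshotState = new HashMap<>(state);
                snapshotOffset = offset;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> events = List.of("p1", "p2", "p1", "p3", "p2", "p1", "p4");
        Map<String, Long> noFailure = run(events, 3, -1);
        Map<String, Long> withFailure = run(events, 3, 5);
        System.out.println(noFailure.equals(withFailure)); // true
    }
}
```

Because the restored state and the replay start from the same consistent point, the final counts match the failure-free run: nothing is lost and nothing is counted twice.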
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case
>>
>>
>> On 29 June 2017 at 5:11:58 PM, Ahmad Hassan (ahmad.hassan@gmail.com)
>> wrote:
>>
>> Any thoughts on this problem please?
>>
>>
>> Hi All,
>>
>> I am collecting millions of events per 24 hours for N products, where N
>> can be up to 50k. I use the following fold mechanism with a sliding
>> window:
>>
>> final DataStream<WindowStats> eventStream = inputStream
>>     .keyBy(TENANT, CATEGORY)
>>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>>     .fold(new WindowStats(), new ProductAggregationMapper(),
>>           new ProductAggregationWindowFunction());
>>
>> In the WindowStats class, I keep a HashMap<String, ProductMetric> keyed by
>> product ID, which holds each product's event count and various other
>> metrics. So for 50k products I will have 50k entries in the map within the
>> WindowStats instance, instead of millions of events, because the fold
>> function processes each event as it arrives.
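The bounded-state argument above can be sketched in plain Java. `Event`, `ProductMetric` and `WindowStats` are hypothetical stand-ins for Ahmad's classes, and the `fold` method only imitates the accumulator-in, accumulator-out shape of Flink's `FoldFunction`. Folding one million events over 50,000 product IDs leaves exactly 50,000 map entries.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an incremental per-product fold: state grows with the number of
// distinct products, not with the number of events. All types are hypothetical.
public class ProductFoldSketch {

    static final class Event {
        final String productId;
        final long value;
        Event(String productId, long value) { this.productId = productId; this.value = value; }
    }

    static final class ProductMetric {
        long count; // number of events seen for this product
        long sum;   // running sum of event values for this product
    }

    static final class WindowStats {
        final Map<String, ProductMetric> metrics = new HashMap<>();
    }

    // One fold step: update the metric for the event's product, return the accumulator.
    static WindowStats fold(WindowStats acc, Event e) {
        ProductMetric m = acc.metrics.computeIfAbsent(e.productId, id -> new ProductMetric());
        m.count++;
        m.sum += e.value;
        return acc;
    }

    public static void main(String[] args) {
        WindowStats acc = new WindowStats();
        int products = 50_000;
        for (int i = 0; i < 1_000_000; i++) {
            acc = fold(acc, new Event("p" + (i % products), i % 7));
        }
        System.out.println(acc.metrics.size());          // 50000
        System.out.println(acc.metrics.get("p0").count); // 20
    }
}
```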
>>
>> My question is: if I set env.enableCheckpointing(1000), will the WindowStats
>> instance for each existing window automatically be checkpointed and
>> restored on recovery? If not, how can I better implement the above use
>> case to store the state of the WindowStats object within the fold
>> operation?
>>
>> Thanks for all the help.
>>
>> Best Regards,
>>
>>
>
