flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Fwd: Incremental aggregation using Fold and failure recovery
Date Thu, 29 Jun 2017 12:39:57 GMT
Sorry, one typo.
public AverageAccumulator merge(WindowStats a, WindowStats b) {
should be:
public WindowStats merge(WindowStats a, WindowStats b) {
On 29 June 2017 at 8:22:34 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org) wrote:

I see. Then yes, a fold operation would be more efficient here.

> However, can you give an idea of how to use `AggregateFunction` in the latest Flink to replace
> the following fold function?
Sure! The documentation for 1.3 still lags a bit behind on some of the new features,
but the Javadoc for `AggregateFunction` should be fairly self-explanatory.

As a quick sketch, here’s what you would do to achieve the same thing:

public class WindowStatsAggregator implements AggregateFunction<IN, WindowStats, OUT> {
    public WindowStats createAccumulator() {
        return new WindowStats();
    }

    public WindowStats merge(WindowStats a, WindowStats b) {
        // merge the two unique products maps in your WindowStats
        // and return the merged accumulator
    }

    public void add(IN value, WindowStats acc) {
        // update your unique products map
    }

    public OUT getResult(WindowStats acc) {
        return acc.getMap();
    }
}

As you can see, the `AggregateFunction` is more generic, and should subsume whatever you were
previously doing with fold.
Your previous `WindowStats` class is basically the state accumulator, and you need to implement
how to update it, merge two accumulators, and retrieve the final accumulated result.

For more info, I would point to the class Javadocs of `AggregateFunction`.
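
For readers who want something they can compile without Flink on the classpath, here is a minimal sketch of the same idea. The `AggregateFunction` interface below is a local stand-in that only mirrors the four method shapes used in the sketch above; it is not Flink's own class. `ProductEvent`, the per-product `Long` counter, and the `record`/`absorb` helpers are hypothetical names chosen for illustration, since the thread does not show the real `WindowStats` or `ProductMetric` classes.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the method shapes from the sketch above, so this
// compiles without Flink (assumption: not Flink's actual interface).
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical event type: only the product ID matters here.
class ProductEvent {
    final String productId;
    ProductEvent(String productId) { this.productId = productId; }
}

// Hypothetical accumulator: one counter per unique product, so its size is
// bounded by the number of products, not by the number of events.
class WindowStats {
    private final Map<String, Long> countsByProduct = new HashMap<>();

    // Increment the counter for one product.
    void record(String productId) {
        countsByProduct.merge(productId, 1L, Long::sum);
    }

    // Add every counter from another accumulator into this one.
    void absorb(WindowStats other) {
        other.countsByProduct.forEach((id, n) -> countsByProduct.merge(id, n, Long::sum));
    }

    Map<String, Long> getMap() { return countsByProduct; }
}

class WindowStatsAggregator
        implements AggregateFunction<ProductEvent, WindowStats, Map<String, Long>> {

    @Override
    public WindowStats createAccumulator() { return new WindowStats(); }

    @Override
    public void add(ProductEvent value, WindowStats acc) { acc.record(value.productId); }

    @Override
    public Map<String, Long> getResult(WindowStats acc) { return acc.getMap(); }

    @Override
    public WindowStats merge(WindowStats a, WindowStats b) {
        a.absorb(b);   // fold b's counters into a and reuse a as the merged result
        return a;
    }
}
```

In an actual job, a class like this (implementing Flink's real interface) would be passed to `aggregate(...)` on the windowed stream in place of the `fold(...)` call.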

On 29 June 2017 at 8:06:25 PM, Ahmad Hassan (ahmad.hassan@gmail.com) wrote:

Hi Gordon,

Thanks for the details. I am using fold to process events and maintain statistics for each
product ID within a WindowStats instance. Fold is much more efficient here because there can be
millions of events but fewer than 50k unique products. If I use a generic window function
instead, it will be less efficient, because the window function will receive a collection of
millions of events, and those events will be replicated for each sliding window, since Flink
replicates events for each sliding window.
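
To put rough numbers on the argument above, the following sketch compares the two approaches. It assumes the window size is a whole multiple of the slide, and the figures in the usage note (a 24-hour window sliding every hour, 5 million events per window) are hypothetical; the thread only says "millions" of events and up to 50k products. The class and method names are made up for this illustration.

```java
class SlidingWindowCost {
    // Number of overlapping sliding windows that contain any single event.
    // Assumes windowSizeMinutes is a whole multiple of slideMinutes.
    static long windowsPerEvent(long windowSizeMinutes, long slideMinutes) {
        return windowSizeMinutes / slideMinutes;
    }

    // Upper bound on elements held when every open window buffers its raw
    // events (the generic window function case).
    static long bufferedElements(long eventsPerWindow, long openWindows) {
        return eventsPerWindow * openWindows;
    }

    // Upper bound on map entries held when every open window keeps a single
    // accumulator with one entry per product (the fold / aggregate case).
    static long accumulatedEntries(long uniqueProducts, long openWindows) {
        return uniqueProducts * openWindows;
    }
}
```

Under these assumptions, `windowsPerEvent(24 * 60, 60)` gives 24 overlapping windows, so buffering raw events is bounded by `bufferedElements(5_000_000, 24)` = 120,000,000 elements, while incremental aggregation is bounded by `accumulatedEntries(50_000, 24)` = 1,200,000 map entries.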

However, can you give an idea of how to use `AggregateFunction` in the latest Flink to replace
the following fold function?

final DataStream<WindowStats> eventStream = inputStream
    .fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());


On 29 June 2017 at 12:57, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Ahmad,

Yes, that is correct. The aggregated fold value (i.e. your WindowStats instance) will be checkpointed
by Flink as managed state, and restored from the last complete checkpoint in case of failures.
One comment on using the fold function: if what you’re essentially doing in the fold is
just collecting the elements of the windows per key and performing the actual aggregation
in the window function, then you don't need the fold.
A generic window function should suit that case. See [1].


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case

On 29 June 2017 at 5:11:58 PM, Ahmad Hassan (ahmad.hassan@gmail.com) wrote:

Any thoughts on this problem please?

Hi All,

I am collecting millions of events per 24 hours for N products, where N can be up to 50k.
I use the following fold mechanism with a sliding window:

final DataStream<WindowStats> eventStream = inputStream
    .fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());

In the WindowStats class, I keep a HashMap<String, ProductMetric> (product ID to ProductMetric),
which holds each product's event count and various other metrics. So for 50k products I will
have 50k entries in the map within the WindowStats instance instead of millions of events,
because the fold function processes events as they arrive.

My question is: if I set env.enableCheckpointing(1000), will the WindowStats instance for
each existing window automatically be checkpointed and restored on recovery? If not, how can
I better implement the above use case so that the state of the WindowStats object is stored
within the fold operation?

Thanks for all the help.

Best Regards,
