flink-user mailing list archives

From Philipp Goetze <philipp.goe...@tu-ilmenau.de>
Subject Re: Custom Aggregate - Example
Date Fri, 21 Aug 2015 13:44:59 GMT
Thank you Aljoscha,

I guessed that I should use the reduce method. However, I am not looking 
for window aggregations. I want to do this on a grouped stream.

The problem is that we work with Lists instead of tuples, and thus we cannot 
use the pre-implemented aggregates.

So the idea is to call it like this:

    val aggr = source.groupBy(_(0)).reduce(new customReducer(1))

And this is the signature of the class:

    class customReducer(field: Int) extends RichReduceFunction[List[Any]]


How do I have to implement this class so that it works correctly 
even with parallelism > 1?
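
A minimal sketch of the combining logic such a reducer might use, written in plain Scala without the Flink dependency so that it runs on its own. The names `SumField`, `add`, `combine` and `perKey` are hypothetical, and summing the field is only one example of an aggregate. The sketch assumes that after `groupBy` all records with the same key reach the same parallel instance, so the function only has to merge the running result with the next record of that key and needs no shared state:

```scala
// Hypothetical helper, not part of Flink: sums one field of List[Any] records.
object SumField {

  // Adds two values whose static type is unknown.
  // Only Int, Long and Double pairs are handled in this sketch.
  def add(a: Any, b: Any): Any = (a, b) match {
    case (x: Int, y: Int)       => x + y
    case (x: Long, y: Long)     => x + y
    case (x: Double, y: Double) => x + y
    case _ =>
      throw new IllegalArgumentException(s"cannot add $a and $b")
  }

  // Returns a pure two-argument function: it keeps every field of the
  // running result `acc` and replaces position `field` with the sum.
  def combine(field: Int): (List[Any], List[Any]) => List[Any] =
    (acc, next) => acc.updated(field, add(acc(field), next(field)))

  // Local stand-in for groupBy(...).reduce(...) on a finite collection,
  // used here only to try the function out without a cluster.
  def perKey(records: List[List[Any]], key: Int, field: Int): Map[Any, List[Any]] =
    records.groupBy(_(key)).map { case (k, rs) => k -> rs.reduce(combine(field)) }
}
```

Inside `customReducer`, the `reduce(a, b)` method could then delegate to `SumField.combine(field)(a, b)`. Keeping the function free of side effects is what should make it safe with parallelism > 1, under the assumption stated above.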

I hope you understand what I am trying to do. =)

Kind Regards,
Philipp


On 21.08.2015 15:28, Aljoscha Krettek wrote:
> Hi,
> with the current API this should do what you are after:
>
> val input = ...
> val result = input
>   .window(...)
>   .groupBy(...)
>   .reduceWindow( /* your reduce function */ )
>
> With the reduce function you should be able to implement any custom 
> aggregations. You can also use foldWindow() if you want to do a 
> functional fold over the window.
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 21 Aug 2015 at 14:51 Philipp Goetze 
> <philipp.goetze@tu-ilmenau.de> wrote:
>
>     Hello community,
>
>     how do I define a custom aggregate function in Flink Streaming
>     (Scala)?
>     Could you please provide an example on how to do that?
>
>     Thank you and best regards,
>     Philipp
>

