flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Regarding implementation of aggregate function using a ProcessFunction
Date Fri, 28 Sep 2018 10:52:25 GMT
Please see: https://issues.apache.org/jira/browse/FLINK-10250

On 28.09.2018 11:27, vino yang wrote:
> Hi Gaurav,
>
> Yes, you are right. Using a RichFunction there is indeed not allowed.
> I will ping Timo; he may be able to give you a more authoritative answer.
>
> Thanks, vino.
>
> Gaurav Luthra <gauravluthra6134@gmail.com
> <mailto:gauravluthra6134@gmail.com>> wrote on Fri, Sep 28, 2018 at 4:27 PM:
>
>     Hi Vino,
>
>     Kindly check the Flink code below.
>
>     // From org.apache.flink.streaming.api.datastream.WindowedStream
>
>     @PublicEvolving
>     public <ACC, R> SingleOutputStreamOperator<R> aggregate(
>             AggregateFunction<T, ACC, R> function) {
>         checkNotNull(function, "function");
>
>         // This check is what rejects a RichAggregateFunction.
>         if (function instanceof RichFunction) {
>             throw new UnsupportedOperationException(
>                 "This aggregation function cannot be a RichFunction.");
>         }
>
>         TypeInformation<ACC> accumulatorType =
>             TypeExtractor.getAggregateFunctionAccumulatorType(
>                 function, input.getType(), null, false);
>
>         TypeInformation<R> resultType =
>             TypeExtractor.getAggregateFunctionReturnType(
>                 function, input.getType(), null, false);
>
>         return aggregate(function, accumulatorType, resultType);
>     }
>
>
>     Kindly check the above snippet of Flink's aggregate() method, which
>     is the one applied on a windowed stream.
>
>     Thanks & Regards
>     Gaurav Luthra
>     Mob:- +91-9901945206
>
>
>     On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1127@gmail.com
>     <mailto:yanghua1127@gmail.com>> wrote:
>
>         Hi Gaurav,
>
>         This is very strange. Can you share your code and the specific
>         exception? Under normal circumstances, it should not throw an
>         exception.
>
>         Thanks, vino.
>
>         Gaurav Luthra <gauravluthra6134@gmail.com
>         <mailto:gauravluthra6134@gmail.com>> wrote on Fri, Sep 28, 2018
>         at 3:27 PM:
>
>             Hi Vino,
>
>             RichAggregateFunction can surely access the state. But the
>             problem is that we cannot use a RichAggregateFunction in the
>             aggregate() method. If we do, it throws an exception.
>
>             So, the option is to use a plain AggregateFunction (not
>             Rich) with the aggregate() method on a windowed stream. But
>             in an AggregateFunction we cannot access the RuntimeContext,
>             and hence we cannot use state.
>
>             Thanks & Regards
>             Gaurav
>
>
>
>             On Fri, 28 Sep, 2018, 12:40 PM vino yang,
>             <yanghua1127@gmail.com <mailto:yanghua1127@gmail.com>> wrote:
>
>                 Hi Gaurav,
>
>                 Why do you think the RichAggregateFunction cannot
>                 access the State API?
>                 RichAggregateFunction inherits from
>                 AbstractRichFunction (it provides a RuntimeContext
>                 that allows you to access the state API).
>
>                 Thanks, vino.
>
>                 Gaurav Luthra <gauravluthra6134@gmail.com
>                 <mailto:gauravluthra6134@gmail.com>> wrote on Fri, Sep 28,
>                 2018 at 1:38 PM:
>
>                     Hi,
>
>                     As we are aware, a RichAggregateFunction currently
>                     cannot be used in the aggregate() method on a
>                     windowed stream. So, to access state in a custom
>                     aggregate function, one option is to implement it
>                     using a ProcessFunction.
>                     Many developers face this issue, so someone must
>                     have implemented it or tried to. Kindly share your
>                     feedback on this, as I need to implement it myself.
>
>                     Thanks & Regards
>                     Gaurav Luthra
>
>
>
>                     --
>                     Sent from:
>                     http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
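
For readers following the thread: a minimal sketch of the AggregateFunction contract discussed above, assuming the goal is a simple windowed average. The intermediate state lives in the accumulator that the function returns, so this kind of aggregation needs no RuntimeContext. The interface below is a local stand-in that copies the four method names of Flink's AggregateFunction so the sketch compiles without Flink on the classpath; SumCount, Average and the aggregate() driver are hypothetical names, not Flink classes.

```java
import java.util.Arrays;
import java.util.List;

public class AverageAggregateSketch {

    // Local stand-in mirroring the four methods of Flink's AggregateFunction.
    // Declared here only so the sketch is self-contained (assumption, not Flink code).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical immutable accumulator: running sum and element count.
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Average over Long inputs; all intermediate state is carried in SumCount.
    static final class Average implements AggregateFunction<Long, SumCount, Double> {
        @Override
        public SumCount createAccumulator() {
            return new SumCount(0L, 0L);
        }

        @Override
        public SumCount add(Long value, SumCount acc) {
            return new SumCount(acc.sum + value, acc.count + 1);
        }

        @Override
        public Double getResult(SumCount acc) {
            // An empty accumulator yields 0.0 rather than dividing by zero.
            return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
        }

        @Override
        public SumCount merge(SumCount a, SumCount b) {
            return new SumCount(a.sum + b.sum, a.count + b.count);
        }
    }

    // Hypothetical driver that folds a list through the function,
    // standing in for what a window operator would do per key and window.
    static <IN, ACC, OUT> OUT aggregate(AggregateFunction<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN value : values) {
            acc = fn.add(value, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregate(new Average(), Arrays.asList(2L, 4L, 9L))); // prints 5.0
    }
}
```

Where state beyond the accumulator is really needed, WindowedStream also has an aggregate(AggregateFunction, ProcessWindowFunction) overload. The ProcessWindowFunction there receives a Context with windowState() and globalState(), which may be enough without a hand-written ProcessFunction.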

