flink-user mailing list archives

From Daniel Santos <dsan...@cryptolab.net>
Subject Re: Flink Time Window State
Date Fri, 04 Nov 2016 15:39:40 GMT

Hello Aljoscha,

Thank you for your reply.

But from my reading of the docs, a user function has to be a Rich
Function if it wants to keep state, and a Rich Function is not
accepted on a window.

For instance, in the Flink 1.1.3 source, which is the version I'm
currently using, the class WindowedStream.java contains the following
snippet.

"
     public <R> SingleOutputStreamOperator<R> apply(
             R initialValue,
             FoldFunction<T, R> foldFunction,
             WindowFunction<R, R, K, W> function,
             TypeInformation<R> resultType) {
         if (foldFunction instanceof RichFunction) {
             throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
         }
         if (windowAssigner instanceof MergingWindowAssigner) {
             throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
         }
...
"

I can see that the window operator creates a FoldDescriptor, so, as you
said, it uses the APIs you described.
However, I can't see how everything fits together.
Take the Count class described below, which can only extend
FoldFunction and not RichFoldFunction: how does Flink keep track of
the accumulator?
From my tests it seems that it does not.

Every time the program/stream/job is restarted, the accumulator starts
from the initial value.
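
To make the two possible outcomes concrete, here is a toy model in plain
Java with no Flink dependency. It is only a sketch of the idea that an
operator can hold the fold accumulator on behalf of a stateless fold
function and snapshot it. ToyWindowOperator, snapshot(), restore() and
countAfterRestart() are hypothetical names made up for this
illustration; they are not Flink APIs and this is not how the Flink
window operator is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model only: none of these types are Flink classes.
final class FoldCheckpointSketch {

    /** Holds one fold accumulator per window; the fold function itself is stateless. */
    static final class ToyWindowOperator<T, R> {
        private final R initialValue;
        private final BiFunction<R, T, R> fold;
        private Map<Long, R> accumulators = new HashMap<>();

        ToyWindowOperator(R initialValue, BiFunction<R, T, R> fold) {
            this.initialValue = initialValue;
            this.fold = fold;
        }

        /** Folds one element into the accumulator of the window starting at windowStart. */
        void processElement(long windowStart, T element) {
            R current = accumulators.getOrDefault(windowStart, initialValue);
            accumulators.put(windowStart, fold.apply(current, element));
        }

        R get(long windowStart) {
            return accumulators.getOrDefault(windowStart, initialValue);
        }

        /** Copies the accumulators, standing in for a checkpoint. */
        Map<Long, R> snapshot() {
            return new HashMap<>(accumulators);
        }

        /** Replaces the accumulators with a previously taken snapshot. */
        void restore(Map<Long, R> snapshot) {
            accumulators = new HashMap<>(snapshot);
        }
    }

    /** Processes 10 events, "fails", restarts, processes 1 more event, returns the count. */
    static long countAfterRestart(boolean restoreFromSnapshot) {
        BiFunction<Long, String, Long> count = (acc, event) -> acc + 1;

        ToyWindowOperator<String, Long> beforeFailure = new ToyWindowOperator<>(0L, count);
        for (int i = 0; i < 10; i++) {
            beforeFailure.processElement(0L, "event-" + i);
        }
        Map<Long, Long> checkpoint = beforeFailure.snapshot();

        // A restart always creates a new operator instance with empty accumulators.
        ToyWindowOperator<String, Long> restarted = new ToyWindowOperator<>(0L, count);
        if (restoreFromSnapshot) {
            restarted.restore(checkpoint);
        }
        restarted.processElement(0L, "event-10");
        return restarted.get(0L);
    }

    public static void main(String[] args) {
        System.out.println("restored from snapshot: " + countAfterRestart(true));  // 11
        System.out.println("fresh start: " + countAfterRestart(false));            // 1
    }
}
```

In this model the count only continues from 10 when the restarted
operator is handed the snapshot; a restart that begins with nothing to
restore falls back to the initial value, which matches what I observe.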

Kind Regards,
Daniel Santos


On 11/04/2016 11:01 AM, Aljoscha Krettek wrote:
> Hi Daniel,
> Flink will checkpoint the state of all operations (in your case to 
> HDFS). Flink has several APIs for dealing with state in user 
> functions: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html
> The window operator also internally uses these APIs.
>
> Let me know if you need anything more.
>
> Cheers,
> Aljoscha
>
> On Thu, 3 Nov 2016 at 19:43 Daniel Santos <dsantos@cryptolab.net>
> wrote:
>
>     Hello,
>
>     I have a question that has been bugging me.
>     Let's say we have a Kafka Source.
>     Checkpoint is enabled, with a period of 5 seconds.
>     We have a FSBackend ( Hadoop ).
>
>     Now imagine we have a tumbling window of 10 minutes.
>
>     For simplicity, let's say that we are counting all elements
>     arriving in 10 minutes. Something like this.
>
>     class Count extends FoldFunction[Event, Long] {
>
>         override def fold(accumulator: Long, value: Event): Long =  {
>           accumulator + 1
>         }
>
>     }
>
>     So we have
>
>     source.
>           window(<Tumbling>).
>           apply(0, Count(), WindowFunction())
>
>     In the first 2 minutes 10 events arrive, then we stop the
>     stream/task/job, or it fails, and then it is restarted. What will
>     be the state of the fold function?
>     Will it be 10, resuming from there, or will it be 0?
>
>     This is important to know: imagine we have a window of 1 day
>     and the job fails midday. How will it resume?
>
>     Best Regards
>

