flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Flink Time Window State
Date Fri, 04 Nov 2016 17:54:54 GMT
Hi,
the state of the window is kept by the WindowOperator (which uses the state
descriptor you mentioned to access the state). The FoldFunction does not
itself keep the state but is only used to update the state inside the
WindowOperator, if you will.
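To make this division of labour concrete, here is a minimal plain-Java model with no Flink dependency. `Fold`, `ToyWindowOperator`, `onElement`, and `stateFor` are hypothetical names, not Flink classes. The operator owns one accumulator per tumbling window, and the fold function is a pure function that the operator calls to compute the next value, so the fold function itself never needs to be "rich" to have state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a fold function: a pure function with no fields,
// so it has nowhere to keep state between calls.
interface Fold<T, R> {
    R fold(R accumulator, T value);
}

// Hypothetical stand-in for the window operator: it owns one accumulator per window.
class ToyWindowOperator<T, R> {
    private final R initialValue;
    private final Fold<T, R> foldFunction;
    private final long windowSizeMillis;
    // Window start timestamp -> current accumulator for that window.
    private final Map<Long, R> windowState = new HashMap<>();

    ToyWindowOperator(R initialValue, Fold<T, R> foldFunction, long windowSizeMillis) {
        this.initialValue = initialValue;
        this.foldFunction = foldFunction;
        this.windowSizeMillis = windowSizeMillis;
    }

    void onElement(T value, long timestampMillis) {
        // Tumbling windows: align the timestamp down to the start of its window.
        long windowStart = timestampMillis - (timestampMillis % windowSizeMillis);
        R current = windowState.getOrDefault(windowStart, initialValue);
        // Read state, let the fold function compute the next value, write state back.
        windowState.put(windowStart, foldFunction.fold(current, value));
    }

    R stateFor(long windowStart) {
        return windowState.getOrDefault(windowStart, initialValue);
    }
}

class WindowStateDemo {
    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // Same logic as the Count class in the thread: ignore the element, add one.
        Fold<String, Long> count = (acc, event) -> acc + 1;
        ToyWindowOperator<String, Long> op = new ToyWindowOperator<>(0L, count, tenMinutes);
        for (int i = 0; i < 10; i++) {
            op.onElement("event-" + i, i * 1000L); // ten events in the first ten seconds
        }
        op.onElement("next", tenMinutes); // falls into the following window
        System.out.println(op.stateFor(0L) + " " + op.stateFor(tenMinutes)); // prints "10 1"
    }
}
```

Because all state lives in the operator's map, snapshotting that map is enough to capture every window's partial result.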

When you say restart, are you talking about stopping the job manually and
then restarting it? In that case I expect the state to be reset. Flink
performs checkpoints of the state so that it can recover from failures;
these checkpoints, however, don't survive stopping a job. If you want to
persist the state across stopping/restarting, you should look into
savepoints:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html
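The three restart paths described above can be modelled in plain Java as a sketch: automatic recovery from the latest checkpoint after a failure, a fresh submission after a manual stop, and resubmission from a savepoint. `ToyJob`, `count`, `snapshot`, and `countFor` are hypothetical names; this does not use Flink's API or CLI, and it simplifies by treating a checkpoint and a savepoint as the same kind of snapshot that differ only in who keeps the copy after the job stops.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one job's window state: window start timestamp -> count.
class ToyJob {
    final Map<Long, Long> counts;

    // A job always starts from some snapshot; an empty map means "no prior state".
    ToyJob(Map<Long, Long> restoredFrom) {
        this.counts = new HashMap<>(restoredFrom);
    }

    // The Count fold applied to the window starting at windowStart.
    void count(long windowStart) {
        counts.merge(windowStart, 1L, Long::sum);
    }

    // Copies the current state (assumption: checkpoints and savepoints capture the same data).
    Map<Long, Long> snapshot() {
        return new HashMap<>(counts);
    }

    long countFor(long windowStart) {
        return counts.getOrDefault(windowStart, 0L);
    }
}

class RestartDemo {
    public static void main(String[] args) {
        ToyJob job = new ToyJob(new HashMap<>());
        for (int i = 0; i < 10; i++) {
            job.count(0L);
        }

        // Failure: the latest checkpoint is restored automatically.
        ToyJob recovered = new ToyJob(job.snapshot());

        // Manual stop and fresh submission: nothing is restored, so counting restarts from 0.
        ToyJob freshStart = new ToyJob(new HashMap<>());

        // Manual stop after triggering a savepoint, then resubmission from that savepoint.
        Map<Long, Long> savepoint = job.snapshot();
        ToyJob resumed = new ToyJob(savepoint);

        // prints "10 0 10"
        System.out.println(recovered.countFor(0L) + " " + freshStart.countFor(0L) + " " + resumed.countFor(0L));
    }
}
```

The fresh-start case is the behaviour described in the restart question: without a savepoint, the new job has no snapshot to start from.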

Cheers,
Aljoscha

On Fri, 4 Nov 2016 at 16:40 Daniel Santos <dsantos@cryptolab.net> wrote:

>
> Hello Aljoscha,
>
> Thank you for your reply.
>
> But from reading the docs, I believe that any user function has to be a
> Rich Function if it wants to have state.
> However, a Rich Function cannot be used with a window.
>
> For instance, looking at the Flink source for version 1.1.3, which is the one
> I'm currently using, we find the following snippet in WindowedStream.java:
>
> "
>     public <R> SingleOutputStreamOperator<R> apply(R initialValue,
>             FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
>             TypeInformation<R> resultType) {
>         if (foldFunction instanceof RichFunction) {
>             throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
>         }
>         if (windowAssigner instanceof MergingWindowAssigner) {
>             throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
>         }
> ...
> "
>
> Now I can see that the window operator creates a FoldingStateDescriptor and,
> as you said, uses the APIs you described.
> However, I can't see how everything fits together.
> For instance, the Count class described below can only extend FoldFunction,
> not RichFoldFunction, so how does Flink keep track of the accumulator?
> From my tests, it seems that it does not.
>
> Every time the program/stream/job is restarted, the accumulator starts from
> the initial value.
>
> Kind Regards,
> Daniel Santos
>
>
> On 11/04/2016 11:01 AM, Aljoscha Krettek wrote:
>
> Hi Daniel,
> Flink will checkpoint the state of all operators (in your case to HDFS).
> Flink has several APIs for dealing with state in user functions:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html
> The window operator also uses these APIs internally.
>
> Let me know if you need anything more.
>
> Cheers,
> Aljoscha
>
> On Thu, 3 Nov 2016 at 19:43 Daniel Santos <dsantos@cryptolab.net> wrote:
>
> Hello,
>
> I have a question that has been bugging me.
> Let's say we have a Kafka source.
> Checkpointing is enabled, with an interval of 5 seconds.
> We use the FsStateBackend (on Hadoop HDFS).
>
> Now imagine we have a tumbling window of 10 minutes.
>
> For simplicity, let's say we are counting all elements arriving within 10
> minutes, something like this:
>
> class Count extends FoldFunction[Event, Long] {
>
>     override def fold(accumulator: Long, value: Event): Long =  {
>       accumulator + 1
>     }
>
> }
>
> So we have
>
> source.
>       window(<Tumbling>).
>       apply(0L, new Count(), WindowFunction())
>
> Suppose 10 events arrive in the first 2 minutes, and then we stop the
> stream/task/job, or it fails, and it is then restarted. What will the
> state of the fold function be?
> Will it be 10 and resume from there? Or will it be 0?
>
> This is fairly important to know: imagine we have a window of 1 day
> and the job fails mid-day. How will it resume?
>
> Best Regards
>
>
>
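The failure case in the original question (a count in progress, then a crash) can be sketched as a plain-Java model of why recovery neither loses nor double-counts events: the source position and the window accumulator are snapshotted together, and after a failure both are restored and the source is re-read from the restored position. This relies on the source being replayable, which is the role Kafka offsets play for the Flink Kafka consumer. `ReplayDemo`, `Snapshot`, `runUntilFailure`, and `runToEnd` are hypothetical names, and the model simplifies the 5-second checkpoint interval to "a snapshot every N events".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: a list stands in for a Kafka partition,
// and the fold accumulator is snapshotted together with the read offset.
class ReplayDemo {
    // A consistent snapshot taken at one instant: where the source was and what the window held.
    static final class Snapshot {
        final int offset;
        final long count;

        Snapshot(int offset, long count) {
            this.offset = offset;
            this.count = count;
        }
    }

    // Same logic as the Count fold in the question: ignore the event, add one.
    static long fold(long accumulator, String event) {
        return accumulator + 1;
    }

    // Processes the log from a snapshot, snapshotting every `interval` events, and
    // stops ("fails") at offset `failAt`, returning the last completed snapshot.
    static Snapshot runUntilFailure(List<String> log, Snapshot start, int interval, int failAt) {
        Snapshot last = start;
        long count = start.count;
        for (int offset = start.offset; offset < failAt; offset++) {
            count = fold(count, log.get(offset));
            if ((offset + 1) % interval == 0) {
                last = new Snapshot(offset + 1, count);
            }
        }
        return last;
    }

    // Resumes from a snapshot, re-reading the log from its offset, and returns the final count.
    static long runToEnd(List<String> log, Snapshot start) {
        long count = start.count;
        for (int offset = start.offset; offset < log.size(); offset++) {
            count = fold(count, log.get(offset));
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            log.add("event-" + i);
        }
        // Snapshot every 5 events, fail after 17: the last snapshot holds offset 15, count 15.
        Snapshot restored = runUntilFailure(log, new Snapshot(0, 0L), 5, 17);
        // Events 15 and 16 are re-read after recovery, so the final count is exact.
        long total = runToEnd(log, restored);
        System.out.println(restored.count + " " + total); // prints "15 25"
    }
}
```

In this model, a one-day window that fails mid-day resumes from the partial count in the last snapshot rather than from the initial value; only a fresh submission with no snapshot starts again from 0.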
