flink-user mailing list archives

From Paul Lam <paullin3...@gmail.com>
Subject Re: Preserve accumulators after failure in DataStream API
Date Thu, 02 May 2019 08:25:35 GMT
Hi Wouter,

I've run into the same issue and eventually managed to use operator state to back
the accumulators, so they can be restored after a restart.
The downside is that we have to update the values in both the accumulators and
the state to keep them consistent. FYI.
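
A minimal sketch of this pattern, assuming the job is on a recent Flink 1.x
release (the class name, accumulator name, and state name below are
illustrative, not from the thread): the function keeps a LongCounter
accumulator and mirrors its local value into operator list state via
CheckpointedFunction, restoring the counter from that state on recovery.

```scala
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class CountingProcess extends ProcessFunction[String, String] with CheckpointedFunction {

  // Accumulator: visible in the web UI / job result, but NOT checkpointed.
  private val counter = new LongCounter()
  // Operator state that backs the accumulator across restarts.
  @transient private var counterState: ListState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    getRuntimeContext.addAccumulator("events-seen", counter)

  override def initializeState(ctx: FunctionInitializationContext): Unit = {
    counterState = ctx.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("counter-backup", classOf[java.lang.Long]))
    if (ctx.isRestored) {
      // Sum all restored entries back into the accumulator; with even
      // redistribution a subtask may receive several entries, and summing
      // them is correct for a counter.
      counterState.get().forEach(v => counter.add(v))
    }
  }

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    // Mirror this subtask's local accumulator value into operator state.
    counterState.clear()
    counterState.add(counter.getLocalValue)
  }

  override def processElement(value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit = {
    counter.add(1L) // the state copy is refreshed in snapshotState
    out.collect(value)
  }
}
```

This is the double bookkeeping Paul mentions: the accumulator gives you the
UI/result visibility, while the operator state carries the value across
failures.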

Best,
Paul Lam

Fabian Hueske <fhueske@gmail.com> wrote on Thu, 2 May 2019 at 16:17:

> Hi Wouter,
>
> OK, that explains it :-) Overloaded terms...
>
> The Table API / SQL documentation refers to the accumulator of an
> AggregateFunction [1].
> The accumulators that are accessible via the RuntimeContext are a rather
> old part of the API that is mainly intended for batch jobs.
>
> I would not use them for streaming applications as they are not
> checkpointed and recovered (as you noticed).
> You should use managed state (keyed or operator) for such use cases.
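>
> A minimal sketch of the keyed-state approach (class, field, and state names
> are illustrative, assuming a recent Flink 1.x release): a ValueState holds
> the per-key count, and Flink checkpoints and restores it automatically.
>
> ```scala
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
> import org.apache.flink.util.Collector
>
> class CheckpointedCount extends KeyedProcessFunction[String, String, (String, Long)] {
>
>   // Managed keyed state: checkpointed and restored by Flink.
>   @transient private var count: ValueState[java.lang.Long] = _
>
>   override def open(parameters: Configuration): Unit =
>     count = getRuntimeContext.getState(
>       new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
>
>   override def processElement(value: String,
>       ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
>       out: Collector[(String, Long)]): Unit = {
>     // ValueState returns null for a key that has no state yet.
>     val current = Option(count.value()).map(_.longValue()).getOrElse(0L) + 1L
>     count.update(current)
>     out.collect((ctx.getCurrentKey, current))
>   }
> }
> ```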
>
> Best,
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
>
> On Thu, 2 May 2019 at 10:01, Wouter Zorgdrager <
> W.D.Zorgdrager@tudelft.nl> wrote:
>
>> Hi Fabian,
>>
>> Maybe I should clarify a bit: I'm actually using a (Long)Counter
>> registered as an accumulator in the RuntimeContext [1]. So I'm using a
>> KeyedProcessFunction, not an AggregateFunction. This works properly, but is
>> not retained after a job restart. I'm not entirely sure if I did this
>> correctly.
>>
>> Thx,
>> Wouter
>>
>>
>> [1].
>> https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34
>>
>>
>>
>> On Thu, 2 May 2019 at 09:36, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi Wouter,
>>>
>>> The DataStream API accumulators of the AggregateFunction [1] are stored
>>> in state and should be recovered in case of a failure as well.
>>> If this does not work, it would be a serious bug.
>>>
>>> What's the type of your accumulator?
>>> Can you maybe share the code?
>>> How do you apply the AggregateFunction (window, windowAll, ...)?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>
>>> On Tue, 30 Apr 2019 at 13:19, Wouter Zorgdrager <
>>> W.D.Zorgdrager@tudelft.nl> wrote:
>>>
>>>> Hi all,
>>>>
>>>> In the documentation on UDF accumulators [1] I read: "Accumulators
>>>> are automatically backup-ed by Flink’s checkpointing mechanism and restored
>>>> in case of a failure to ensure exactly-once semantics." So I assumed
>>>> this was also the case for accumulators used in the DataStream API, but I
>>>> noticed that it isn't. So every time my job crashes and restarts, the
>>>> accumulator is reset. Is there a way to retain this information?
>>>>
>>>> Thanks,
>>>> Wouter
>>>>
>>>>
>>>> [1].
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>>>>
>>>
