flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ron Crocker <rcroc...@newrelic.com>
Subject Re: FoldFunction accumulator checkpointing
Date Tue, 19 Apr 2016 16:53:08 GMT
Aljoscha -

I want to use a RichFoldFunction to get the open() hook. I cheat and use
this structure instead with a (non-Rich) FoldFunction:

public class InfinitResponseFilterFolder implements
FoldFunction<Tuple2<Long, Long>, String> {
    private BackingStore backingStore;
    public String fold(InfiniteResponseFilter accumulator, Tuple2<Long,
Long> value) throws Exception {
        if (backingStore == null) { // running the open() hook if necessary
        if (accumulator == null) {
            accumulator = new InfiniteResponseFilter(backingStore, value);
        return accumulator.incrementFilter(value)

    private void initializeBackingStore() {
        // connect to database
        backingStore = ...

Note that what I want to do is connect to a backing store to read the
initial state for a fold operation. The particular operation I’m trying to
do is a form of an infinite response filter in the form of a triple
exponential smoother
the various coefficients for a start state are pre-calculated (and stored
in that BackingStore).

Further, I want to checkpoint the entire state (including the coefficients)
to both Flink’s checkpointing system as well as the backing store. The
former is handled here, the latter is handled with another transform in my

Is there a better approach?

Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
M: +1 630 363 8835

> On Apr 13, 2016, at 1:25 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

> Hi,
> there are two cases where a FoldFunction can be used in the streaming
API: KeyedStream.fold() and WindowedStream.fold()/apply(). In both cases we
internally use the partitioned state abstraction of Flink to keep the
state. So yes, the accumulator value is consistently maintained and will
survive failures.
> Right now, the accumulation function of a window cannot be a rich
function because the underlying state primitives that the windowing system
uses can only take plain functions because supporting rich functions there
could have problematic implications. The most obvious one to me seems that
users could be trying to keep state in the ReduceFunction of a
ReducingState when given the chance to do so, which a RichFunction does.
> This is just off the top of my head but I can go into detail if you want.
> Cheers,
> Aljoscha
> On Wed, 13 Apr 2016 at 00:29 Michael Radford <mubber@gmail.com> wrote:


>> I'm wondering whether the accumulator value maintained by a
>> FoldFunction is automatically checkpointed?
>> In general, but specifically when using the WindowedStream.apply
>> variant that takes a FoldFunction:
>> public <R> DataStream<R> apply(R initialValue,
>>                       FoldFunction<T,R> foldFunction,
>>                       WindowFunction<R,R,K,W> function,
>>                       TypeInformation<R> evidence$7)
>> If not, then Flink 1.0.1 still has the issue that you can't pass a
>> RichFoldFunction to WindowedStream.apply
>> (java.lang.UnsupportedOperationException: ReduceFunction of apply can
>> not be a RichFunction).
>> But also, if not, it seems like this would be a common pattern when
>> doing complex (keyed / multi-valued) aggregations, and if the
>> accumulator type R is serializable, there could be a convenience
>> method for a checkpointed fold, like the mapWithState mentioned in the
>> State section of the streaming guide.
>> Thanks,
>> Mike

View raw message