flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Flink message & state lifecycle.
Date Fri, 15 Jan 2016 10:12:25 GMT
Hi,
I imagine you are talking about CountTrigger, DeltaTrigger, and Continuous*Trigger. For these we never purge. They are a leftover artifact from an earlier approach to implementing windowing strategies that was inspired by IBM InfoSphere Streams. There, all triggers are essentially accumulating and elements are evicted by an evictor. This is very flexible but makes it hard to implement windowing code efficiently. If you are interested, here is a master's thesis that describes that earlier implementation: http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf

These triggers are problematic because they never purge window contents if you don't have an evictor that does correct eviction. Also, they don't allow incremental aggregation over elements as they arrive, since you don't know what the contents of the window will be until the trigger fires and the evictor evicts.
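To make this concrete, here is a small Flink-independent sketch of that interplay. All names are made up for illustration (this is not Flink API); it just models why the state of an accumulating trigger only ever grows unless an evictor trims it:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an accumulating trigger paired with an evictor. The buffer
// only ever shrinks when the evictor runs: the trigger alone never purges.
class AccumulatingWindowModel {
    private final List<Long> buffer = new ArrayList<>();
    private final int fireEvery; // trigger: fire every N elements
    private final int keepLast;  // evictor: keep only the last N elements

    AccumulatingWindowModel(int fireEvery, int keepLast) {
        this.fireEvery = fireEvery;
        this.keepLast = keepLast;
    }

    // Returns the evaluated window contents when the trigger fires, else null.
    List<Long> onElement(long element) {
        buffer.add(element);
        if (buffer.size() % fireEvery != 0) {
            return null; // CONTINUE: nothing fires, nothing is purged
        }
        // FIRE: the evictor trims the buffer before evaluation; without this
        // step the full history would be kept (and re-processed) forever.
        while (buffer.size() > keepLast) {
            buffer.remove(0);
        }
        return new ArrayList<>(buffer);
    }

    int retainedCount() {
        return buffer.size();
    }
}
```

Note that the eviction has to happen before each evaluation, which is exactly what prevents efficient incremental aggregation.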

So, as a short answer: the accumulating triggers never purge window state on their own. I
hope this helps somehow.
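If it helps, the fire/purge timeline of the Dataflow-style trigger I sketched earlier in this thread (quoted below) can also be traced with plain Java, without Flink. The Result names mirror Flink's TriggerResult; everything else here is illustrative only:

```java
// Plain-Java trace of a Dataflow-style event-time trigger's timing decisions.
// Result mirrors Flink's TriggerResult enum; the rest is illustrative only.
class LatenessTimeline {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

    // What an event-time timer firing at `time` would decide for a window
    // ending at `windowMaxTimestamp` with the given allowed lateness.
    static Result onEventTime(long time, long windowMaxTimestamp,
                              long allowedLateness, boolean accumulating) {
        if (time == windowMaxTimestamp) {
            // End of window: emit; purge immediately only in discarding mode.
            return accumulating ? Result.FIRE : Result.FIRE_AND_PURGE;
        } else if (time == windowMaxTimestamp + allowedLateness) {
            // Lateness has expired: drop window state without emitting.
            return Result.PURGE;
        }
        return Result.CONTINUE;
    }
}
```

So in accumulating mode the window state lives on past the end-of-window firing, until the lateness timer fires and returns PURGE.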

Cheers,
Aljoscha
> On 15 Jan 2016, at 09:55, Andrew Coates <big.andy.coates@gmail.com> wrote:
> 
> Thanks Aljoscha, that's very enlightening.
> 
> Can you please also explain what the default behaviour is? I.e. if I use one of the accumulating inbuilt triggers, when does the state get purged? (With your info I can now probably work things out, but you may give more insight :)
> 
> Also, are there plans to add explicit lateness control to Flink core? (I'm aware of the Dataflow integration work.)
> 
> Thanks again,
> 
> Andy
> 
> 
> On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> the window contents are stored in state managed by the window operator at all times until they are purged by a Trigger returning PURGE from one of its on*() methods.
> 
> Out of the box, Flink does not have something akin to the lateness and cleanup of Google Dataflow. You can, however, implement it yourself using a custom Trigger. This is an example that mimics Google Dataflow:
> 
> public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
>    private static final long serialVersionUID = 1L;
> 
>    private final boolean accumulating;
>    private final long allowedLateness;
> 
>    private EventTimeTrigger(boolean accumulating, long allowedLateness) {
>       this.accumulating = accumulating;
>       this.allowedLateness = allowedLateness;
>    }
> 
>    @Override
>    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>       ctx.registerEventTimeTimer(window.maxTimestamp());
>       return TriggerResult.CONTINUE;
>    }
> 
>    @Override
>    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>       if (time == window.maxTimestamp()) {
>          if (accumulating) {
>             // register the cleanup timer if we are accumulating (and allow lateness)
>             if (allowedLateness > 0) {
>                ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
>             }
>             return TriggerResult.FIRE;
>          } else {
>             return TriggerResult.FIRE_AND_PURGE;
>          }
>       } else if (time == window.maxTimestamp() + allowedLateness) {
>          return TriggerResult.PURGE;
>       }
> 
>       return TriggerResult.CONTINUE;
>    }
> 
>    @Override
>    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>       return TriggerResult.CONTINUE;
>    }
> 
>    @Override
>    public String toString() {
>       return "EventTimeTrigger()";
>    }
> 
>    /**
>     * Creates an event-time trigger that fires once the watermark passes the end of the window.
>     *
>     * <p>
>     * Once the trigger fires all elements are discarded. Elements that arrive late immediately
>     * trigger window evaluation with just this one element.
>     */
>    public static EventTimeTrigger discarding() {
>       return new EventTimeTrigger(false, 0L);
>    }
> 
>    /**
>     * Creates an event-time trigger that fires once the watermark passes the end of the window.
>     *
>     * <p>
>     * This trigger will not immediately discard all elements once it fires. Only after the
>     * watermark passes the specified lateness are the window elements discarded, without
>     * emitting a new result. If a late element arrives within the specified lateness
>     * the window is computed again and a new result is emitted.
>     */
>    public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
>       return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
>    }
> }
> 
> You can specify a lateness, and while that time has not yet been reached the windows will remain and late-arriving elements will trigger window emission with the complete window contents.
> 
> Cheers,
> Aljoscha
> > On 13 Jan 2016, at 15:12, Andrew Coates <big.andy.coates@gmail.com> wrote:
> >
> > Hi,
> >
> > I'm trying to understand how the lifecycle of messages / state is managed by Flink, but I'm failing to find any documentation.
> >
> > Specifically, if I'm using a windowed stream and a type of trigger that retains the elements of the window to allow for processing of late data, e.g. ContinuousEventTimeTrigger, then where are the contents of the windows, or their intermediate computation results, stored, and when is the data removed?
> >
> > I'm thinking in terms of Google's Dataflow API: when setting up a window, the withAllowedLateness option allows the caller to control how long past the end of a window the data should be maintained. Does Flink have anything similar?
> >
> > Thanks,
> >
> > Andy
> 

