flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5529) Improve / extends windowing documentation
Date Wed, 25 Jan 2017 17:32:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838207#comment-15838207 ]

ASF GitHub Bot commented on FLINK-5529:

Github user aljoscha commented on a diff in the pull request:

    --- Diff: docs/dev/windows.md ---
    @@ -622,133 +690,138 @@ input
    -## Dealing with Late Data
    +## Triggers
    -When working with event-time windowing it can happen that elements arrive late, i.e. the
    -watermark that Flink uses to keep track of the progress of event-time is already past the
    -end timestamp of a window to which an element belongs. Please
    -see [event time](./event_time.html) and especially
    -[late elements](./event_time.html#late-elements) for a more thorough discussion of
    -how Flink deals with event time.
    +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
    +processed by the *window function*. Each `WindowAssigner` comes with a default `Trigger`.

    +If the default trigger does not fit your needs, you can specify a custom trigger using
    -You can specify how a windowed transformation should deal with late elements and how much lateness
    -is allowed. The parameter for this is called *allowed lateness*. This specifies by how much time
    -elements can be late. Elements that arrive within the allowed lateness are still put into windows
    -and are considered when computing window results. If elements arrive after the allowed lateness they
    -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
    -collected once the watermark passes the end of a window plus the allowed lateness.
    +The trigger interface provides five methods that react to different events: 
    -<span class="label label-info">Default</span> By default, the allowed lateness is set to
    -`0`. That is, elements that arrive behind the watermark will be dropped.
    +* The `onElement()` method is called for each element that is added to a window.
    +* The `onEventTime()` method is called when a registered event-time timer fires.
    +* The `onProcessingTime()` method is called when a registered processing-time timer fires.

    +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, *e.g.* when using session windows.
    +* Finally the `clear()` method performs any action needed upon removal of the corresponding
    -You can specify an allowed lateness like this:
    +Any of these methods can be used to register processing- or event-time timers for future
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +### Fire and Purge
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .allowedLateness(<time>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
    +all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).

    +Windows with `ReduceFunction` or `FoldFunction` simply emit their eagerly aggregated
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[T] = ...
    +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
    +By default, the pre-implemented triggers simply `FIRE` without purging the window state.
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .allowedLateness(<time>)
    -    .<windowed transformation>(<window function>)
    -{% endhighlight %}
    +<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.
    -<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
    -data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
    +### Default Triggers of WindowAssigners
    -## Triggers
    +The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
    +default trigger. This trigger simply fires once the watermark passes the end of a window.

    -A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
    -processed by the *window function*. The trigger observes how elements are added to windows
    -and can also keep track of the progress of processing time and event time. Once a trigger
    -determines that a window is ready for processing, it fires. This is the signal for the
    -window operation to take the elements that are currently in the window and pass them along to
    -the window function to produce output for the firing window.
    +<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindow` is the `NeverTrigger` which never fires. Consequently, you always have to define a custom trigger when using a `GlobalWindow`.
    -Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
    -appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
    -default trigger. This trigger simply fires once the watermark passes the end of a window.
    +<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
    +are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
    +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
    +progress of time but only by count. Right now, you have to write your own custom trigger if
    +you want to react based on both time and count.
    -You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
    -whole specification of the windowed transformation would then look like this:
    +### Built-in and Custom Triggers
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +Flink comes with a few built-in triggers. 
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .trigger(<trigger>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    +* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
    +* The `ProcessingTimeTrigger` fires based on processing time.
    +* The `CountTrigger` which fires once the number of elements in a window exceeds the given limit.
    --- End diff --
    None of the other Triggers have a "which" here.
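
The "Fire and Purge" hunk in the diff above distinguishes `FIRE` from `FIRE_AND_PURGE`. A toy model in plain Java may help to see the difference. This is only a sketch and not Flink's API: the class `FirePurgeSketch`, the method `run`, and the "fire every N elements, emit the sum" behaviour are hypothetical names and assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single window; NOT Flink's API, all names are hypothetical. */
public class FirePurgeSketch {

    /** Mirrors the two firing results named in the diff. */
    enum Result { FIRE, FIRE_AND_PURGE }

    /**
     * Adds the elements one by one, fires after every `everyN` additions,
     * and emits the sum of the current window contents on each firing.
     */
    static List<Integer> run(int[] elements, int everyN, Result onFire) {
        List<Integer> contents = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        int sinceLastFiring = 0;
        for (int e : elements) {
            contents.add(e);
            sinceLastFiring++;
            if (sinceLastFiring == everyN) {
                int sum = 0;
                for (int c : contents) {
                    sum += c;
                }
                emitted.add(sum);
                sinceLastFiring = 0;
                // Purging drops the buffered elements; the window itself
                // stays around and keeps accepting new elements.
                if (onFire == Result.FIRE_AND_PURGE) {
                    contents.clear();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4};
        System.out.println(run(input, 2, Result.FIRE));           // [3, 10]
        System.out.println(run(input, 2, Result.FIRE_AND_PURGE)); // [3, 7]
    }
}
```

With `FIRE` the second result still includes the first two elements, while with `FIRE_AND_PURGE` it only covers the elements that arrived after the first firing.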

> Improve / extends windowing documentation
> -----------------------------------------
>                 Key: FLINK-5529
>                 URL: https://issues.apache.org/jira/browse/FLINK-5529
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.3.0
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
>   stream
>      [.keyBy(...)]         <-  keyed versus non-keyed windows
>       .window(...)         <-  required: "assigner"
>      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
>      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
>      [.allowedLateness()]  <-  optional, else zero
>       .reduce/fold/apply() <-  required: "function"
> (1) Types of windows
>   - tumble
>   - slide
>   - session
>   - global
> (2) Pre-defined windows
>    timeWindow() (tumble, slide)
>    countWindow() (tumble, slide)
>      - mention that count windows are inherently
>        resource leaky unless limited key space
> (3) Window Functions
>   - apply: most basic, iterates over elements in window
>   - aggregating: reduce and fold, can be used with "apply()" which will get one element
>   - forward reference to state size section
> (4) Advanced Windows
>   - assigner
>     - simple
>     - merging
>   - trigger
>     - registering timers (processing time, event time)
>     - state in triggers
>   - life cycle of a window
>     - create
>     - state
>     - cleanup
>       - when is window contents purged
>       - when is state dropped
>       - when is metadata (like merging set) dropped
> (5) Late data
>   - picture
>   - fire vs fire_and_purge: late accumulates vs late resurrects (cf discarding mode)
> (6) Evictors
>   - TBD
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
>   --> num windows * num elements in window
>   --> example: tumbling is one copy, sliding(n,m) is n/m copies
>   --> per key
> Pre-aggregation:
>   - if reduce or fold is set -> one element per window (rather than num elements in window)
>   - evictor voids pre-aggregation from the perspective of state
> Special rules:
>   - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
>   - all elements through the same windows
>   - currently not parallel
>   - possibly parallel in the future when having pre-aggregation functions
>   - inherently (by definition) produce a result stream with parallelism one
>   - state similar to one key of keyed windows
> {code}
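
The basic rule in item (7) of the outline (one copy per window an element is assigned to, so tumbling gives one copy and sliding(n,m) gives n/m copies) can be checked with a short, self-contained Java sketch. It assumes that windows are half-open intervals `[start, start + size)`, that window starts are multiples of the slide, and that the size is a multiple of the slide. The class and method names are hypothetical and not part of Flink.

```java
/** Sketch of the "copies per element" rule; hypothetical names, not Flink code. */
public class WindowCopiesSketch {

    /**
     * Counts the windows [start, start + size) that contain timestamp t,
     * where every window start is a multiple of slide.
     */
    static int windowsContaining(long t, long size, long slide) {
        // Latest window start that is not after t.
        long lastStart = t - Math.floorMod(t, slide);
        int count = 0;
        for (long start = lastStart; start > t - size; start -= slide) {
            count++;
        }
        return count;
    }

    /** Buffered copies for one key without pre-aggregation (size a multiple of slide). */
    static long bufferedCopies(long elementsPerKey, long size, long slide) {
        return elementsPerKey * (size / slide);
    }

    public static void main(String[] args) {
        System.out.println(windowsContaining(17, 10, 10)); // tumbling: 1
        System.out.println(windowsContaining(17, 10, 5));  // sliding(10, 5): 2
        System.out.println(bufferedCopies(1000, 10, 5));   // 2000
    }
}
```

Under pre-aggregation with reduce or fold, the outline's rule replaces the per-element copies with one aggregated element per window, so the second method only applies to the non-aggregating case.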

