flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5529) Improve / extends windowing documentation
Date Wed, 25 Jan 2017 17:32:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838199#comment-15838199 ]

ASF GitHub Bot commented on FLINK-5529:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3191#discussion_r97824676
  
    --- Diff: docs/dev/windows.md ---
    @@ -23,133 +23,96 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    -Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
    -slices based on the timestamps of elements or other criteria. This division is required when working
    -with infinite streams of data and performing transformations that aggregate elements.
    -
    -<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.
    -windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
    -subdivided based on both window and key before being given to
    -a user function. The work can thus be distributed across the cluster
    -because the elements for different keys can be processed independently. If you absolutely have to,
    -you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
    -windows work.
    +Windows are at the heart of processing infinite streams. Windows split the stream into "buckets" of finite size,
    +over which we can apply computations. This document focuses on how windowing is performed in Flink and how the
    +programmer can benefit to the maximum from its offered functionality. 
     
    -* This will be replaced by the TOC
    -{:toc}
    +The general structure of a windowed Flink program is presented below. This is also going to serve as a roadmap for
    +the rest of the page.
     
    -## Basics
    +    stream
    +           .keyBy(...)          <-  keyed versus non-keyed windows
    +           .window(...)         <-  required: "assigner"
    +          [.trigger(...)]       <-  optional: "trigger" (else default trigger)
    +          [.evictor(...)]       <-  optional: "evictor" (else no evictor)
    +          [.allowedLateness()]  <-  optional, else zero
    +           .reduce/fold/apply() <-  required: "function"
     
    -For a windowed transformation you must at least specify a *key*
    -(see [specifying keys]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)),
    -a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed, stream
    -into logical keyed streams while the *window assigner* assigns elements to finite per-key windows.
    -Finally, the *window function* is used to process the elements of each window.
    +In the above, the commands in square brackets ([...]) are optional. This reveals that Flink allows you to customize your
    +windowing logic in many different ways so that it best fits your needs. 
     
    -The basic structure of a windowed transformation is thus as follows:
    +* This will be replaced by the TOC
    +{:toc}
     
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +## Window Lifecycle
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    -</div>
    +In a nutshell, a window is **created** as soon as the first element that should belong to this window arrives, and the
    +window is **completely removed** when the time (event or processing time) passes its end timestamp plus the user-specified
    +`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink guarantees removal only for time-based
    +windows and not for other types, *e.g.* global windows (see [Window Assigners](#window-assigners)). For example, with an
    +event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed
    +lateness of 1 min, Flink will create a new window for the interval between `12:00` and `12:05` when the first element with
    +a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the `12:06`
    +timestamp. 
     
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[T] = ...
    +In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`WindowFunction`, `ReduceFunction` or
    +`FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
    +be applied to the contents of the window, while the `Trigger` specifies the conditions under which the window is
    +considered ready for the function to be applied. A triggering policy might be something like "when the number of elements
    +in the window is more than 4", or "when the watermark passes the end of the window". A trigger can also decide to
    +purge a window's contents any time between its creation and removal. Purging in this case only refers to the elements
    +in the window, and *not* the window metadata. This means that new data can still be added to that window.
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .<windowed transformation>(<window function>)
    -{% endhighlight %}
    -</div>
    -</div>
    +Apart from the above, you can specify an `Evictor` (see [Evictors](#evictors)) which will be able to remove
    +elements from the window after the trigger fires and before and/or after the function is applied.
     
    -We will cover [window assigners](#window-assigners) in a separate section below.
    +In the following we go into more detail for each of the components above. We start with the required parts in the above
    +snippet (see [Keyed vs Non-Keyed Windows](#keyed-vs-non-keyed-windows), [Window Assigner](#window-assigner), and
    +[Window Function](#window-function)) before moving to the optional ones.
     
    -The window transformation can be one of `reduce()`, `fold()` or `apply()`. Which respectively
    -takes a `ReduceFunction`, `FoldFunction` or `WindowFunction`. We describe each of these ways
    -of specifying a windowed transformation in detail below: [window functions](#window-functions).
    +## Keyed vs Non-Keyed Windows
     
    -For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
    -is being considered as *ready for processing*. These will be covered in more detail in
    -[triggers](#triggers).
    +The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window.
    +Using the `keyBy(...)` will split your infinite stream into logical keyed streams. If `keyBy(...)` is not called, your
    +stream is not keyed.
     
    -## Window Assigners
    +In the case of keyed streams, any attribute of your incoming events can be used as a key
    +(more details [here]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)). Having a keyed stream will
    +allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed
    +independently from the rest. All elements referring to the same key will be sent to the same parallel task.
     
    -The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
    -with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
    -*sliding windows*, *session windows* and *global windows*, but you can implement your own by
    -extending the `WindowAssigner` class. All the built-in window assigners, except for the global
    -windows one, assign elements to windows based on time, which can either be processing time or event
    -time. Please take a look at our section on [event time]({{ site.baseurl }}/dev/event_time.html) for more
    -information about how Flink deals with time.
    +In case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic
    +will be performed by a single task, *i.e.* with parallelism of 1.
     
    -Let's first look at how each of these window assigners works before looking at how they can be used
    -in a Flink program. We will be using abstract figures to visualize the workings of each assigner:
    -in the following, the purple circles are elements of the stream, they are partitioned
    -by some key (in this case *user 1*, *user 2* and *user 3*) and the x-axis shows the progress
    -of time.
    +## Window Assigners
     
    -### Global Windows
    +After specifying whether your stream is keyed or not, the next step is to define a *windowing strategy*.
    --- End diff --
    
    I think we should stick to `window assigner` here because that's what we're describing. In my mind, the ensemble of window assigner, trigger (and evictor) is actually the `windowing strategy` since only those together define what happens in the end.
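
To make the distinction concrete, here is a minimal sketch in plain Java. All types in it (`WindowingSketch`, `Window`, `Assigner`, `Strategy`) are hypothetical and are not Flink classes. The trigger is reduced to a predicate on the element count, borrowing the "more than 4 elements" example from the diff. The assigner only decides which window a timestamp falls into, while the strategy bundles it with the trigger (an evictor would be one more field).

```java
import java.util.Collections;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical illustration only: none of these types are part of Flink.
final class WindowingSketch {

    /** A window covering the half-open interval [start, end), in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** The assigner answers one question: which window(s) does a timestamp belong to? */
    interface Assigner {
        List<Window> assign(long timestamp);
    }

    /** The strategy is the ensemble: an assigner plus a trigger (evictor omitted here). */
    static final class Strategy {
        final Assigner assigner;
        final IntPredicate trigger; // decides, from the element count, whether to fire

        Strategy(Assigner assigner, IntPredicate trigger) {
            this.assigner = assigner;
            this.trigger = trigger;
        }
    }

    /** Tumbling assigner: every timestamp maps to exactly one window of the given size. */
    static Assigner tumbling(long sizeMillis) {
        return timestamp -> {
            long start = timestamp - Math.floorMod(timestamp, sizeMillis);
            return Collections.singletonList(new Window(start, start + sizeMillis));
        };
    }
}
```

Two strategies can share the same assigner and still behave differently, for instance `new WindowingSketch.Strategy(assigner, count -> count > 4)` versus one whose trigger never fires early, which is why the assigner alone does not define what happens in the end.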
    
    What do you think? 


> Improve / extends windowing documentation
> -----------------------------------------
>
>                 Key: FLINK-5529
>                 URL: https://issues.apache.org/jira/browse/FLINK-5529
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
>   stream
>      [.keyBy(...)]         <-  keyed versus non-keyed windows
>       .window(...)         <-  required: "assigner"
>      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
>      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
>      [.allowedLateness()]  <-  optional, else zero
>       .reduce/fold/apply() <-  required: "function"
> (1) Types of windows
>   - tumble
>   - slide
>   - session
>   - global
> (2) Pre-defined windows
>    timeWindow() (tumble, slide)
>    countWindow() (tumble, slide)
>      - mention that count windows are inherently
>        resource leaky unless limited key space
> (3) Window Functions
>   - apply: most basic, iterates over elements in window
>   
>   - aggregating: reduce and fold, can be used with "apply()" which will get one element
>   
>   - forward reference to state size section
> (4) Advanced Windows
>   - assigner
>     - simple
>     - merging
>   - trigger
>     - registering timers (processing time, event time)
>     - state in triggers
>   - life cycle of a window
>     - create
>     - state
>     - cleanup
>       - when is window contents purged
>       - when is state dropped
>       - when is metadata (like merging set) dropped
> (5) Late data
>   - picture
>   - fire vs fire_and_purge: late accumulates vs late resurrects (cf discarding mode)
>   
> (6) Evictors
>   - TBD
>   
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
>   --> num windows * num elements in window
>   --> example: tumbling is one copy, sliding(n,m) is n/m copies
>   --> per key
> Pre-aggregation:
>   - if reduce or fold is set -> one element per window (rather than num elements in window)
>   - evictor voids pre-aggregation from the perspective of state
> Special rules:
>   - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
>   - all elements through the same windows
>   - currently not parallel
>   - possibly parallel in the future when having pre-aggregation functions
>   - inherently (by definition) produce a result stream with parallelism one
>   - state similar to one key of keyed windows
> {code}
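
As a rough illustration of the "sliding(n,m) is n/m copies" rule in item (7), the following sketch counts how many windows a single timestamp is assigned to. It is plain Java with a hypothetical class name (`StateSizeSketch`), assumes a window offset of zero, and uses ordinary modulo arithmetic for sliding windows rather than calling any Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: not a Flink class.
final class StateSizeSketch {

    /**
     * Returns the start timestamps of all sliding windows of the given size and
     * slide that contain the timestamp. Each returned window holds one copy of
     * the element, so the list size is the per-element copy count.
     */
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With size 15 and slide 5, `slidingWindowStarts(12, 15, 5)` yields three windows, matching n/m = 3. With size equal to slide (the tumbling case) it yields exactly one.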



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
