flink-commits mailing list archives

From aljos...@apache.org
Subject [5/7] flink git commit: [FLINK-4752] [docs] Improve window assigner documentation.
Date Fri, 27 Jan 2017 12:37:34 GMT
[FLINK-4752] [docs] Improve window assigner documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d398f345
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d398f345
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d398f345

Branch: refs/heads/master
Commit: d398f3451d241920c74164f859ed53055bce4035
Parents: c30c86b
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Jan 27 11:48:26 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Jan 27 13:30:24 2017 +0100

----------------------------------------------------------------------
 docs/dev/windows.md | 299 +++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 266 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d398f345/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index 5752ef2..82906a0 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -97,39 +97,272 @@ independently from the rest. All elements referring to the same key will be sent
 In case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic
 will be performed by a single task, *i.e.* with parallelism of 1.
 
-## Window Assigner
-
-After specifying whether your stream is keyed or not, the next step is to specify a *windowing strategy*.
-This will dictate how your elements will be assigned into windows and the general way to do so, is by specifying the
-`WindowAssigner` that corresponds to the windowing strategy of your choice in the `window(...)` call.
-
-The `WindowAssigner` is responsible for assigning each incoming element to one or more windows. Flink comes with
-pre-implemented window assigners for the most typical use cases, namely *tumbling windows*, *sliding windows*,
-*session windows* and *global windows*, but you can implement your own by extending the `WindowAssigner` class. All the
-built-in window assigners, except for the *global windows* one, assign elements to windows based on time, which can
-either be *processing* time or *event* time. A list of the pre-implemented `WindowAssigners` follows:
-
-* **TumblingTimeWindows**: non-overlapping windows of a user-specified *duration* `d`. Each element belongs to a single
-            window based on its timestamp. As an example, an element with timestamp 12.01 and a tumbling windowing
-            strategy with `d = 5 min` will be assigned to the window `[start=12.00, end=12.05]`.
-
-* **SlidingTimeWindows**: overlapping windows of a user-specified *duration* `d` and a *slide* `s`. Each element belongs to
-            a `d/s` windows based on its timestamp. Our element with timestamp 12.01 and a sliding windowing
-            strategy with `d = 5 min` and `s = 1 min` will be assigned to the windows `[start=12.00, end=12.05]`,
-            `[start=12.01, end=12.06]`, `[start=12.02, end=12.07]`, `[start=12.03, end=12.08]`, `[start=12.04, end=12.09]`.
-
-* **TimeSessionWindows**: contrary to the previous assigners, here the window boundaries depend on the incoming data. The user
-            specifies a gap `g` and if there is a period of inactivity for the stream of more than `g` time units, then the
-            open window closes and a new, clean one opens to receive new events. In case we are operating in `event time`, two
-            consecutive session windows can be *merged* if an element arrives and makes the gap between the two windows less than
-            `g`. This is the reason why session windows belong to the category of `MergingWindowAssigners`. To illustrate
-            the latter, if `g = 5` and we have two windows `w1=[start=12.00, end=12.05]` and `w2=[start=12.10, end=12.15]`,
-            then if element `e` arrives with timestamp `t=12.03`, the two previous windows will be merged into one,
-            `w3=[start=12.00, end=12.15]`.
-
-* **GlobalWindows**: this is a way of specifying that we don’t want to subdivide our elements into windows. All elements are
-            assigned to the same per-key global window. This implies that this strategy is only meaningful when combined
-            with a custom trigger, *e.g.* a `CountTrigger`, which will tell the window when to fire.
+## Window Assigners
+
+After specifying whether your stream is keyed or not, the next step is to define a *window assigner*.
+The window assigner defines how elements are assigned to windows. This is done by specifying the `WindowAssigner`
+of your choice in the `window(...)` call (for *keyed* streams) or the `windowAll(...)` call (for *non-keyed* streams).
+
+A `WindowAssigner` is responsible for assigning each incoming element to one or more windows. Flink comes
+with pre-defined window assigners for the most common use cases, namely *tumbling windows*,
+*sliding windows*, *session windows* and *global windows*. You can also implement a custom window assigner by
+extending the `WindowAssigner` class. All built-in window assigners (except the global
+windows) assign elements to windows based on time, which can either be processing time or event
+time. Please take a look at our section on [event time]({{ site.baseurl }}/dev/event_time.html) to learn
+about the difference between processing time and event time and how timestamps and watermarks are generated.
+
+In the following, we show how Flink's pre-defined window assigners work and how they are used
+in a DataStream program. The following figures visualize the workings of each assigner. The purple circles
+represent elements of the stream, which are partitioned by some key (in this case *user 1*, *user 2* and *user 3*).
+The x-axis shows the progress of time.
+
+### Tumbling Windows
+
+A *tumbling windows* assigner assigns each element to a window of a specified *window size*.
+Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling
+window with a size of 5 minutes, the current window will be evaluated and a new window will be
+started every five minutes, as illustrated by the following figure.
+
+<img src="{{ site.baseurl }}/fig/tumbling-windows.svg" class="center" style="width: 80%;" />
+
+The following code snippets show how to use tumbling windows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
+
+// tumbling event-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// tumbling processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// daily tumbling event-time windows offset by -8 hours.
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
+
+// tumbling event-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// tumbling processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// daily tumbling event-time windows offset by -8 hours.
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
+</div>
+</div>
+
+Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+`Time.minutes(x)`, and so on.
+
+As shown in the last example, tumbling window assigners also take an optional `offset`
+parameter that can be used to change the alignment of windows. For example, without offsets,
+hourly tumbling windows are aligned with the epoch, that is, you will get windows such as
+`1:00:00.000 - 1:59:59.999`, `2:00:00.000 - 2:59:59.999` and so on. To change this alignment,
+you can specify an offset. With an offset of 15 minutes you would, for example, get
+`1:15:00.000 - 2:14:59.999`, `2:15:00.000 - 3:14:59.999` etc.
+An important use case for offsets is to adjust windows to timezones other than UTC.
+For example, in China (UTC+8) you would have to specify an offset of `Time.hours(-8)`.
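The alignment rule described above can be checked with a little arithmetic. The following is a minimal plain-Java sketch, not Flink code: `TumblingWindowSketch`, `windowStart` and `describe` are hypothetical names, timestamps are assumed to be epoch milliseconds, and the sketch only models how an offset shifts window boundaries rather than reproducing Flink's assigner.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java sketch (not Flink code) of tumbling-window assignment with an offset.
// Class and method names are hypothetical; all times are epoch milliseconds.
public class TumblingWindowSketch {

    /** Inclusive start of the tumbling window of length `size` that contains `timestamp`. */
    static long windowStart(long timestamp, long offset, long size) {
        // Math.floorMod keeps the remainder non-negative, so negative offsets work too.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    static String describe(long timestamp, long offset, long size) {
        long start = windowStart(timestamp, offset, size);
        // Windows are half-open, so the last included millisecond is start + size - 1.
        return Instant.ofEpochMilli(start) + " - " + Instant.ofEpochMilli(start + size - 1);
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long day = Duration.ofDays(1).toMillis();
        long ts = Instant.parse("2017-01-27T01:20:00Z").toEpochMilli();

        // Epoch-aligned hourly window:
        System.out.println(describe(ts, 0L, hour));
        // 2017-01-27T01:00:00Z - 2017-01-27T01:59:59.999Z

        // Hourly window with a 15-minute offset:
        System.out.println(describe(ts, Duration.ofMinutes(15).toMillis(), hour));
        // 2017-01-27T01:15:00Z - 2017-01-27T02:14:59.999Z

        // Daily window with a -8 hour offset (midnight to midnight at UTC+8):
        System.out.println(describe(ts, Duration.ofHours(-8).toMillis(), day));
        // 2017-01-26T16:00:00Z - 2017-01-27T15:59:59.999Z
    }
}
```

The last call corresponds to the China example: with an offset of -8 hours, each daily window starts at 16:00 UTC, which is midnight at UTC+8.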
+
+### Sliding Windows
+
+The *sliding windows* assigner assigns elements to windows of fixed length. Similar to a tumbling
+windows assigner, the size of the windows is configured by the *window size* parameter.
+An additional *window slide* parameter controls how frequently a sliding window is started. Hence,
+sliding windows overlap if the slide is smaller than the window size. In this case, elements
+are assigned to multiple windows.
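To show how one element ends up in several windows, here is a minimal plain-Java sketch of sliding-window assignment. It is an illustration under stated assumptions, not Flink's implementation: `SlidingWindowSketch` and `windowStarts` are hypothetical names, times are milliseconds, and windows are modeled as half-open intervals `[start, start + size)` whose starts are spaced one slide apart.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code) of sliding-window assignment.
// Class and method names are hypothetical; all times are in milliseconds.
public class SlidingWindowSketch {

    /** Starts of all windows [start, start + size) that contain `timestamp`. */
    static List<Long> windowStarts(long timestamp, long size, long slide, long offset) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp; starts are spaced `slide` apart.
        long lastStart = timestamp - Math.floorMod(timestamp - offset, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long size = 10 * minute;
        long slide = 5 * minute;
        // An element at minute 12 belongs to size / slide = 2 overlapping windows.
        for (long start : windowStarts(12 * minute, size, slide, 0L)) {
            System.out.println("[" + start / minute + ", " + (start + size) / minute + ")");
        }
        // prints [10, 20) and then [5, 15)
    }
}
```

A non-zero `offset` shifts every window start in the same way as for tumbling windows. For example, with a 60-minute size, a 30-minute slide and a 15-minute offset, an element at 1:50 lands in the windows starting at 1:45 and 1:15.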
+
+For example, you could have windows of size 10 minutes that slide by 5 minutes. This way, you get a window
+every 5 minutes that contains the events that arrived during the last 10 minutes, as depicted by the
+following figure.
+
+<img src="{{ site.baseurl }}/fig/sliding-windows.svg" class="center" style="width: 80%;" />
+
+The following code snippets show how to use sliding windows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
+
+// sliding event-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// sliding processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// sliding processing-time windows offset by -8 hours
+input
+    .keyBy(<key selector>)
+    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
+
+// sliding event-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// sliding processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// sliding processing-time windows offset by -8 hours
+input
+    .keyBy(<key selector>)
+    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
+</div>
+</div>
+
+Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+`Time.minutes(x)`, and so on.
+
+As shown in the last example, sliding window assigners also take an optional `offset` parameter
+that can be used to change the alignment of windows. For example, without offsets, hourly windows
+sliding by 30 minutes are aligned with the epoch, that is, you will get windows such as
+`1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. To change this alignment,
+you can specify an offset. With an offset of 15 minutes you would, for example, get
+`1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc.
+An important use case for offsets is to adjust windows to timezones other than UTC.
+For example, in China (UTC+8) you would have to specify an offset of `Time.hours(-8)`.
+
+### Session Windows
+
+The *session windows* assigner groups elements by sessions of activity. Session windows do not overlap and
+do not have a fixed start and end time, in contrast to *tumbling windows* and *sliding windows*. Instead, a
+session window closes when it does not receive elements for a certain period of time, *i.e.*, when a gap of
+inactivity occurs. A session window assigner is configured with the *session gap*, which
+defines the required length of this period of inactivity. When this period expires, the current session closes
+and subsequent elements are assigned to a new session window.
+
+<img src="{{ site.baseurl }}/fig/session-windows.svg" class="center" style="width: 80%;" />
+
+The following code snippets show how to use session windows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
+
+// event-time session windows
+input
+    .keyBy(<key selector>)
+    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>);
+
+// processing-time session windows
+input
+    .keyBy(<key selector>)
+    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
+
+// event-time session windows
+input
+    .keyBy(<key selector>)
+    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>)
+
+// processing-time session windows
+input
+    .keyBy(<key selector>)
+    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
+</div>
+</div>
+
+Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+`Time.minutes(x)`, and so on.
+
+<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
+they are evaluated differently from tumbling and sliding windows. Internally, a session window operator
+creates a new window for each arriving record and merges windows together if they are closer to each other
+than the defined gap.
+In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
+[Window Function](#window-functions), such as `ReduceFunction` or `WindowFunction`
+(`FoldFunction` cannot merge).
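The "one window per record, then merge" behavior described above can be modeled in a few lines. This is a minimal plain-Java sketch, not Flink's operator: `SessionMergeSketch`, `Window` and `sessions` are hypothetical names, the time unit is arbitrary, and treating windows that merely touch as mergeable is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (not Flink's implementation) of session-window merging:
// each record first gets its own window [t, t + gap), then windows that
// overlap or touch are merged. Names are hypothetical; time unit is arbitrary.
public class SessionMergeSketch {

    static final class Window {
        final long start; // inclusive
        final long end;   // exclusive: latest record timestamp + gap

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    static List<Window> sessions(List<Long> timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long t : timestamps) {
            Window current = new Window(t, t + gap); // a new window for each arriving record
            List<Window> untouched = new ArrayList<>();
            for (Window w : windows) {
                if (w.start <= current.end && current.start <= w.end) {
                    // Overlapping or adjacent: fold w into the current session.
                    current = new Window(Math.min(w.start, current.start), Math.max(w.end, current.end));
                } else {
                    untouched.add(w);
                }
            }
            untouched.add(current);
            windows = untouched;
        }
        windows.sort((a, b) -> Long.compare(a.start, b.start));
        return windows;
    }

    public static void main(String[] args) {
        // Gap of 10; the record at 12 arrives out of order but still extends the first session.
        System.out.println(sessions(Arrays.asList(0L, 4L, 30L, 12L), 10));
        // prints [[0, 22), [30, 40)]
    }
}
```

A single late record can also bridge two existing sessions: with a gap of 15, records at 0 and 25 form two sessions until a record at 12 arrives, after which everything collapses into one window `[0, 40)`.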
+
+### Global Windows
+
+A *global windows* assigner assigns all elements with the same key to the same single *global window*.
+This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise,
+no computation will be performed, as the global window does not have a natural end at
+which we could process the aggregated elements.
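To make the role of the trigger concrete, here is a plain-Java model of a per-key global window paired with a count-based firing rule, in the spirit of Flink's `CountTrigger`. It is an illustration under assumptions, not Flink's operator: `GlobalWindowCountSketch` and `add` are hypothetical names, and clearing the buffer after each firing (a purging behavior) is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of a keyed global window with a count-based
// trigger: each key has one never-ending window, and only the trigger decides
// when its contents are emitted. Names are hypothetical. This sketch clears the
// buffer after each firing, i.e. it models a purging count trigger.
public class GlobalWindowCountSketch {

    private final int maxCount;
    private final Map<String, List<Integer>> windowPerKey = new HashMap<>();

    GlobalWindowCountSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Adds an element; returns the window contents if the trigger fires, else null. */
    List<Integer> add(String key, int value) {
        List<Integer> window = windowPerKey.computeIfAbsent(key, k -> new ArrayList<>());
        window.add(value);
        if (window.size() < maxCount) {
            return null; // no natural end: without the trigger nothing would ever be emitted
        }
        windowPerKey.remove(key); // purge so the next firing starts from an empty window
        return window;
    }

    public static void main(String[] args) {
        GlobalWindowCountSketch op = new GlobalWindowCountSketch(3);
        String[] keys = {"user1", "user2", "user1", "user1", "user2", "user1"};
        for (int i = 0; i < keys.length; i++) {
            List<Integer> fired = op.add(keys[i], i);
            if (fired != null) {
                System.out.println(keys[i] + " fired with " + fired);
            }
        }
        // prints only: user1 fired with [0, 2, 3]
    }
}
```

Note that `user2` never fires in this run: its two elements simply stay in its global window, which is exactly why a global window without a suitable trigger produces no output.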
+
+<img src="{{ site.baseurl }}/fig/non-windowed.svg" class="center" style="width: 80%;" />
+
+The following code snippets show how to use a global window.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(GlobalWindows.create())
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(GlobalWindows.create())
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
+</div>
+</div>
 
 ## Window Functions
 

