Date: Tue, 28 Jun 2016 12:15:57 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-4062) Update Windowing Documentation

    [ https://issues.apache.org/jira/browse/FLINK-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352894#comment-15352894 ]

ASF GitHub Bot commented on FLINK-4062:
---------------------------------------

Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/2154#discussion_r68744802

--- Diff: docs/apis/streaming/windows.md ---
@@ -24,1023 +24,593 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into slices
+based on the timestamps of elements or other criteria. This division is required when working
+with infinite streams of data and performing transformations that aggregate elements.
+
+Info We will mostly talk about *keyed windowing* here. This
+means that the elements are subdivided based on both window and key before being given to
+a user function. Keyed windows have the advantage that work can be distributed across the cluster
+because the elements for different keys can be processed in isolation. If you absolutely must,
+you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
+windows work.
+
+
 * This will be replaced by the TOC
 {:toc}
 
-## Windows on Keyed Data Streams
-
-Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
-i.e., each window will contain elements with the same key value.
+## Basics
 
-### Basic Window Constructs
+For a windowed transformation you must at least specify a *key* (usually in the form of a
+`KeySelector`), a *window assigner*, and a *window function*. The *key* specifies how elements are
+put into groups. The *window assigner* specifies how the infinite stream is divided into windows.
+Finally, the *window function* is used to process the elements of each window.
 
-Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
-for common use cases. See first if your use case can be served by the pre-defined windows below before moving
-to defining your own windows.
+The basic structure of a windowed transformation is thus as follows:
+{% highlight java %}
+DataStream input = ...;
-Transformation | Description
-Tumbling time window
-KeyedStream → WindowedStream
-    Defines a window of 5 seconds, that "tumbles". This means that elements are
-    grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
-    The notion of time is specified by the selected TimeCharacteristic (see time).
-    {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5));
-    {% endhighlight %}
-Sliding time window
-KeyedStream → WindowedStream
-    Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
-    grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
-    one window (since windows overlap by at most 4 seconds)
-    The notion of time is specified by the selected TimeCharacteristic (see time).
-    {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
-    {% endhighlight %}
-Tumbling count window
-KeyedStream → WindowedStream
-    Defines a window of 1000 elements, that "tumbles". This means that elements are
-    grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
-    and every element belongs to exactly one window.
-    {% highlight java %}
-keyedStream.countWindow(1000);
-    {% endhighlight %}
-Sliding count window
-KeyedStream → WindowedStream
-    Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
-    grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
-    and every element can belong to more than one window (as windows overlap by at most 900 elements).
-    {% highlight java %}
-keyedStream.countWindow(1000, 100)
-    {% endhighlight %}
+input
+    .keyBy()
+    .window()
+    .();
+{% endhighlight %}
+{% highlight scala %}
+val input: DataStream[T] = ...
-Transformation | Description
-Tumbling time window
-KeyedStream → WindowedStream
-    Defines a window of 5 seconds, that "tumbles". This means that elements are
-    grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
-    The notion of time is specified by the selected TimeCharacteristic (see time).
-    {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5))
-    {% endhighlight %}
-Sliding time window
-KeyedStream → WindowedStream
-    Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
-    grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
-    one window (since windows overlap by at most 4 seconds)
-    The notion of time is specified by the selected TimeCharacteristic (see time).
-    {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
-    {% endhighlight %}
-Tumbling count window
-KeyedStream → WindowedStream
-    Defines a window of 1000 elements, that "tumbles". This means that elements are
-    grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
-    and every element belongs to exactly one window.
-    {% highlight scala %}
-keyedStream.countWindow(1000)
-    {% endhighlight %}
-Sliding count window
-KeyedStream → WindowedStream
-    Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
-    grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
-    and every element can belong to more than one window (as windows overlap by at most 900 elements).
-    {% highlight scala %}
-keyedStream.countWindow(1000, 100)
-    {% endhighlight %}
+input
+    .keyBy()
+    .window()
+    .()
+{% endhighlight %}
-### Advanced Window Constructs
+We will cover the different window assigners in [window assigners](#window-assigners).
+
+The window transformation can be one of `reduce()`, `fold()` or `apply()`, which respectively
+take a `ReduceFunction`, `FoldFunction` or `WindowFunction`. We describe each of these ways
+of specifying a windowed transformation in detail in [window functions](#window-functions).
+
+For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
+is considered *ready for processing*. These will be covered in more detail in
+[triggers](#triggers).
+
+## Window Assigners
+
+The window assigner specifies how elements of the stream are divided into slices. You can provide
+your own window assigner by implementing `WindowAssigner`, but Flink comes with
+window assigners for typical use cases: *tumbling windows*, *sliding windows*, *session windows*
+and *global windows*. Except for the last, all of these assign windows based on time, which
+can either be processing time or event time.
+
+Let's first look at how each of these window assigners works before looking at how they can be used
+in a Flink program. We will be using abstract figures to visualize the workings of each assigner:
+in the following, the purple circles are elements of the stream, which are partitioned
+by some key (in this case *user 1*, *user 2* and *user 3*) and the x-axis shows the progress
+of time.
+
+### Global Windows
+
+Global windows are a way of specifying that we don't want to subdivide our elements into windows.
+Each element is assigned to the one single *global window* (still separate for each key, of course).
+This is only useful if you also specify a custom [trigger](#triggers); otherwise, you will never
+process any data because the global window does not have a natural end at which we could process
+the aggregated elements.
+
-The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
-below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second,
-but the execution of the window function is triggered when 100 elements have been added to the
-window, and every time execution is triggered, 10 elements are retained in the window:
+### Tumbling Windows
+
+A *tumbling windows* assigner assigns elements to fixed time buckets of a specified *window size*.
+For example, if you specify a window size of 5 minutes, the window function will get 5 minutes'
+worth of elements in each invocation.
+
+### Sliding Windows
+
+The *sliding windows* assigner is very similar to the *tumbling windows* assigner, but it can assign
+one element to more than one window based on a *window size* and a *window slide*. For example,
+you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10 minutes'
+worth of elements in each invocation of the window function, and it will be invoked for every
+5 minutes of data.
+
+### Session Windows
+
+The *session windows* assigner can be used if windows need to dynamically adapt to the data.
--- End diff --

The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the incoming data.

> Update Windowing Documentation
> ------------------------------
>
>                 Key: FLINK-4062
>                 URL: https://issues.apache.org/jira/browse/FLINK-4062
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> The window documentation could be a bit more principled and also needs updating with the new allowed lateness setting.
> There is also essentially no documentation about how to write a custom trigger.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
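The tumbling, sliding and session assigners described in the quoted diff can be illustrated with a small model. What follows is a minimal, Flink-independent sketch in Java, the language of the diff's own examples. The class `WindowAssignmentSketch`, its nested `Window` type and the methods `tumbling`, `sliding` and `sessions` are hypothetical illustrations, not Flink's `WindowAssigner` API. The sketch assumes millisecond timestamps and half-open windows `[start, end)` aligned to timestamp 0. For the session model it also assumes the timestamps arrive already sorted. It does not model triggers or the choice between event time and processing time.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink's API) of how tumbling, sliding and session
 * window assigners map millisecond timestamps to half-open windows [start, end).
 */
public class WindowAssignmentSketch {

    /** A half-open time interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Tumbling: each timestamp belongs to exactly one window of length {@code size}. */
    static Window tumbling(long timestamp, long size) {
        long start = timestamp - Math.floorMod(timestamp, size);
        return new Window(start, start + size);
    }

    /**
     * Sliding: windows of length {@code size} start every {@code slide} ms, so a
     * timestamp lands in size / slide windows when slide divides size.
     */
    static List<Window> sliding(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    /**
     * Sessions: every element opens a window [t, t + gap) and overlapping windows
     * are merged, so a new session starts only after a gap of at least {@code gap} ms.
     * Assumes the timestamps are sorted in ascending order.
     */
    static List<Window> sessions(long[] sortedTimestamps, long gap) {
        List<Window> result = new ArrayList<>();
        for (long t : sortedTimestamps) {
            Window candidate = new Window(t, t + gap);
            int lastIndex = result.size() - 1;
            if (lastIndex >= 0 && candidate.start < result.get(lastIndex).end) {
                // Overlaps the current session: extend it instead of opening a new one.
                Window current = result.get(lastIndex);
                result.set(lastIndex, new Window(current.start, Math.max(current.end, candidate.end)));
            } else {
                result.add(candidate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An element 7 minutes in, with 5-minute tumbling windows.
        System.out.println(tumbling(7 * minute, 5 * minute));
        // prints [300000, 600000)

        // The same element with 10-minute windows sliding by 5 minutes.
        System.out.println(sliding(7 * minute, 10 * minute, 5 * minute));
        // prints [[300000, 900000), [0, 600000)]

        // Elements at 0, 1, 2 and 10 minutes with a 3-minute session gap.
        System.out.println(sessions(new long[] {0L, minute, 2 * minute, 10 * minute}, 3 * minute));
        // prints [[0, 300000), [600000, 780000)]
    }
}
```

The sliding case shows why the diff says one element can land in more than one window: with a 10-minute size and a 5-minute slide, each element is copied into two windows. The session case shows why session boundaries "adapt to the data". In this model they are not fixed in advance but emerge from merging the per-element windows, which is the behaviour the reviewer's rewording tries to convey.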