flink-issues mailing list archives

From uce <...@git.apache.org>
Subject [GitHub] flink pull request #2154: [FLINK-4062] Update Windowing Documentation
Date Thu, 30 Jun 2016 15:49:22 GMT
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2154#discussion_r69158646
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -24,1023 +24,608 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    +Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
    +slices based on the timestamps of elements or other criteria. This division is required when working
    +with infinite streams of data and performing transformations that aggregate elements.
    +
    +<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.
    +windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
    +subdivided based on both window and key before being given to
    +a user function. The work can thus be distributed across the cluster
    +because the elements for different keys can be processed independently. If you absolutely have to,
    +you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
    +windows work.
    +
     * This will be replaced by the TOC
     {:toc}
     
    -## Windows on Keyed Data Streams
    -
    -Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
    -i.e., each window will contain elements with the same key value.
    +## Basics
     
    -### Basic Window Constructs
    +For a windowed transformation you must at least specify a *key*
    +(see [specifying keys](apis/common/index.html#specifying-keys)),
    +a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed stream
    +into logical keyed streams, while the *window assigner* assigns elements to finite per-key windows.
    +Finally, the *window function* is used to process the elements of each window.
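To make the roles of key, window assigner and window function concrete, here is a plain-Java sketch that does not use Flink at all. It assumes a hypothetical `Event` type, 5-second tumbling windows aligned to epoch 0, non-negative millisecond timestamps, and a sum as the window function. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a keyed, windowed sum; it does not use any Flink classes.
final class KeyedWindowingSketch {

    // Assumed window size for this illustration: 5-second tumbling windows.
    static final long WINDOW_SIZE_MS = 5_000L;

    // Hypothetical event type: a key, an event timestamp in ms, and a value.
    static final class Event {
        final String key;
        final long timestampMs;
        final long value;

        Event(String key, long timestampMs, long value) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Returns the sum of values for every "key@windowStart" pair.
    static Map<String, Long> sumPerKeyAndWindow(List<Event> events) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            // 1. key: each key forms its own logical stream
            // 2. window assigner: map the timestamp to a tumbling window start
            //    (assumes non-negative timestamps aligned to epoch 0)
            long windowStart = e.timestampMs - (e.timestampMs % WINDOW_SIZE_MS);
            // 3. window function: here, a running sum per (key, window)
            sums.merge(e.key + "@" + windowStart, e.value, Long::sum);
        }
        return sums;
    }
}
```

Because every result is identified by both key and window, the groups for different keys never interact, which is what allows keyed windows to be processed in parallel.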
     
    -Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
    -for common use cases. See first if your use case can be served by the pre-defined windows below before moving
    -to defining your own windows.
    +The basic structure of a windowed transformation is thus as follows:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<T> input = ...;
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that elements are
    -          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
    -	  The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -    {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5));
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
    -             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -      {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight java %}
    -keyedStream.countWindow(1000);
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows overlap by at most 900 elements).
    -  {% highlight java %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>);
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[T] = ...
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that elements are
    -          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
    -          The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -    {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5))
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
    -             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -      {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight scala %}
    -keyedStream.countWindow(1000)
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows overlap by at most 900 elements).
    -  {% highlight scala %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>)
    +{% endhighlight %}
     </div>
     </div>
     
    -### Advanced Window Constructs
    +We will cover [window assigners](#window-assigners) in a separate section below.
    +
    +The windowed transformation can be one of `reduce()`, `fold()` or `apply()`, which take a
    +`ReduceFunction`, `FoldFunction` or `WindowFunction`, respectively. We describe each of these ways
    +of specifying a windowed transformation in detail in [window functions](#window-functions).
    +
    +For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
    +is considered *ready for processing*. Triggers are covered in more detail in
    +[triggers](#triggers).
    +
    +## Window Assigners
    +
    +The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
    +with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
    +*sliding windows*, *session windows* and *global windows*, but you can implement your own by
    +extending the `WindowAssigner` class. All the built-in window assigners, except for the global
    +windows assigner, assign elements to windows based on time, which can either be processing time or
    +event time.
    +
    +Let's first look at how each of these window assigners works before looking at how they can be used
    +in a Flink program. We will be using abstract figures to visualize the workings of each assigner.
    +In the following, the purple circles are elements of the stream, which are partitioned
    +by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
    +of time.
    +
    +### Global Windows
    +
    +Global windows are a way of specifying that we don't want to subdivide our elements into windows.
    +Each element is assigned to a single per-key *global window*.
    +This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise,
    +no computation is ever going to be performed, as the global window does not have a natural end at
    +which we could process the aggregated elements.
    +
    +<img src="non-windowed.svg" class="center" style="width: 80%;" />
    +
    +### Tumbling Windows
    +
    +A *tumbling windows* assigner assigns elements to fixed-length, non-overlapping windows of a
    +specified *window size*. For example, if you specify a window size of 5 minutes, the window
    +function will get 5 minutes worth of elements in each invocation.
     
    -The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
    -below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second,
    -but the execution of the window function is triggered when 100 elements have been added to the
    -window, and every time execution is triggered, 10 elements are retained in the window:
    +<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
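The tumbling assignment rule boils down to a little arithmetic. This is a sketch under stated assumptions (millisecond timestamps, non-negative values, windows aligned to epoch 0, half-open windows `[start, start + size)`). It is not Flink's implementation, and the class name is hypothetical.

```java
// Sketch of tumbling-window assignment; not Flink code.
final class TumblingWindowSketch {

    // Start of the single tumbling window that contains the timestamp.
    // Assumes non-negative timestamps and windows aligned to epoch 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of that window, i.e. the window is [start, start + size).
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }
}
```

With a 5-minute window (300,000 ms), an element at minute 7 lands in `[5 min, 10 min)`. Under the half-open assumption, an element at exactly minute 5 opens the next window rather than closing the previous one.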
    +
    +### Sliding Windows
    +
    +The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*,
    +like the tumbling windows assigner, but in this case windows can overlap. The user-specified
    +parameter *window slide* controls how often a new window is started; if it is smaller than the
    +*window size*, consecutive windows overlap by the difference between the two. In that case an
    +element can be assigned to multiple windows.
    +
    +For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10
    +minutes worth of elements in each invocation of the window function and it will be invoked for every
    +5 minutes of data.
    +
    +<img src="sliding-windows.svg" class="center" style="width: 80%;" />
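Which sliding windows an element belongs to can be computed by walking backwards from the most recent window start. This plain-Java sketch assumes millisecond timestamps, non-negative values, windows aligned to epoch 0, and half-open windows `[start, start + size)`. The class name is hypothetical, and this is not Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment; not Flink code.
final class SlidingWindowSketch {

    // Returns the start of every window of length sizeMs (a new one every slideMs)
    // that contains the timestamp, most recent first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // latest window start that is not after the timestamp
        long lastStart = timestampMs - (timestampMs % slideMs);
        // step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With the 10-minute size and 5-minute slide from the example above, an element at minute 7 belongs to the windows starting at minutes 5 and 0. When the size is a multiple of the slide, each element is assigned to size / slide windows.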
    +
    +### Session Windows
    +
    +The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the
    +incoming data. Both the *tumbling windows* and *sliding windows* assigners assign elements to windows
    +that start at fixed time points and have a fixed *window size*. With session windows it is possible
    +to have windows that start at individual points in time for each key and that end once there has
    +been a certain period of inactivity. The configuration parameter is the *session gap* that specifies
    +how long to wait for new data before considering a session as closed.
    +
    +<img src="session-windows.svg" class="center" style="width: 80%;" />
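A simplified model of how sessions form for one key: each element opens a window `[t, t + gap)`, and overlapping windows are merged into one session. This is an illustration only, not Flink's merging logic; treating two elements exactly one gap apart as separate sessions is an assumption of this sketch, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of session-window formation for a single key; not Flink code.
final class SessionWindowSketch {

    // Returns one {start, endExclusive} pair per session.
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            if (!result.isEmpty() && t < result.get(result.size() - 1)[1]) {
                // t lies inside the current session [start, lastElement + gap):
                // merge by pushing the session end out to t + gap
                result.get(result.size() - 1)[1] = t + gapMs;
            } else {
                // first element, or the gap of inactivity was exceeded: new session
                result.add(new long[] {t, t + gapMs});
            }
        }
        return result;
    }
}
```

With a 5-second gap, elements at 1 s, 2 s and 10 s form two sessions, `[1 s, 7 s)` and `[10 s, 15 s)`.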
    +
    +### Specifying a Window Assigner
    +
    +The built-in window assigners (except `GlobalWindows`) come in two versions: one for processing-time
    +windowing and one for event-time windowing. The processing-time assigners assign elements to
    +windows based on the current clock of the worker machines while the event-time assigners assign
    +windows based on the timestamps of elements. Please have a look at
    +[event time](/apis/streaming/event_time.html) to learn about the difference between processing time
    +and event time and about how timestamps can be assigned to elements.
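The difference between the two versions comes down to which clock decides the window. In this plain-Java sketch (hypothetical names, 5-second tumbling windows, non-negative timestamps assumed), the worker clock is passed in as a `LongSupplier` so that the behavior is easy to test; it does not reflect how Flink obtains processing time internally.

```java
import java.util.function.LongSupplier;

// Sketch contrasting event-time and processing-time assignment; not Flink code.
final class TimeDomainSketch {

    // Assumed window size for this illustration.
    static final long SIZE_MS = 5_000L;

    // Event time: the window is derived from the element's own timestamp.
    static long eventTimeWindowStart(long elementTimestampMs) {
        return elementTimestampMs - (elementTimestampMs % SIZE_MS);
    }

    // Processing time: the element's timestamp is ignored; the worker's clock decides.
    static long processingTimeWindowStart(LongSupplier workerClockMs) {
        long now = workerClockMs.getAsLong();
        return now - (now % SIZE_MS);
    }
}
```

An element stamped at 3 s that reaches the worker when its clock reads 12 s ends up in window `[0 s, 5 s)` under event time, but in `[10 s, 15 s)` under processing time.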
    +
    +The following code snippets show how each of the window assigners can be used in a program:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10));
    +DataStream<T> input = ...;
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10))
    +val input: DataStream[T] = ...
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>)
     {% endhighlight %}
     </div>
     </div>
     
    -The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally),
    -and (3) an `Evictor` (optionally).
    +## Window Functions
     
    -The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements
    -that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according
    -to some notion of time described above within these values are part of the window).
    +The *window function* is used to process the elements of each window (and key) once the system
    +determines that a window is ready for processing (see [triggers](#triggers) for how the system
    +determines when a window is ready).
     
    -For example, the `SlidingEventTimeWindows`
    -assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
    -time starts from 0 and is measured in milliseconds. Then, we have 6 windows
    -that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming
    -element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be
    -assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your
    -own window types by extending the `WindowAssigner` class.
    +The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The first
    +two can be executed more efficiently because Flink can incrementally aggregate the elements for each
    +window as they arrive. A `WindowFunction` gets an `Iterable` for all the elements contained in a
    +window and additional metadata about the window to which the elements belong.
     
    -<div class="codetabs" markdown="1">
    +A windowed transformation with a `WindowFunction` cannot be executed as efficiently as the other
    +cases because Flink has to buffer all elements for a window internally before invoking the function.
    +This can be mitigated by combining a `WindowFunction` with a `ReduceFunction` or `FoldFunction` to
    +get both incremental aggregation of window elements and the additional information that the
    +`WindowFunction` receives. We will look at examples for each of these variants.
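The efficiency difference described above is about what has to be kept per window. The following sketch (plain Java, hypothetical class names, not Flink's internal state handling) contrasts a buffer that holds every element with an accumulator that holds a single reduced value, and shows how a window-function-like step can still receive window metadata when it is handed only the pre-aggregated result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Sketch of per-window state for buffered vs. incremental aggregation; not Flink code.
final class WindowStateSketch {

    // Buffering approach (what a plain WindowFunction requires): every element
    // is kept until the window fires, so state grows with the element count.
    static final class BufferingWindow<T> {
        private final List<T> buffer = new ArrayList<>();

        void add(T element) {
            buffer.add(element);
        }

        int storedElements() {
            return buffer.size();
        }

        // What a WindowFunction-like step would iterate over when the window fires.
        Iterable<T> contents() {
            return buffer;
        }
    }

    // Incremental approach (like a ReduceFunction): one accumulated value per window.
    static final class IncrementalWindow<T> {
        private final BinaryOperator<T> reduce;
        private T accumulator; // null until the first element arrives

        IncrementalWindow(BinaryOperator<T> reduce) {
            this.reduce = reduce;
        }

        void add(T element) {
            accumulator = (accumulator == null) ? element : reduce.apply(accumulator, element);
        }

        T result() {
            return accumulator;
        }

        // Combined approach: when the window fires, a WindowFunction-like step
        // sees only the single pre-aggregated value plus window metadata.
        String fire(String key, long windowStartMs, long windowEndMs) {
            return key + " [" + windowStartMs + ", " + windowEndMs + ") -> " + accumulator;
        }
    }
}
```

After 1,000 elements the buffer holds 1,000 entries while the incremental window holds a single `Long`, yet the combined step can still report the key and the window bounds alongside the sum.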
    +
    +### ReduceFunction
    +
    +A reduce function specifies how two values can be combined to form one element. Flink can use this
    +to incrementally aggregate the elements in a window.
     
    +A `ReduceFunction` can be used in a program like this:
    +
    +<div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Global window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -	    All incoming elements of a given key are assigned to the same window.
    -	    The window does not contain a default trigger, hence it will never be triggered
    -	    if a trigger is not explicitly specified.
    -          </p>
    -    {% highlight java %}
    -stream.window(GlobalWindows.create());
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td><strong>Tumbling event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (1 second below) based on
    -            their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -            This assigner comes with a default trigger that fires for a window when a
    -            watermark with value higher than its end-value is received.
    -          </p>
    -      {% highlight java %}
    -stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
    -      {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window when a
    -	          watermark with value higher than its end-value is received.
    -          </p>
    -    {% highlight java %}
    -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -              Incoming elements are assigned to a window of a certain size (1 second below) based on
    -              the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -              This assigner comes with a default trigger that fires for a window a window when the current
    -              processing time exceeds its end-value.
    -            </p>
    -      {% highlight java %}
    -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window a window when the current
    -            processing time exceeds its end-value.
    -          </p>
    -    {% highlight java %}
    -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -          <tr>
    -        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below).
    -            Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are
    -            consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements
    -            can be connected into a session by intermediate elements.
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -       <tr>
    -        <td><strong>Processing time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -           This is similar to event-time session windows but works on the current processing
    -           time instead of the timestamp of elements
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -  </tbody>
    -</table>
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
    +      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
    +        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
    +      }
    +    });
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Global window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            All incoming elements of a given key are assigned to the same window.
    -	    The window does not contain a default trigger, hence it will never be triggered
    -	    if a trigger is not explicitly specified.
    -          </p>
    -    {% highlight scala %}
    -stream.window(GlobalWindows.create)
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Incoming elements are assigned to a window of a certain size (1 second below) based on
    -            their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -            This assigner comes with a default trigger that fires for a window when a
    -            watermark with value higher than its end-value is received.
    -            </p>
    -      {% highlight scala %}
    -stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window when a
    -            watermark with value higher than its end-value is received.
    -          </p>
    -    {% highlight scala %}
    -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -              Incoming elements are assigned to a window of a certain size (1 second below) based on
    -              the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -              This assigner comes with a default trigger that fires for a window a window when the current
    -              processing time exceeds its end-value.
    -
    -            </p>
    -      {% highlight scala %}
    -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window a window when the current
    -            processing time exceeds its end-value.
    -          </p>
    -    {% highlight scala %}
    -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -         <tr>
    -        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below).
    -            Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are
    -            consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements
    -            can be connected into a session by intermediate elements.
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -       <tr>
    -        <td><strong>Processing time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -           This is similar to event-time session windows but works on the current processing
    -           time instead of the timestamp of elements
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -  </tbody>
    -</table>
    -</div>
    +{% highlight scala %}
    +val input: DataStream[(String, Long)] = ...
     
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
    +{% endhighlight %}
    +</div>
     </div>
     
    -The `Trigger` specifies when the function that comes after the window clause (e.g., `sum`, `count`) is evaluated ("fires")
    -for each window. If a trigger is not specified, a default trigger for each window type is used (that is part of the
    -definition of the `WindowAssigner`). Flink comes bundled with a set of triggers if the ones that windows use by
    -default do not fit the application. You can write your own trigger by implementing the `Trigger` interface. Note that
    -specifying a trigger will override the default trigger of the window assigner.
    +A `ReduceFunction` specifies how two elements from the input can be combined to produce
    +an output element. This example will sum up the second field of the tuple for all elements
    +in a window.
     
    -<div class="codetabs" markdown="1">
    +### FoldFunction
     
    +A fold function can be specified like this:
    +
    +<div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -    <td><strong>Processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when the current processing time exceeds its end-value.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ProcessingTimeTrigger.create());
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Watermark trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
     {% highlight java %}
    -windowedStream.trigger(EventTimeTrigger.create());
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
    +       public String fold(String acc, Tuple2<String, Long> value) {
    +         return acc + value.f1;
    +       }
    +    });
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        The window is actually fired only when the current processing time exceeds its end-value.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous watermark time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        A window is actually fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[(String, Long)] = ...
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .fold("") { (acc, v) => acc + v._2 }
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Count trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when it has more than a certain number of elements (1000 below).
    -        The elements of the triggered window are retained.
    -      </p>
    +</div>
    +</div>
    +
    +A `FoldFunction` specifies how elements from the input will be added to an initial
    +accumulator value (`""`, the empty string, in our example). This example will compute
    +a concatenation of all the `Long` fields of the input.
    +
    +### WindowFunction - The Generic Case
    +
    +Using a `WindowFunction` provides the most flexibility, at the cost of performance. The reason for this
    +is that elements cannot be incrementally aggregated for a window and instead need to be buffered
    +internally until the window is considered ready for processing. A `WindowFunction` gets an
    +`Iterable` containing all the elements of the window being processed. The signature of
    +`WindowFunction` looks like this:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
     {% highlight java %}
    -windowedStream.trigger(CountTrigger.of(1000));
    +public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    +
    +  /**
    +   * Evaluates the window and outputs none or several elements.
    +   *
    +   * @param key The key for which this window is evaluated.
    +   * @param window The window that is being evaluated.
    +   * @param input The elements in the window being evaluated.
    +   * @param out A collector for emitting elements.
    +   *
    +   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    +   */
    +  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Purging trigger</strong></td>
    -    <td>
    -      <p>
    -        Takes any trigger as an argument and forces the triggered window elements to be
    -        "purged" (discarded) after triggering.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
    +
    +  /**
    +   * Evaluates the window and outputs none or several elements.
    +   *
    +   * @param key The key for which this window is evaluated.
    +   * @param window The window that is being evaluated.
    +   * @param input The elements in the window being evaluated.
    +   * @param out A collector for emitting elements.
    +   *
    +   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    +   */
    +  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Delta trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5000 milliseconds in the example).
    -        A window is actually fired when the value of the last added element exceeds the value of
    -        the first element inserted in the window according to a `DeltaFunction`.
    -      </p>
    +</div>
    +</div>
    +
    +Here we show an example that uses a `WindowFunction` to count the elements in a window. We use a
    +`WindowFunction` because we want to access information about the window itself to emit it along
    +with the count. Counting this way is inefficient, however, because all elements of the window are
    +buffered; in practice it should be implemented with a `ReduceFunction`. Below, we will see an
    +example of how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental
    +aggregation and the added information of a `WindowFunction`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
     {% highlight java %}
    -windowedStream.trigger(new DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    -    @Override
    -    public double getDelta (Double old, Double new) {
    -        return (new - old > 0.01);
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyWindowFunction());
    +
    +/* ... */
    +
    +public class MyWindowFunction implements WindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
    +
    +  public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    +    long count = 0;
    +    for (Tuple2<String, Long> in: input) {
    +      count++;
         }
    -}));
    +    out.collect("Window: " + window + " count: " + count);
    +  }
    +}
    +
     {% endhighlight %}
    -    </td>
    -  </tr>
    - </tbody>
    -</table>
     </div>
     
    -
     <div data-lang="scala" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -    <td><strong>Processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when the current processing time exceeds its end-value.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ProcessingTimeTrigger.create);
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Watermark trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(EventTimeTrigger.create);
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        The window is actually fired only when the current processing time exceeds its end-value.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous watermark time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        A window is actually fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Count trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when it has more than a certain number of elements (1000 below).
    -        The elements of the triggered window are retained.
    -      </p>
     {% highlight scala %}
    -windowedStream.trigger(CountTrigger.of(1000));
    +val input: DataStream[(String, Long)] = ...
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyWindowFunction())
    +
    +/* ... */
    +
    +class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
    +
    +  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    +    var count = 0L
    +    for (in <- input) {
    +      count = count + 1
    +    }
    +    out.collect(s"Window $window count: $count")
    +  }
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Purging trigger</strong></td>
    -    <td>
    -      <p>
    -        Takes any trigger as an argument and forces the triggered window elements to be
    -        "purged" (discarded) after triggering.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
    +</div>
    +</div>
    +
    +### WindowFunction with Incremental Aggregation
    +
    +A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction`. This
    +combines the benefits of incremental window computation with the additional meta information
    +that a `WindowFunction` provides.
    +
    +This is an example that shows how incremental aggregation functions can be combined with
    +a `WindowFunction`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +// for folding incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +
    +// for reducing incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyReduceFunction(), new MyWindowFunction());
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Delta trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5000 milliseconds in the example).
    -        A window is actually fired when the value of the last added element exceeds the value of
    -        the first element inserted in the window according to a `DeltaFunction`.
    -      </p>
    +</div>
    +
    +<div data-lang="scala" markdown="1">
     {% highlight scala %}
    -windowedStream.trigger(DeltaTrigger.of(5000.0, { (old,new) => new - old > 0.01 }))
    +val input: DataStream[(String, Long)] = ...
    +
    +// for folding incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction())
    +
    +// for reducing incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyReduceFunction(), new MyWindowFunction())
     {% endhighlight %}
    -    </td>
    -  </tr>
    - </tbody>
    -</table>
     </div>
    -
     </div>
     
    -After the trigger fires, and before the function (e.g., `sum`, `count`) is applied to the window contents, an
    -optional `Evictor` removes some elements from the beginning of the window before the remaining elements
    -are passed on to the function. Flink comes bundled with a set of evictors You can write your own evictor by
    -implementing the `Evictor` interface.
    +## Dealing with Late Data
     
    -<div class="codetabs" markdown="1">
    +When working with event-time windowing it can happen that elements arrive late, i.e., the
    +watermark that Flink uses to keep track of the progress of event-time is already past the
    +end timestamp of a window to which an element belongs. Please
    +see [event time](apis/streaming/event_time.html) and especially
    --- End diff --
    
    Links are broken
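The diff above explains that `ReduceFunction` and `FoldFunction` aggregate incrementally, while a plain `WindowFunction` has to buffer every element of a window until it fires. As a rough illustration only, here is a plain-Java sketch with no Flink dependency that models the per-window state each approach keeps. All class and method names (`WindowStateSketch`, `ReducingState`, `FoldingState`, `BufferingState`, and so on) are hypothetical, and this is not how Flink's window operator is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Plain-Java sketch with NO Flink dependency. Every name here is hypothetical; it only
// models the per-window state each approach keeps, not Flink's real window operator.
public class WindowStateSketch {

    // ReduceFunction-style incremental aggregation: one running value of the input type.
    static final class ReducingState<T> {
        private final BinaryOperator<T> reduceFn;
        private T acc; // null until the first element arrives

        ReducingState(BinaryOperator<T> reduceFn) { this.reduceFn = reduceFn; }

        void add(T value) { acc = (acc == null) ? value : reduceFn.apply(acc, value); }

        T get() { return acc; }
    }

    // FoldFunction-style incremental aggregation: starts from an initial accumulator
    // whose type may differ from the input type (String accumulator, Long input below).
    static final class FoldingState<T, ACC> {
        private final BiFunction<ACC, T, ACC> foldFn;
        private ACC acc;

        FoldingState(ACC initial, BiFunction<ACC, T, ACC> foldFn) {
            this.acc = initial;
            this.foldFn = foldFn;
        }

        void add(T value) { acc = foldFn.apply(acc, value); }

        ACC get() { return acc; }
    }

    // Generic WindowFunction case: every element is buffered until the window fires.
    static final class BufferingState<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T value) { elements.add(value); }

        List<T> get() { return elements; }
    }

    // Sum of the Long values, mirroring the reduce example (v1._2 + v2._2).
    public static Long sum(List<Long> window) {
        ReducingState<Long> state = new ReducingState<>(Long::sum);
        for (Long v : window) state.add(v);
        return state.get(); // null for an empty window
    }

    // Concatenation of the Long values onto "", mirroring the fold example.
    public static String concat(List<Long> window) {
        FoldingState<Long, String> state = new FoldingState<>("", (acc, v) -> acc + v);
        for (Long v : window) state.add(v);
        return state.get();
    }

    // Count via a buffering window function: all elements are held, then iterated.
    public static String countBuffered(String window, List<Long> elements) {
        BufferingState<Long> state = new BufferingState<>();
        for (Long v : elements) state.add(v);
        long count = 0;
        for (Long ignored : state.get()) count++;
        return "Window: " + window + " count: " + count;
    }

    // Count via reduce + window function: each element contributes 1L to a running sum,
    // so only a single Long is stored; the final step adds the window metadata.
    public static String countIncremental(String window, List<Long> elements) {
        ReducingState<Long> state = new ReducingState<>(Long::sum);
        for (Long ignored : elements) state.add(1L);
        long count = (state.get() == null) ? 0L : state.get();
        return "Window: " + window + " count: " + count;
    }

    public static void main(String[] args) {
        List<Long> window = List.of(3L, 7L, 5L);
        System.out.println(sum(window));                           // 15
        System.out.println(concat(window));                        // 375
        System.out.println(countBuffered("[0, 5000)", window));    // Window: [0, 5000) count: 3
        System.out.println(countIncremental("[0, 5000)", window)); // Window: [0, 5000) count: 3
    }
}
```

Both count variants produce the same string, but the buffering variant holds all three elements until the window is evaluated, whereas the incremental variant only ever holds a single `Long` per window.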

