flink-commits mailing list archives

From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-5968] Add documentation for WindowedStream.aggregate()
Date Sat, 21 Oct 2017 07:35:14 GMT
Repository: flink
Updated Branches:
  refs/heads/master ebc3bc1f9 -> 558c71d2e


[FLINK-5968] Add documentation for WindowedStream.aggregate()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/558c71d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/558c71d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/558c71d2

Branch: refs/heads/master
Commit: 558c71d2e24515515f9c71d98e8176205b5dd854
Parents: f176c91
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Sat Oct 14 10:45:33 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sat Oct 21 09:33:38 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 214 ++++++++++++++++++++++++++++--
 1 file changed, 203 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/558c71d2/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 52c4b47..f31d2e5 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -41,7 +41,7 @@ for the rest of the page.
           [.evictor(...)]            <-  optional: "evictor" (else no evictor)
           [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
-           .reduce/fold/apply()      <-  required: "function"
+           .reduce/aggregate/fold/apply()      <-  required: "function"
 
 **Non-Keyed Windows**
 
@@ -51,7 +51,7 @@ for the rest of the page.
           [.evictor(...)]            <-  optional: "evictor" (else no evictor)
           [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
-           .reduce/fold/apply()      <-  required: "function"
+           .reduce/aggregate/fold/apply()      <-  required: "function"
 
 In the above, the commands in square brackets ([...]) are optional. This reveals that Flink allows you to customize your
 windowing logic in many different ways so that it best fits your needs.
@@ -70,8 +70,8 @@ lateness of 1 min, Flink will create a new window for the interval between `12:0
 a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the `12:06`
 timestamp.
 
-In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`ProcessWindowFunction`, `ReduceFunction` or
-`FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`ProcessWindowFunction`, `ReduceFunction`,
+`AggregateFunction` or `FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
 be applied to the contents of the window, while the `Trigger` specifies the conditions under which the window is
 considered ready for the function to be applied. A triggering policy might be something like "when the number of elements
 in the window is more than 4", or "when the watermark passes the end of the window". A trigger can also decide to
@@ -333,7 +333,7 @@ they are  evaluated differently than tumbling and sliding windows. Internally, a
 creates a new window for each arriving record and merges windows together if they are closer to each other
 than the defined gap.
 In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
-[Window Function](#window-functions), such as `ReduceFunction` or `ProcessWindowFunction`
+[Window Function](#window-functions), such as `ReduceFunction`, `AggregateFunction`, or `ProcessWindowFunction`
 (`FoldFunction` cannot merge.)
 
 ### Global Windows
@@ -378,14 +378,14 @@ to perform on each of these windows. This is the responsibility of the *window f
 elements of each (possibly keyed) window once the system determines that a window is ready for processing
 (see [triggers](#triggers) for how Flink determines when a window is ready).
 
-The window function can be one of `ReduceFunction`, `FoldFunction` or `ProcessWindowFunction`. The first
+The window function can be one of `ReduceFunction`, `AggregateFunction`, `FoldFunction` or `ProcessWindowFunction`. The first
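-two can be executed more efficiently (see [State Size](#state size) section) because Flink can incrementally aggregate
+three can be executed more efficiently (see [State Size](#useful-state-size-considerations) section) because Flink can incrementally aggregate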
 the elements for each window as they arrive. A `ProcessWindowFunction` gets an `Iterable` for all the elements contained in a
 window and additional meta information about the window to which the elements belong.
 
 A windowed transformation with a `ProcessWindowFunction` cannot be executed as efficiently as the other
 cases because Flink has to buffer *all* elements for a window internally before invoking the function.
-This can be mitigated by combining a `ProcessWindowFunction` with a `ReduceFunction` or `FoldFunction` to
+This can be mitigated by combining a `ProcessWindowFunction` with a `ReduceFunction`, `AggregateFunction`, or `FoldFunction` to
 get both incremental aggregation of window elements and the additional window metadata that the
 `ProcessWindowFunction` receives. We will look at examples for each of these variants.
 
@@ -427,6 +427,93 @@ input
 
 The above example sums up the second fields of the tuples for all elements in a window.
 
+### AggregateFunction
+
+An `AggregateFunction` is a generalized version of a `ReduceFunction` that has three type parameters: an
+input type (`IN`), an accumulator type (`ACC`), and an output type (`OUT`). The input type is the type
+of elements in the input stream, and the `AggregateFunction` has a method for adding one input
+element to an accumulator. The interface also has methods for creating an initial accumulator,
+for merging two accumulators into one accumulator, and for extracting an output (of type `OUT`) from
+an accumulator. We will see how this works in the example below.
+
+As with `ReduceFunction`, Flink will incrementally aggregate input elements of a window as they
+arrive.
+
+An `AggregateFunction` can be defined and used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
+  @Override
+  public Tuple2<Long, Long> createAccumulator() {
+    return new Tuple2<>(0L, 0L);
+  }
+
+  @Override
+  public Tuple2<Long, Long> add(
+    Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+  }
+
+  @Override
+  public Double getResult(Tuple2<Long, Long> accumulator) {
+    return ((double) accumulator.f0) / accumulator.f1;
+  }
+
+  @Override
+  public Tuple2<Long, Long> merge(
+    Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
+    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
+  }
+}
+
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .aggregate(new AverageAggregate());
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+/**
+ * The accumulator is used to keep a running sum and a count. The [getResult] method
+ * computes the average.
+ */
+class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
+  override def createAccumulator() = (0L, 0L)
+
+  override def add(value: (String, Long), accumulator: (Long, Long)) =
+    (accumulator._1 + value._2, accumulator._2 + 1L)
+
+  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2
+
+  override def merge(a: (Long, Long), b: (Long, Long)) =
+    (a._1 + b._1, a._2 + b._2)
+}
+
+val input: DataStream[(String, Long)] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .aggregate(new AverageAggregate)
+{% endhighlight %}
+</div>
+</div>
+
+The above example computes the average of the second field of the elements in the window.
+
 ### FoldFunction
 
 A `FoldFunction` specifies how an input element of the window is combined with an element of
@@ -645,11 +732,11 @@ class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), Stri
 
 The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output.
 
-<span class="label label-danger">Attention</span> Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`.
+<span class="label label-danger">Attention</span> Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` or `AggregateFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`.
 
 ### ProcessWindowFunction with Incremental Aggregation
 
-A `ProcessWindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
+A `ProcessWindowFunction` can be combined with a `ReduceFunction`, an `AggregateFunction`, or a `FoldFunction` to
 incrementally aggregate elements as they arrive in the window.
 When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result.
 This makes it possible to incrementally compute windows while having access to the
@@ -721,6 +808,111 @@ input
 </div>
 </div>
 
+#### Incremental Window Aggregation with AggregateFunction
+
+The following example shows how an incremental `AggregateFunction` can be combined with
+a `ProcessWindowFunction` to compute the average and also emit the key along with
+the average.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+  .keyBy(<key selector>)
+  .window(<window assigner>)
+  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
+
+// Function definitions
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
+  @Override
+  public Tuple2<Long, Long> createAccumulator() {
+    return new Tuple2<>(0L, 0L);
+  }
+
+  @Override
+  public Tuple2<Long, Long> add(
+    Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+  }
+
+  @Override
+  public Double getResult(Tuple2<Long, Long> accumulator) {
+    return ((double) accumulator.f0) / accumulator.f1;
+  }
+
+  @Override
+  public Tuple2<Long, Long> merge(
+    Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
+    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
+  }
+}
+
+private static class MyProcessWindowFunction
+    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
+
+  @Override
+  public void process(String key,
+                      Context context,
+                      Iterable<Double> averages,
+                      Collector<Tuple2<String, Double>> out) {
+    Double average = averages.iterator().next();
+    out.collect(new Tuple2<>(key, average));
+  }
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val input: DataStream[(String, Long)] = ...
+
+input
+  .keyBy(<key selector>)
+  .window(<window assigner>)
+  .aggregate(new AverageAggregate(), new MyProcessWindowFunction())
+
+// Function definitions
+
+/**
+ * The accumulator is used to keep a running sum and a count. The [getResult] method
+ * computes the average.
+ */
+class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
+  override def createAccumulator() = (0L, 0L)
+
+  override def add(value: (String, Long), accumulator: (Long, Long)) =
+    (accumulator._1 + value._2, accumulator._2 + 1L)
+
+  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2
+
+  override def merge(a: (Long, Long), b: (Long, Long)) =
+    (a._1 + b._1, a._2 + b._2)
+}
+
+class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
+
+  override def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): Unit = {
+    val average = averages.iterator.next()
+    out.collect((key, average))
+  }
+}
+
+{% endhighlight %}
+</div>
+</div>
+
 #### Incremental Window Aggregation with FoldFunction
 
 The following example shows how an incremental `FoldFunction` can be combined with
@@ -892,7 +1084,7 @@ Two things to notice about the above methods are:
 Once a trigger determines that a window is ready for processing, it fires, *i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the window operator
 to emit the result of the current window. Given a window with a `ProcessWindowFunction`
 all elements are passed to the `ProcessWindowFunction` (possibly after passing them to an evictor).
-Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.
+Windows with `ReduceFunction`, `AggregateFunction`, or `FoldFunction` simply emit their eagerly aggregated result.
 
 When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
 By default, the pre-implemented triggers simply `FIRE` without purging the window state.
@@ -1162,7 +1354,7 @@ Windows can be defined over long periods of time (such as days, weeks, or months
 
 1. Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several copies of each element, as explained in the [Window Assigners](#window-assigners) section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
 
-2. `FoldFunction` and `ReduceFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, just using a `ProcessWindowFunction` requires accumulating all elements.
+2. `ReduceFunction`, `AggregateFunction`, and `FoldFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast, using a `ProcessWindowFunction` on its own requires accumulating all elements.
 
 3. Using an `Evictor` prevents any pre-aggregation, as all the elements of a window have to be passed through the evictor before applying the computation (see [Evictors](#evictors)).
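
The `AverageAggregate` example added in this commit follows the accumulator lifecycle described in the new AggregateFunction section: `createAccumulator` once, one `add` per element, `merge` when two accumulators must be combined, and `getResult` when the window result is emitted. The sketch below runs that lifecycle without Flink. It is only an approximation under stated assumptions: `AggregateFunctionSketch` is a hypothetical interface that mirrors the four method names of Flink's `AggregateFunction`, inputs are plain `Long` values instead of `Tuple2<String, Long>`, and a `long[]` holding `{sum, count}` stands in for the `Tuple2<Long, Long>` accumulator. Note the cast to `double` in `getResult`, which keeps the average from being truncated by integer division.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that mirrors the four methods of Flink's AggregateFunction<IN, ACC, OUT>.
// It is NOT Flink's interface; it only exists so this sketch compiles without Flink.
interface AggregateFunctionSketch<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average of Long values; the accumulator is {runningSum, count}.
class AverageSketch implements AggregateFunctionSketch<Long, long[], Double> {
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    public long[] add(Long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    public Double getResult(long[] acc) {
        // Convert before dividing; long / long would truncate 3 / 2 to 1.
        return ((double) acc[0]) / acc[1];
    }

    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Feeds elements one at a time, the way a window operator aggregates incrementally.
    static double aggregate(List<Long> elements) {
        AverageSketch f = new AverageSketch();
        long[] acc = f.createAccumulator();
        for (Long e : elements) {
            acc = f.add(e, acc);
        }
        return f.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList(1L, 2L))); // prints 1.5
    }
}
```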
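
The session-window paragraph lists `AggregateFunction` among the window functions that can be used with merging windows. The reason is its `merge` method: when Flink merges two session windows, the partial aggregates of both windows have to be combined into one. This dependency-free sketch (the class `SessionMergeSketch` and its helpers are hypothetical illustration names, not Flink API) checks that merging the `{sum, count}` accumulators of two partial windows gives the same average as aggregating all of their elements in a single window. A `FoldFunction` has no equivalent combining step, which is why it cannot merge.

```java
// Accumulator layout: {runningSum, count}. All names here are hypothetical, for illustration only.
class SessionMergeSketch {
    static long[] add(long[] acc, long value) {
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    // Combines the partial aggregates of two windows that are being merged.
    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    static double result(long[] acc) {
        return ((double) acc[0]) / acc[1];
    }

    // Aggregates a run of elements into a fresh accumulator.
    static long[] aggregateAll(long... values) {
        long[] acc = {0L, 0L};
        for (long v : values) {
            acc = add(acc, v);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Two session windows whose accumulators are combined when the windows are merged.
        long[] first = aggregateAll(4L, 6L);
        long[] second = aggregateAll(11L);
        long[] merged = merge(first, second);
        // Same answer as if all three elements had arrived in one window.
        System.out.println(result(merged) + " == " + result(aggregateAll(4L, 6L, 11L)));
    }
}
```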
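
In the AggregateFunction plus ProcessWindowFunction example, the `Iterable<Double>` handed to `MyProcessWindowFunction` holds just one element: the average produced by `getResult`, not the raw window contents. The sketch below imitates that hand-off in plain Java. `averageOf` and `emitWithKey` are hypothetical stand-ins for the aggregate and process steps, and `Map.Entry` replaces `Tuple2<String, Double>`; it is not Flink code.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class IncrementalThenProcessSketch {
    // Aggregate step: adds elements one by one into a {sum, count} accumulator, then divides.
    static double averageOf(List<Long> windowElements) {
        long sum = 0L;
        long count = 0L;
        for (long v : windowElements) {
            sum += v;
            count++;
        }
        return ((double) sum) / count;
    }

    // Process step: mirrors MyProcessWindowFunction; the Iterable holds exactly one pre-aggregated value.
    static Map.Entry<String, Double> emitWithKey(String key, Iterable<Double> averages) {
        Double average = averages.iterator().next();
        return new AbstractMap.SimpleEntry<>(key, average);
    }

    public static void main(String[] args) {
        double avg = averageOf(Arrays.asList(2L, 3L, 7L));
        Map.Entry<String, Double> out = emitWithKey("sensor-1", Collections.singletonList(avg));
        System.out.println(out); // prints sensor-1=4.0
    }
}
```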
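
Point 1 of the state-size list can be quantified: for a sliding window whose slide evenly divides its size, each element is assigned to size / slide windows. The helper below (a hypothetical name that uses only `java.time`, no Flink API) shows that the one-day window with a one-second slide assigns every element to 86,400 windows, while a tumbling window assigns it to one.

```java
import java.time.Duration;

class SlidingWindowCopies {
    // Windows an element is assigned to for a sliding window of the given size and slide.
    // Sketch assumption: slide is positive and evenly divides size (as with 1 day / 1 second).
    static long windowsPerElement(Duration size, Duration slide) {
        long sizeMs = size.toMillis();
        long slideMs = slide.toMillis();
        if (slideMs <= 0 || sizeMs % slideMs != 0) {
            throw new IllegalArgumentException("sketch assumes a positive slide that divides size");
        }
        return sizeMs / slideMs;
    }

    public static void main(String[] args) {
        // Size 1 day, slide 1 second.
        System.out.println(windowsPerElement(Duration.ofDays(1), Duration.ofSeconds(1))); // prints 86400
        // Tumbling window (slide == size): one window per element.
        System.out.println(windowsPerElement(Duration.ofHours(1), Duration.ofHours(1))); // prints 1
    }
}
```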
 

