flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-3899] [docs] Add examples for incremental window computation.
Date Thu, 25 Aug 2016 12:49:30 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 016c381cd -> 717fc906d


[FLINK-3899] [docs] Add examples for incremental window computation.

This closes #2368


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717fc906
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717fc906
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717fc906

Branch: refs/heads/release-1.1
Commit: 717fc906db6db51596320853a54ffeb92b1a591f
Parents: 016c381
Author: danielblazevski <daniel.blazevski@gmail.com>
Authored: Sat Aug 13 18:57:18 2016 -0400
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Aug 25 14:48:43 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/windows.md | 147 +++++++++++++++++++++++++++++-------
 1 file changed, 120 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/717fc906/docs/apis/streaming/windows.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md
index 7a93723..c3fdc0e 100644
--- a/docs/apis/streaming/windows.md
+++ b/docs/apis/streaming/windows.md
@@ -452,49 +452,142 @@ class MyWindowFunction extends WindowFunction[(String, Long), String, String, Ti
 
 ### WindowFunction with Incremental Aggregation
 
-A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction`. When doing
-this, the `ReduceFunction`/`FoldFunction` will be used to incrementally aggregate elements as they
-arrive while the `WindowFunction` will be provided with the aggregated result when the window is
-ready for processing. This allows to get the benefit of incremental window computation and also have
-the additional meta information that writing a `WindowFunction` provides.
+A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to
+incrementally aggregate elements as they arrive in the window.
+When the window is triggered, the `WindowFunction` will be provided with the aggregated result.
+This allows windows to be computed incrementally while still having access to the
+additional window meta information of the `WindowFunction`.
 
-This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+#### Incremental Window Aggregation with FoldFunction
+
+The following example shows how an incremental `FoldFunction` can be combined with
+a `WindowFunction` to count the events in the window and also return
+the key and end time of the window.
+
+Please note that combining a `FoldFunction` with a `WindowFunction` is
+restricted: the types of the `Iterable` and `Collector` arguments of the
+`WindowFunction` must both match the type of the `FoldFunction`'s accumulator.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<Tuple2<String, Long>> input = ...;
+DataStream<SensorReading> input = ...;
 
-// for folding incremental computation
 input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
+  .keyBy(<key selector>)
+  .timeWindow(<window assigner>)
+  .apply(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())
+
+// Function definitions
+
+private static class MyFoldFunction
+    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
+
+  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc,
SensorReading s) {
+      Integer cur = acc.getField(2);
+      acc.setField(2, cur + 1);
+      return acc;
+  }
+}
+
+private static class MyWindowFunction 
+    implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long,
Integer>, String, TimeWindow> {
+  
+  public void apply(String key,
+                    TimeWindow window,
+                    Iterable<Tuple3<String, Long, Integer>> counts,
+                    Collector<Tuple3<String, Long, Integer>> out) {
+    Integer count = counts.iterator().next().getField(2);
+    out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
+  }
+}
 
-// for reducing incremental computation
-input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .apply(new MyReduceFunction(), new MyWindowFunction());
 {% endhighlight %}
 </div>
-
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val input: DataStream[(String, Long)] = ...
 
-// for folding incremental computation
+val input: DataStream[SensorReading] = ...
+
+input
+ .keyBy(<key selector>)
+ .timeWindow(<window assigner>)
+ .apply (
+    ("", 0L, 0), 
+    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
+    ( key: String, 
+      window: TimeWindow, 
+      counts: Iterable[(String, Long, Int)],
+      out: Collector[(String, Long, Int)] ) => 
+      {
+        val count = counts.iterator.next()
+        out.collect((key, window.getEnd, count._3))
+      }
+  )
+
+{% endhighlight %}
+</div>
+</div>
+
+#### Incremental Window Aggregation with ReduceFunction
+
+The following example shows how an incremental `ReduceFunction` can be combined with
+a `WindowFunction` to return the reading with the smallest value in a window along
+with the start time of the window.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<SensorReading> input = ...;
+
 input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction())
+  .keyBy(<key selector>)
+  .timeWindow(<window assigner>)
+  .apply(new MyReduceFunction(), new MyWindowFunction());
+
+// Function definitions
+
+private static class MyReduceFunction implements ReduceFunction<SensorReading> {
+
+  public SensorReading reduce(SensorReading r1, SensorReading r2) {
+      return r1.value() > r2.value() ? r2 : r1;
+  }
+}
+
+private static class MyWindowFunction 
+    implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String,
TimeWindow> {
+  
+  public void apply(String key,
+                    TimeWindow window,
+                    Iterable<SensorReading> minReadings,
+                    Collector<Tuple2<Long, SensorReading>> out) {
+      SensorReading min = minReadings.iterator().next();
+      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
+  }
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val input: DataStream[SensorReading] = ...
 
-// for reducing incremental computation
 input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .apply(new MyReduceFunction(), new MyWindowFunction())
+  .keyBy(<key selector>)
+  .timeWindow(<window assigner>)
+  .apply(
+    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1
},
+    ( key: String, 
+      window: TimeWindow, 
+      minReadings: Iterable[SensorReading], 
+      out: Collector[(Long, SensorReading)] ) => 
+      {
+        val min = minReadings.iterator.next()
+        out.collect((window.getStart, min))
+      }
+  )
+  
 {% endhighlight %}
 </div>
 </div>


Mime
View raw message