flink-commits mailing list archives

From aljos...@apache.org
Subject [1/5] flink git commit: [FLINK-7568] Add section about consecutive windows to window doc
Date Tue, 05 Sep 2017 16:07:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7c11bd7f4 -> 6d2124ee2

[FLINK-7568] Add section about consecutive windows to window doc

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d2124ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d2124ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d2124ee

Branch: refs/heads/master
Commit: 6d2124ee2e9a1d629bc56b470f356faa90559e1c
Parents: 7d9e3bf
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Sep 1 15:29:21 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Sep 5 17:33:47 2017 +0200

 docs/dev/stream/operators/windows.md | 74 +++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 85a6630..012d531 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -1080,6 +1080,80 @@ as they may "bridge" the gap between two pre-existing, unmerged windows.
 <span class="label label-info">Attention</span> You should be aware that the
elements emitted by a late firing should be treated as updated results of a previous computation,
i.e., your data stream will contain multiple results for the same computation. Depending on
your application, you need to take these duplicated results into account or deduplicate them.
+
+## Working with window results
+
+The result of a windowed operation is again a `DataStream`; no information about the windowed
+operations is retained in the result elements, so if you want to keep meta-information about the
+window you have to manually encode that information in the result elements in your
+`ProcessWindowFunction`. The only relevant information that is set on the result elements is the
+element *timestamp*. This is set to the maximum allowed timestamp of the processed window, which
+is *end timestamp - 1*, since the window-end timestamp is exclusive. Note that this is true for both
+event-time windows and processing-time windows, i.e., after a windowed operation, elements always
+have a timestamp, but this can be an event-time timestamp or a processing-time timestamp. For
+processing-time windows this has no special implications, but for event-time windows this, together
+with how watermarks interact with windows, enables
+[consecutive windowed operations](#consecutive-windowed-operations) with the same window sizes. We
+will cover this after taking a look at how watermarks interact with windows.
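The two points above (window meta-information has to be encoded by hand, and each result carries the window's maximum timestamp, `end - 1`) can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Flink code: `WindowResultSketch`, `WindowedSum`, `windowStart`, `resultTimestamp`, and `process` are hypothetical names, windows are tumbling with zero offset, and timestamps are non-negative milliseconds. In a real job the bounds would come from `context.window()` inside `ProcessWindowFunction#process`.

```java
import java.util.List;

// Hypothetical plain-Java model for illustration; not Flink API.
public class WindowResultSketch {

    /** Result type that encodes the window bounds by hand. */
    static final class WindowedSum {
        final String key;
        final long windowStart; // inclusive
        final long windowEnd;   // exclusive
        final int sum;

        WindowedSum(String key, long windowStart, long windowEnd, int sum) {
            this.key = key;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.sum = sum;
        }
    }

    /** Start of the tumbling window containing timestamp (zero offset, timestamp >= 0 assumed). */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Timestamp a window result carries: the window's maximum timestamp, end - 1. */
    static long resultTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Sums one key's window contents and copies the window bounds into the result. */
    static WindowedSum process(String key, long start, long size, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return new WindowedSum(key, start, start + size, sum);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second tumbling windows, in milliseconds
        long start = windowStart(3_200L, size); // t=3200 falls into [0, 5000)
        WindowedSum r = process("a", start, size, List.of(1, 2, 3));
        // prints "a [0, 5000) sum=6 ts=4999"
        System.out.println(r.key + " [" + r.windowStart + ", " + r.windowEnd + ") sum=" + r.sum
                + " ts=" + resultTimestamp(r.windowEnd));
    }
}
```

Keeping the bounds as explicit fields is the "manual encoding" the text refers to; the timestamp itself is set by the framework and is not part of the result value.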
+
+### Interaction of watermarks and windows
+
+Before continuing in this section you might want to take a look at our section about
+[event time and watermarks]({{ site.baseurl }}/dev/event_time.html).
+
+When watermarks arrive at the window operator, this triggers two things:
+
+ - the watermark triggers computation of all windows where the maximum timestamp (which is
+ *end-timestamp - 1*) is smaller than or equal to the new watermark
+ - the watermark is forwarded (as is) to downstream operations
+
+Intuitively, a watermark "flushes" out any windows that would be considered late in downstream
+operations once they receive that watermark.
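The two effects of a watermark can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink's window operator: `WatermarkSketch`, `onElement`, `onWatermark`, and the string `output` list are hypothetical stand-ins, windows are tumbling with zero offset, and timestamps are non-negative milliseconds. Pending windows whose maximum timestamp (`end - 1`) is at or below the watermark are fired, and only then is the watermark itself passed downstream unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of an event-time window operator; not Flink API.
public class WatermarkSketch {
    private final long size;                                        // window size in ms
    private final TreeMap<Long, Integer> pending = new TreeMap<>(); // window start -> running sum
    final List<String> output = new ArrayList<>();                  // stands in for the downstream stream

    WatermarkSketch(long size) {
        this.size = size;
    }

    /** Adds an element (timestamp >= 0 assumed) to its tumbling window. */
    void onElement(long timestamp, int value) {
        long start = timestamp - (timestamp % size);
        pending.merge(start, value, Integer::sum);
    }

    /** Fires every window whose maximum timestamp (end - 1) is <= watermark, then forwards the watermark. */
    void onWatermark(long watermark) {
        while (!pending.isEmpty()) {
            Map.Entry<Long, Integer> first = pending.firstEntry();
            long maxTimestamp = first.getKey() + size - 1;
            if (maxTimestamp > watermark) {
                break; // this window and all later ones are still open
            }
            pending.pollFirstEntry();
            output.add("result " + first.getValue() + " @ " + maxTimestamp);
        }
        output.add("watermark " + watermark); // forwarded as is, after the results it flushed
    }

    public static void main(String[] args) {
        WatermarkSketch op = new WatermarkSketch(5_000L);
        op.onElement(1_000L, 1);
        op.onElement(4_000L, 2);
        op.onElement(6_000L, 10);
        op.onWatermark(4_999L); // fires [0, 5000); [5000, 10000) stays open
        op.onWatermark(9_999L); // fires [5000, 10000)
        op.output.forEach(System.out::println);
    }
}
```

Because results are emitted before the watermark is forwarded, a downstream operator never sees them as late with respect to that watermark.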
+
+### Consecutive windowed operations
+
+As mentioned before, the way the timestamp of windowed results is computed and how watermarks
+interact with windows allows stringing together consecutive windowed operations. This can be useful
+when you want to do two consecutive windowed operations where you want to use different keys but
+still want elements from the same upstream window to end up in the same downstream window. Consider
+this example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Integer> input = ...;
+DataStream<Integer> resultsPerKey = input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .reduce(new Summer());
+DataStream<Integer> globalResults = resultsPerKey
+    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .process(new TopKWindowFunction());
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
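+val input: DataStream[Int] = ...
+
+// Sum per key in 5-second tumbling event-time windows; each result is
+// timestamped with its window's end - 1.
+val resultsPerKey = input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .reduce(new Summer())
+
+// The results land in the same 5-second windows again, now across all keys.
+val globalResults = resultsPerKey
+    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .process(new TopKWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+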
+In this example, the results for time window `[0, 5)` from the first operation will also end up in
+time window `[0, 5)` in the subsequent windowed operation. This allows calculating a sum per key
+and then calculating the top-k elements within the same window in the second operation.
+
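The window alignment in this example can be checked without a cluster. The plain-Java sketch below runs a per-key sum over 5-second tumbling windows, stamps each result with `end - 1`, then reassigns the results to windows of the same size by that timestamp and keeps the top k per window. It is an illustration, not Flink code: `ConsecutiveWindowsSketch`, `Event`, `sumPerKey`, and `topKPerWindow` are hypothetical names, and zero window offset with non-negative millisecond timestamps is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of two consecutive windowed operations; not Flink API.
public class ConsecutiveWindowsSketch {
    record Event(String key, int value, long timestamp) {}

    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size); // tumbling, zero offset, timestamp >= 0
    }

    /** Stage 1: keyed sum per window; each result carries the window's max timestamp (end - 1). */
    static List<Event> sumPerKey(List<Event> input, long size) {
        Map<String, Event> acc = new TreeMap<>();
        for (Event e : input) {
            long start = windowStart(e.timestamp(), size);
            String id = e.key() + "@" + start;
            Event prev = acc.get(id);
            int sum = (prev == null ? 0 : prev.value()) + e.value();
            acc.put(id, new Event(e.key(), sum, start + size - 1));
        }
        return new ArrayList<>(acc.values());
    }

    /** Stage 2: non-keyed windows of the same size, assigned from stage-1 timestamps; keeps the top k. */
    static Map<Long, List<Event>> topKPerWindow(List<Event> results, long size, int k) {
        Map<Long, List<Event>> byWindow = new TreeMap<>();
        for (Event r : results) {
            byWindow.computeIfAbsent(windowStart(r.timestamp(), size), w -> new ArrayList<>()).add(r);
        }
        for (List<Event> window : byWindow.values()) {
            window.sort((a, b) -> Integer.compare(b.value(), a.value())); // descending by sum
            if (window.size() > k) {
                window.subList(k, window.size()).clear();
            }
        }
        return byWindow;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        List<Event> input = List.of(
                new Event("a", 3, 1_000L), new Event("b", 7, 2_000L),
                new Event("a", 5, 4_500L), new Event("c", 1, 4_999L),
                new Event("b", 2, 6_000L));
        List<Event> perKey = sumPerKey(input, size);
        topKPerWindow(perKey, size, 2).forEach((start, top) ->
                System.out.println("[" + start + ", " + (start + size) + ") -> " + top));
    }
}
```

Every stage-1 result for `[0, 5000)` is stamped `4999`, so stage 2 assigns it to `[0, 5000)` as well. With a different downstream window size this alignment would no longer hold in general.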
 ## Useful state size considerations
 Windows can be defined over long periods of time (such as days, weeks, or months) and therefore
accumulate very large state. There are a couple of rules to keep in mind when estimating the
storage requirements of your windowing computation:
