flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: union followed by timestamp assignment / watermark generation
Date Wed, 14 Jun 2017 14:02:03 GMT
Hi Petr,

I just stumbled across this (slightly older) mail. Your example on pastebin is not available
anymore but I’m guessing you have roughly these two topologies:


Source1 -> Map1 -> ExtractTimestamps -|
                                      | -> Map3 …
Source2 -> Map2 -> ExtractTimestamps -|

The union is not visible at the graph level, it’s implicit in the combination of the two
input streams.


Source1 -> Map1 -|
                 | -> ExtractTimestamps -> Map3 …
Source2 -> Map2 -|

I’m also guessing that you have a timestamp/watermark assigner whose watermark is the
highest-seen timestamp minus some lateness bound. I think the behaviour is not necessarily
an artefact of the Flink implementation (with maps and extractors being fused together). It
follows from the graph itself, from how watermarks are defined, and from how the extractor works.

In the first case, each stream (before the union) has its own watermark, and the watermark
at Map3 is the minimum over those watermarks. This explains why a lower watermark on one
stream holds back the overall watermark at Map3.

In the second case, the two streams are unioned before timestamps/watermarks are extracted,
so a single extractor sees the elements of both streams. Because that extractor tracks the
highest-seen timestamp, the watermark now advances “faster”: logically there is no longer a
slower, separate stream to hold it back. Elements from the slower input can then fall behind
the watermark and be treated as late.
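
To make the difference concrete, here is a minimal sketch in plain Java with no Flink dependency. It is only a simulation under assumptions: the class and method names, the event timestamps, and the bound of 5 are all hypothetical, and the watermark is updated after every element, which simplifies how Flink emits watermarks. An element counts as late when its timestamp is at or below the watermark it meets on arrival.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    /** Hypothetical assigner: watermark = highest-seen timestamp minus a fixed bound. */
    static final class BoundedAssigner {
        private final long bound;
        private long maxSeen = Long.MIN_VALUE;

        BoundedAssigner(long bound) { this.bound = bound; }

        void observe(long timestamp) { maxSeen = Math.max(maxSeen, timestamp); }

        long watermark() {
            // No element seen yet: the watermark has not advanced at all.
            return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - bound;
        }
    }

    /** First topology: one assigner per input, downstream watermark is the minimum. */
    public static List<Long> lateWithAssignerPerInput(long[][] events, long bound) {
        BoundedAssigner[] assigners = { new BoundedAssigner(bound), new BoundedAssigner(bound) };
        List<Long> late = new ArrayList<>();
        for (long[] e : events) {            // e = {input index, timestamp}
            long downstream = Math.min(assigners[0].watermark(), assigners[1].watermark());
            if (e[1] <= downstream) late.add(e[1]);
            assigners[(int) e[0]].observe(e[1]);
        }
        return late;
    }

    /** Second topology: a single assigner sees the already-unioned stream. */
    public static List<Long> lateWithAssignerAfterUnion(long[][] events, long bound) {
        BoundedAssigner assigner = new BoundedAssigner(bound);
        List<Long> late = new ArrayList<>();
        for (long[] e : events) {
            if (e[1] <= assigner.watermark()) late.add(e[1]);
            assigner.observe(e[1]);
        }
        return late;
    }

    public static void main(String[] args) {
        // Made-up arrival order: input 0 runs ahead before input 1 delivers anything.
        long[][] events = { {0, 10}, {0, 20}, {1, 11}, {1, 21} };
        System.out.println("assigner per input:   late = " + lateWithAssignerPerInput(events, 5));
        System.out.println("assigner after union: late = " + lateWithAssignerAfterUnion(events, 5));
    }
}
```

With this made-up input, the per-input variant reports no late elements, because the minimum stays at the slower input's watermark. The after-union variant reports timestamp 11 as late, because the element with timestamp 20 has already pushed the single watermark to 15.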

Is that analysis correct? Does my description roughly make sense?


> On 6. May 2017, at 15:00, Petr Novotnik <petr.novotnik@firma.seznam.cz> wrote:
> Hello Flinkers,
> Given this small example program:
>> https://pastebin.com/30JbbgpH
> I'd expect the output:
>> one|three
>> two|four
> However, I consistently receive ...
>> one
>> two|four
> ... due to "three" being considered a late-comer which then gets
> discarded. When I remove `assignTimestampsAndWatermarks` after the
> `union` and place it separately on each of the union's inputs, i.e.
> before the `union`, I get what I expect.
> Now, after digging through Flink's source code, this behavior actually
> seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
> operators form one task). Though, from a user/api perspective, it is at
> least surprising.
> I wanted to ask whether this kind of behavior is known, intended, or maybe
> something to be improved to avoid the gotcha?
> Many thanks in advance,
> Pete.
