flink-user mailing list archives

From Petr Novotnik <petr.novot...@firma.seznam.cz>
Subject Re: union followed by timestamp assignment / watermark generation
Date Thu, 15 Jun 2017 11:20:55 GMT
Hello Aljoscha,

Fortunately, I found the program in Google's cache :) I've attached it
below for reference. I'm stunned by how accurately you hit the
point, given how little information I left in the original text. +1

Yes, it's exactly as you explained. Can you think of a scenario where it
would lead to reasonable results if a user placed the
time-extraction/watermark-generation (directly or indirectly) after a
union operation? So far I couldn't think of one, and I'm starting to believe
that it would at least be nice to warn users who try to do so.

Many thanks for your analysis and your time,
Pete.


> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> 
> import java.util.Arrays;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> 
> public class TimestampAssignmentTest {
> 
>   public static void main(String[] args) throws Exception {
>     runTest();
>   }
> 
>   private static void runTest() throws Exception {
>     List<Tuple2<String, Long>> left = Arrays.asList(Tuple2.of("one", 1L), Tuple2.of("two", 3L));
>     List<Tuple2<String, Long>> right = Arrays.asList(Tuple2.of("three", 2L), Tuple2.of("four", 4L));
> 
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     env.getConfig().setAutoWatermarkInterval(10);
>     env.setParallelism(1);
> 
>     // ~ a very fast source
>     SingleOutputStreamOperator<Tuple2<String, Long>> leftInput =
>         env.addSource(new DelayedSourceFunction<>(left, Time.milliseconds(0)))
>         .returns(new TypeHint<Tuple2<String, Long>>() {});
>     // ~ a very slow source
>     SingleOutputStreamOperator<Tuple2<String, Long>> rightInput =
>         env.addSource(new DelayedSourceFunction<>(right, Time.seconds(10)))
>         .returns(new TypeHint<Tuple2<String, Long>>() {});
> 
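>     // timestamps/watermarks are assigned *after* the union: the single extractor's watermark
>     // jumps to 3 once the fast left input delivers "two", so "three" (timestamp 2) is dropped as late
>     leftInput.union(rightInput)
>         .assignTimestampsAndWatermarks(new EventTimeExtractor<>(Time.milliseconds(0)))
>         .map(t -> t.f0).returns(new TypeHint<String>() {})
>         .timeWindowAll(Time.milliseconds(3))
>         .reduce((ReduceFunction<String>) (s, t) -> s + "|" + t).returns(new TypeHint<String>() {})
>         .print();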
> 
>     env.execute();
>   }
> 
>   static class EventTimeExtractor<I>
>     extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<I, Long>> {
> 
>     EventTimeExtractor(Time maxOutOfOrderness) {
>       super(maxOutOfOrderness);
>     }
> 
>     @Override
>     public long extractTimestamp(Tuple2<I, Long> element) {
>       return element.f1;
>     }
>   }
> 
>   static class DelayedSourceFunction<I> implements SourceFunction<I> {
>     private volatile boolean running;
>     private final List<I> elems;
>     private final long delayMillis;
> 
>     DelayedSourceFunction(List<I> elems, Time delay) {
>       this.elems = elems;
>       this.delayMillis = delay.toMilliseconds();
>     }
> 
>     @Override
>     public void run(SourceContext<I> ctx) throws Exception {
>       running = true;
>       for (I elem : elems) {
>         if (!running) {
>           break;
>         }
>         delay();
>         ctx.collect(elem);
>       }
>     }
> 
>     private void delay() throws InterruptedException {
>       if (delayMillis > 0) {
>         long start = System.nanoTime();
>         while (true) {
>           long curr = System.nanoTime();
>           long waitMillis = TimeUnit.NANOSECONDS.toMillis(curr - start);
>           if (waitMillis < delayMillis) {
>             Thread.sleep(delayMillis - waitMillis);
>           } else {
>             break;
>           }
>         }
>       }
>     }
> 
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
> }



On 06/14/2017 04:02 PM, Aljoscha Krettek wrote:
> Hi Petr,
> 
> I just stumbled across this (slightly older) mail. Your example on pastebin is not available
> anymore but I’m guessing you have roughly these two topologies:
> 
> 1.
> 
> Source1 -> Map1 -> ExtractTimestamps -|
>                                       | -> Map3 …
> Source2 -> Map2 -> ExtractTimestamps -|
> 
> The union is not visible at the graph level; it’s implicit in the combination of the
> two input streams.
> 
> 2.
> 
> Source1 -> Map1 -|
>                  | -> ExtractTimestamps -> Map3 …
> Source2 -> Map2 -|
> 
> The union is not visible at the graph level; it’s implicit in the combination of the
> two input streams.
> 
> I’m also guessing that you have a timestamp/watermark assigner where the watermark
> is the highest-seen timestamp minus some lateness bound. I think the behaviour is not necessarily
> an artefact of the Flink implementation (with maps and extractors being fused together) but
> rather results from the graph itself, from how watermarks are defined, and from how the extractor works.
> In the first case, each stream (before the union) has its own watermark, and the watermark
> at Map3 is the minimum over those watermarks. This explains why a lower watermark on one
> stream holds back the overall watermark at Map3. In the second case, the two streams are
> unioned before the timestamps/watermarks are extracted, and the choice of timestamp extractor
> (which takes the highest-seen timestamp) means that the watermark now advances “faster”,
> because there is logically no longer a slower, separate stream.
> 
> Is that analysis correct? Does my description roughly make sense?
> 
> Best,
> Aljoscha
> 
>> On 6. May 2017, at 15:00, Petr Novotnik <petr.novotnik@firma.seznam.cz> wrote:
>>
>> Hello Flinkers,
>>
>> Given this small example program:
>>
>>> https://pastebin.com/30JbbgpH
>>
>> I'd expect the output:
>>
>>> one|three
>>> two|four
>>
>> However, I consistently receive ...
>>
>>> one
>>> two|four
>>
>> ... due to "three" being considered a late-comer, which then gets
>> discarded. When I remove `assignTimestampsAndWatermarks` from after the
>> `union` and place it separately on each of the union's inputs, i.e.
>> before the `union`, I get what I expect.
>>
>> Now, after digging through Flink's source code, this behavior actually
>> seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
>> operators form one task). Still, from a user/API perspective, it is
>> surprising, to say the least.
>>
>> I wanted to ask whether this kind of behavior is known and intended, or
>> whether it is something to be improved to avoid the gotcha.
>>
>> Many thanks in advance,
>> Pete.
>>
> 
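The two placements discussed in this thread can be compared without running Flink. Below is a minimal, hypothetical Java simulation of the attached program's event sequence; the class `UnionWatermarkSimulation` and its methods are illustrative only, not Flink APIs. It assumes parallelism 1, a 0 ms out-of-orderness bound, 3 ms tumbling windows with zero allowed lateness, a watermark update after every element (instead of every 10 ms), and a final Long.MAX_VALUE watermark at end of input. A single extractor after the union tracks one highest-seen timestamp, while per-input extractors feed a union whose watermark is the minimum of the two inputs' watermarks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, Flink-free model of the attached program (not a Flink API). */
public class UnionWatermarkSimulation {

  /** An element as it reaches the extractor(s): value, event timestamp, input (0 = left, 1 = right). */
  static final class Event {
    final String value;
    final long timestamp;
    final int input;

    Event(String value, long timestamp, int input) {
      this.value = value;
      this.timestamp = timestamp;
      this.input = input;
    }
  }

  static final long WINDOW_SIZE = 3; // tumbling window size in ms, as in timeWindowAll(Time.milliseconds(3))

  public static List<String> simulate(boolean assignPerInput) {
    // Arrival order: the fast left source delivers both elements before the slow right source.
    List<Event> arrivals = Arrays.asList(
        new Event("one", 1, 0), new Event("two", 3, 0),
        new Event("three", 2, 1), new Event("four", 4, 1));

    // Per-input mode: highest timestamp seen on each input.
    // After-union mode: maxSeen[0] is the single extractor's highest-seen timestamp.
    long[] maxSeen = {Long.MIN_VALUE, Long.MIN_VALUE};
    long watermark = Long.MIN_VALUE;
    TreeMap<Long, String> openWindows = new TreeMap<>(); // window start -> reduced contents
    List<String> output = new ArrayList<>();

    for (Event e : arrivals) {
      // With zero allowed lateness, an element is late if timestamp <= current watermark; it is dropped.
      if (e.timestamp <= watermark) {
        continue;
      }
      long start = e.timestamp - (e.timestamp % WINDOW_SIZE);
      openWindows.merge(start, e.value, (s, t) -> s + "|" + t);

      if (assignPerInput) {
        maxSeen[e.input] = Math.max(maxSeen[e.input], e.timestamp);
        // Downstream of the union, the watermark is the minimum over the inputs' watermarks.
        watermark = Math.max(watermark, Math.min(maxSeen[0], maxSeen[1]));
      } else {
        maxSeen[0] = Math.max(maxSeen[0], e.timestamp);
        // One extractor over the merged stream: watermark = highest-seen timestamp - 0.
        watermark = Math.max(watermark, maxSeen[0]);
      }
      fireWindows(openWindows, watermark, output);
    }
    // End of input: a final Long.MAX_VALUE watermark flushes every remaining window.
    fireWindows(openWindows, Long.MAX_VALUE, output);
    return output;
  }

  /** Emits each window whose last timestamp (start + size - 1) is at or below the watermark. */
  static void fireWindows(TreeMap<Long, String> windows, long watermark, List<String> out) {
    while (!windows.isEmpty() && windows.firstKey() + WINDOW_SIZE - 1 <= watermark) {
      out.add(windows.pollFirstEntry().getValue());
    }
  }

  public static void main(String[] args) {
    System.out.println("after union:  " + simulate(false));
    System.out.println("before union: " + simulate(true));
  }
}
```

Under these assumptions, `main` prints `after union:  [one, two|four]` and `before union: [one|three, two|four]`, i.e. the observed and the expected output from the original message. In the first case the watermark reaches 3 as soon as "two" arrives, so the window [0, 3) fires with only "one" and "three" is dropped; in the second case the right input's watermark holds the combined watermark back until "three" has been added.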

