flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: union followed by timestamp assignment / watermark generation
Date Thu, 15 Jun 2017 11:48:37 GMT
Hi,

Yes, I can’t think of cases right now where placing the extractor after a union makes sense.
In general, I think it’s always best to place the timestamp extractor as close to the sources
(or in the sources, for Kafka) as possible. Right now it would be quite hard (and probably
a bit hacky) to detect such a situation and give a warning. If you want, please open a Jira
issue so that we don’t forget about this; maybe we can solve it in the future, when we
have a slightly different graph structure that would allow running such analyses on the user
program.

Best,
Aljoscha
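Aljoscha notes that warning about an extractor placed after a union would require an analysis over the user program. A minimal sketch of what such a check could look like is shown here, written against a hypothetical, simplified graph model: the `Node` class and the `assignersAfterUnion`, `unionThenAssign` and `assignThenUnion` helpers are illustrative assumptions, not Flink's graph API. It flags every timestamp-assigning node that has a node with more than one input at or above it, treating any multi-input node as a union (a deliberate simplification that would also flag joins).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AssignerAfterUnionCheck {

    /** Hypothetical, minimal dataflow node; not a Flink class. */
    static final class Node {
        final String name;
        final boolean assignsTimestamps;
        final List<Node> inputs;

        Node(String name, boolean assignsTimestamps, Node... inputs) {
            this.name = name;
            this.assignsTimestamps = assignsTimestamps;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Names of all timestamp assigners that have a multi-input node at or above them. */
    static List<String> assignersAfterUnion(List<Node> nodes) {
        List<String> flagged = new ArrayList<>();
        for (Node n : nodes) {
            if (n.assignsTimestamps && hasUnionUpstream(n, new HashSet<>())) {
                flagged.add(n.name);
            }
        }
        return flagged;
    }

    /** Depth-first walk towards the sources; a node with several inputs counts as a union. */
    private static boolean hasUnionUpstream(Node n, Set<Node> visited) {
        if (!visited.add(n)) {
            return false; // already explored via another path
        }
        if (n.inputs.size() > 1) {
            return true;
        }
        for (Node in : n.inputs) {
            if (hasUnionUpstream(in, visited)) {
                return true;
            }
        }
        return false;
    }

    /** Second topology from the thread: union first, then a single assigner. */
    static List<Node> unionThenAssign() {
        Node m1 = new Node("Map1", false, new Node("Source1", false));
        Node m2 = new Node("Map2", false, new Node("Source2", false));
        Node extract = new Node("ExtractTimestamps", true, m1, m2);
        return Arrays.asList(m1, m2, extract, new Node("Map3", false, extract));
    }

    /** First topology from the thread: one assigner per input, then the union. */
    static List<Node> assignThenUnion() {
        Node e1 = new Node("ExtractTimestamps1", true, new Node("Map1", false, new Node("Source1", false)));
        Node e2 = new Node("ExtractTimestamps2", true, new Node("Map2", false, new Node("Source2", false)));
        return Arrays.asList(e1, e2, new Node("Map3", false, e1, e2));
    }

    public static void main(String[] args) {
        System.out.println(assignersAfterUnion(unionThenAssign())); // [ExtractTimestamps]
        System.out.println(assignersAfterUnion(assignThenUnion())); // []
    }
}
```

As the thread points out, the union is not a separate node in Flink's graph, so a real check would have to look at an operator's input edges rather than at a dedicated union operator; the sketch only illustrates the idea.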
> On 15. Jun 2017, at 13:20, Petr Novotnik <petr.novotnik@firma.seznam.cz> wrote:
> 
> Hello Aljoscha,
> 
> Fortunately, I found the program in Google's caches :) I've attached it
> below for reference. I'm stunned by how accurately you hit the
> point, given the few pieces of information I left in the original text. +1
> 
> Yes, it's exactly as you explained. Can you think of a scenario where it
> would lead to reasonable results if a user placed the
> time-extraction/watermark-generation (directly or indirectly) after a
> union operation? So far I couldn't, and I'm starting to believe that it
> would at least be nice to warn users who try to do so.
> 
> Many thanks for your analysis and your time,
> Pete.
> 
> 
>> import org.apache.flink.api.common.functions.ReduceFunction;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> 
>> import java.util.Arrays;
>> import java.util.List;
>> import java.util.concurrent.TimeUnit;
>> 
>> public class TimestampAssignmentTest {
>> 
>>  public static void main(String[] args) throws Exception {
>>    runTest();
>>  }
>> 
>>  private static void runTest() throws Exception {
>>    List<Tuple2<String, Long>> left = Arrays.asList(Tuple2.of("one", 1L), Tuple2.of("two", 3L));
>>    List<Tuple2<String, Long>> right = Arrays.asList(Tuple2.of("three", 2L), Tuple2.of("four", 4L));
>> 
>>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>    env.getConfig().setAutoWatermarkInterval(10);
>>    env.setParallelism(1);
>> 
>>    // ~ a very fast source
>>    SingleOutputStreamOperator<Tuple2<String, Long>> leftInput =
>>        env.addSource(new DelayedSourceFunction<>(left, Time.milliseconds(0)))
>>        .returns(new TypeHint<Tuple2<String, Long>>() {});
>>    // ~ a very slow source
>>    SingleOutputStreamOperator<Tuple2<String, Long>> rightInput =
>>        env.addSource(new DelayedSourceFunction<>(right, Time.seconds(10)))
>>        .returns(new TypeHint<Tuple2<String, Long>>() {});
>> 
>>    leftInput.union(rightInput)
>>        .assignTimestampsAndWatermarks(new EventTimeExtractor<>(Time.milliseconds(0)))
>>        .map(t -> t.f0).returns(new TypeHint<String>() {})
>>        .timeWindowAll(Time.milliseconds(3))
>>        .reduce((ReduceFunction<String>) (s, t) -> s + "|" + t).returns(new TypeHint<String>() {})
>>        .print();
>> 
>>    env.execute();
>>  }
>> 
>>  static class EventTimeExtractor<I>
>>    extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<I, Long>> {
>> 
>>    EventTimeExtractor(Time maxOutOfOrderness) {
>>      super(maxOutOfOrderness);
>>    }
>> 
>>    @Override
>>    public long extractTimestamp(Tuple2<I, Long> element) {
>>      return element.f1;
>>    }
>>  }
>> 
>>  static class DelayedSourceFunction<I> implements SourceFunction<I> {
>>    private volatile boolean running;
>>    private final List<I> elems;
>>    private final long delayMillis;
>> 
>>    DelayedSourceFunction(List<I> elems, Time delay) {
>>      this.elems = elems;
>>      this.delayMillis = delay.toMilliseconds();
>>    }
>> 
>>    @Override
>>    public void run(SourceContext<I> ctx) throws Exception {
>>      running = true;
>>      for (I elem : elems) {
>>        if (!running) {
>>          break;
>>        }
>>        delay();
>>        ctx.collect(elem);
>>      }
>>    }
>> 
>>    private void delay() throws InterruptedException {
>>      if (delayMillis > 0) {
>>        long start = System.nanoTime();
>>        while (true) {
>>          long curr = System.nanoTime();
>>          long waitMillis = TimeUnit.NANOSECONDS.toMillis(curr - start);
>>          if (waitMillis < delayMillis) {
>>            Thread.sleep(delayMillis - waitMillis);
>>          } else {
>>            break;
>>          }
>>        }
>>      }
>>    }
>> 
>>    @Override
>>    public void cancel() {
>>      running = false;
>>    }
>>  }
>> }
> 
> 
> 
> On 06/14/2017 04:02 PM, Aljoscha Krettek wrote:
>> Hi Petr,
>> 
>> I just stumbled across this (slightly older) mail. Your example on pastebin is not
>> available anymore, but I’m guessing you have roughly these two topologies:
>> 
>> 1.
>> 
>> Source1 -> Map1 -> ExtractTimestamps -|
>>                                       | -> Map3 …
>> Source2 -> Map2 -> ExtractTimestamps -|
>> 
>> The union is not visible at the graph level; it’s implicit in the combination of
>> the two input streams.
>> 
>> 2.
>> 
>> Source1 -> Map1 -|
>>                  | -> ExtractTimestamps -> Map3 …
>> Source2 -> Map2 -|
>> 
>> Here, too, the union is not visible at the graph level; it’s implicit in
>> ExtractTimestamps consuming both input streams.
>> 
>> I’m also guessing that you have a timestamp/watermark assigner where the watermark
>> is the highest-seen timestamp minus some lateness bound. I think the behaviour is not
>> necessarily an artefact of the Flink implementation (with maps and extractors being fused
>> together) but results from the graph itself, from how watermarks are defined, and from how
>> the extractor works. In the first case, each stream (before the union) has its own watermark,
>> and the watermark at Map3 is the minimum over those watermarks. This explains why a lower
>> watermark on one stream holds back the overall watermark at Map3. In the second case, the
>> two streams are unioned before timestamps/watermarks are extracted, and the chosen extractor
>> (which tracks the highest-seen timestamp) makes the watermark advance “faster”, because
>> logically there is no longer a separate, slower stream holding it back.
>> 
>> Is that analysis correct? Does my description roughly make sense?
>> 
>> Best,
>> Aljoscha
>> 
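The minimum-over-inputs argument above can be reproduced with a small plain-Java simulation. This is a sketch under simplifying assumptions, not Flink code: `BoundedAssigner` only mimics the "highest timestamp seen minus a bound" rule of a bounded-out-of-orderness assigner, a watermark is computed right after every element instead of periodically, and the arrival order (both fast-source elements before the slow-source ones) is taken from the example program in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    /** Mimics the rule "watermark = highest timestamp seen so far minus a fixed bound". */
    static final class BoundedAssigner {
        private final long bound;
        private long maxSeen = Long.MIN_VALUE;

        BoundedAssigner(long bound) {
            this.bound = bound;
        }

        void onElement(long timestamp) {
            maxSeen = Math.max(maxSeen, timestamp);
        }

        long watermark() {
            // Nothing seen yet: event time has not advanced at all.
            return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - bound;
        }
    }

    // {input, timestamp} in arrival order: the fast left source delivers
    // "one"(1) and "two"(3) before the slow right source's "three"(2) and "four"(4).
    static final long[][] ARRIVALS = { {0, 1}, {0, 3}, {1, 2}, {1, 4} };

    /** First topology: an assigner per input; the downstream operator uses the minimum. */
    static List<Long> perInput(long bound) {
        BoundedAssigner[] assigners = { new BoundedAssigner(bound), new BoundedAssigner(bound) };
        List<Long> watermarks = new ArrayList<>();
        for (long[] e : ARRIVALS) {
            assigners[(int) e[0]].onElement(e[1]);
            watermarks.add(Math.min(assigners[0].watermark(), assigners[1].watermark()));
        }
        return watermarks;
    }

    /** Second topology: one assigner after the union sees a single merged stream. */
    static List<Long> afterUnion(long bound) {
        BoundedAssigner assigner = new BoundedAssigner(bound);
        List<Long> watermarks = new ArrayList<>();
        for (long[] e : ARRIVALS) {
            assigner.onElement(e[1]);
            watermarks.add(assigner.watermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        System.out.println("per input:   " + perInput(0));   // [-9223372036854775808, -9223372036854775808, 2, 3]
        System.out.println("after union: " + afterUnion(0)); // [1, 3, 3, 4]
    }
}
```

With per-input assigners, the combined watermark stays at `Long.MIN_VALUE` until the slow source has produced an element; after the union, the fast source alone pushes the watermark to 3 before "three" (timestamp 2) arrives.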
>> On 6. May 2017, at 15:00, Petr Novotnik <petr.novotnik@firma.seznam.cz> wrote:
>>> 
>>> Hello Flinkers,
>>> 
>>> Given this small example program:
>>> 
>>>> https://pastebin.com/30JbbgpH
>>> 
>>> I'd expect the output:
>>> 
>>>> one|three
>>>> two|four
>>> 
>>> However, I consistently receive ...
>>> 
>>>> one
>>>> two|four
>>> 
>>> ... due to "three" being considered a latecomer, which then gets
>>> discarded. When I remove `assignTimestampsAndWatermarks` after the
>>> `union` and place it separately on each of the union's inputs, i.e.
>>> before the `union`, I get what I expect.
>>> 
>>> Now, after digging through Flink's source code, this behavior actually
>>> seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
>>> operators form one task). Still, from a user/API perspective, it is
>>> surprising, to say the least.
>>> 
>>> I wanted to ask whether this kind of behavior is known and intended, or
>>> whether it is something that could be improved to avoid the gotcha.
>>> 
>>> Many thanks in advance,
>>> Pete.
>>> 
>> 
> 
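To tie the watermark behaviour back to the output reported in the original question, the following plain-Java sketch replays the four elements through a 3 ms tumbling window with no allowed lateness. It is a simplification under stated assumptions, not Flink's window operator: an element whose window has already been covered by the watermark is simply dropped, the watermark is updated right after each element with a zero out-of-orderness bound, and end of input flushes all remaining windows. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class UnionWindowSketch {

    static final long WINDOW_SIZE = 3; // like timeWindowAll(Time.milliseconds(3))

    // Arrival order from the example: the fast source's elements arrive first.
    static final String[] NAMES = { "one", "two", "three", "four" };
    static final long[] TIMESTAMPS = { 1, 3, 2, 4 };
    static final int[] INPUT = { 0, 0, 1, 1 }; // 0 = fast left source, 1 = slow right source

    /** perInput = true: assigner per input; false: single assigner after the union. */
    static List<String> run(boolean perInput) {
        long[] maxSeen = { Long.MIN_VALUE, Long.MIN_VALUE }; // only slot 0 is used after the union
        long watermark = Long.MIN_VALUE;
        TreeMap<Long, String> open = new TreeMap<>(); // window start -> concatenated names
        List<String> fired = new ArrayList<>();

        for (int i = 0; i < NAMES.length; i++) {
            long ts = TIMESTAMPS[i];
            long start = ts - ts % WINDOW_SIZE;
            // Drop the element as late if the watermark already covers its window's last timestamp.
            if (start + WINDOW_SIZE - 1 > watermark) {
                open.merge(start, NAMES[i], (acc, name) -> acc + "|" + name);
            }
            // Zero out-of-orderness bound: the watermark is simply the highest timestamp seen.
            int slot = perInput ? INPUT[i] : 0;
            maxSeen[slot] = Math.max(maxSeen[slot], ts);
            watermark = perInput ? Math.min(maxSeen[0], maxSeen[1]) : maxSeen[0];
            fire(open, watermark, fired);
        }
        fire(open, Long.MAX_VALUE, fired); // end of input flushes every remaining window
        return fired;
    }

    /** Emits, in start order, every open window whose last timestamp the watermark has reached. */
    static void fire(TreeMap<Long, String> open, long watermark, List<String> fired) {
        Iterator<Map.Entry<Long, String>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> window = it.next();
            if (window.getKey() + WINDOW_SIZE - 1 <= watermark) {
                fired.add(window.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("assigner after union: " + run(false)); // [one, two|four]
        System.out.println("assigner per input:   " + run(true));  // [one|three, two|four]
    }
}
```

In the union case, "two" (timestamp 3) raises the single watermark to 3, which closes the window [0, 3) before "three" (timestamp 2) arrives, so "three" is dropped; with per-input assigners, the slow source keeps the combined watermark low until "three" has been added.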

