flink-user mailing list archives

From Xingcan Cui <xingc...@gmail.com>
Subject Re: Handle event time
Date Fri, 08 Sep 2017 01:29:56 GMT
Hi AndreaKinn,

The AscendingTimestampExtractor does not work the way you think. It should only
be applied to streams whose timestamps are naturally monotonically ascending.

Flink uses watermarks to deal with out-of-order data. When a watermark *t* is
received, it means there should be no more records whose timestamps are less
than or equal to *t*. However, you must implement your own watermark generation
policy. There are two basic watermark assigners:
AssignerWithPeriodicWatermarks, which generates watermarks periodically, and
AssignerWithPunctuatedWatermarks, which generates watermarks when certain
records are encountered.
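
For instance, a periodic assigner based on the built-in
BoundedOutOfOrdernessTimestampExtractor could look roughly like the sketch
below. It is not taken from your job: the variable names, the 5-second
out-of-orderness bound, and applying it before the keyBy are assumptions you
would have to adapt to your data.

    // A minimal sketch, assuming "stream" is the raw Kafka source and that
    // records arrive at most ~5 seconds late (an assumed bound, tune it).
    // Imports: org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    //          org.apache.flink.streaming.api.windowing.time.Time
    DataStream<Tuple6<String, String, Date, String, String, Double>> withTimestamps =
            stream.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<
                            Tuple6<String, String, Date, String, String, Double>>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(
                                Tuple6<String, String, Date, String, String, Double> element) {
                            // Event time comes from the Date field, as in your snippet.
                            return element.f2.getTime();
                        }
                    });

With a punctuated assigner you would instead implement checkAndGetNextWatermark()
and decide for each record whether a watermark should be emitted.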

For more information, please refer to [1] and [2].

Best,
Xingcan

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html

On Fri, Sep 8, 2017 at 4:24 AM, AndreaKinn <kinn6aer@hotmail.it> wrote:

> Hi,
> I'm getting sensor data from a Kafka source and I absolutely need it to be
> ordered by data generation time. I've implemented a custom deserializer and
> employed an AscendingTimestampExtractor to handle event time.
> Obviously I set EventTime as the streamTimeCharacteristic.
> Unfortunately, when I print the stream I see many unordered records. Am I
> doing something wrong?
> I've attached proof of that:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(CHECKPOINT_TIME);
> env.setParallelism(1);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
> properties.setProperty("group.id", GROUP_ID);
>
> DataStream<Tuple6<String, String, Date, String, String, Double>> stream = env
>         .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
>         .assignTimestampsAndWatermarks(
>                 new AscendingTimestampExtractor<Tuple6<String, String, Date, String, String, Double>>() {
>                     @Override
>                     public long extractAscendingTimestamp(
>                             Tuple6<String, String, Date, String, String, Double> element) {
>                         return element.f2.getTime();
>                     }
>                 })
>         .keyBy(0);
>
> stream.print();
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
