flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From AndreaKinn <kinn6...@hotmail.it>
Subject Handle event time
Date Thu, 07 Sep 2017 20:24:28 GMT
Hi,
I'm getting sensor data from a kafka source and I absolutely need they are
ordered on time data generation basis. I've implemented a custom
deserialiser and employed an AscendingTimestampExtractor to handle event
time.
Obviously I set EventTime as streamTimeCharacteristics.
Unfortunately when I print the stream I see there are many records
unordered. Am I doing something wrong?
I've attached a prove of that:

*env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.enableCheckpointing(CHECKPOINT_TIME); 
		env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));

		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
		properties.setProperty("group.id", GROUP_ID);

		DataStream<Tuple6&lt;String, String, Date, String, String, Double>> stream
= env
				.addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(),
properties))
				.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Tuple6&lt;String, String, Date, String, String,
Double>>() {

			        @Override
			        public long extractAscendingTimestamp(Tuple6<String, String,
Date, String, String, Double> element) {
			            return element.f2.getTime();
			        }
				})
				.keyBy(0);
		
stream.print()*

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png>




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Mime
View raw message