flink-user mailing list archives

From AndreaKinn <kinn6...@hotmail.it>
Subject Handle event time
Date Thu, 07 Sep 2017 20:24:28 GMT
I'm getting sensor data from a Kafka source, and I absolutely need the records
to be ordered by the time the data was generated. I've implemented a custom
deserialiser and employed an AscendingTimestampExtractor to handle event
time. Obviously I set EventTime as the stream time characteristic.
Unfortunately, when I print the stream I see that many records are
out of order. Am I doing something wrong?
I've attached proof of that:

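        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
        properties.setProperty("group.id", GROUP_ID);

        DataStream<Tuple6<String, String, Date, String, String, Double>> stream = env
                // The archived text is garbled between the consumer arguments and the
                // extractor; "properties" and assignTimestampsAndWatermarks are assumed.
                .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<Tuple6<String, String, Date, String, String, Double>>() {
                            @Override
                            public long extractAscendingTimestamp(
                                    Tuple6<String, String, Date, String, String, Double> element) {
                                // f2 holds the generation time of the sensor reading
                                return element.f2.getTime();
                            }
                        });
                // The archived message is cut off here; the closing braces above are assumed.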


Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
