flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Konstantin Knauf <konstantin.kn...@tngtech.com>
Subject Custom TimestampExtractor and FlinkKafkaConsumer082
Date Sun, 15 Nov 2015 21:55:51 GMT
Hi everyone,

I have the following issue with Flink (0.10) and Kafka.

I am using a very simple TimestampExtractor like [1], which just
extracts a millis timestamp from a POJO. In my streaming job, I read in
these POJOs from Kafka using the FlinkKafkaConsumer082 like this:

stream = env.addSource(new FlinkKafkaConsumer082<
(parameterTool.getRequired("topic"),
                new AvroPojoDeserializationSchema(),
parameterTool.getProperties()))

I have timestampEnabled() and the TimeCharacteristics are EventTime,
AutoWatermarkIntervall is 500.

The problem is, when I do something like:

stream.assignTimestamps(new PojoTimestampExtractor(6000))
.timeWindowAll(Time.of(1, TimeUnit.SECONDS)
.sum(..)
.print()

env.execute();

the windows never get triggered.

If I use ProcessingTime it works.
If I use env.fromCollection(...) instead of the KafkaSource it works
with EventTime, too.

Any ideas what I could be doing wrong are highly appreciated.

Cheers,

Konstantin

[1]:

public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    final private long maxDelay;

    public  PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo fightEvent, long l) {
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long l) {
        return pojo.getTime() - maxDelay;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }

Mime
View raw message