flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tao xiao <xiaotao...@gmail.com>
Subject Kafka topic partition skewness causes watermark not being emitted
Date Thu, 12 Jan 2017 16:28:25 GMT
Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition
0 and no data to partition 1. I created a Flink job with parallelism to 1
that consumes that topic and count the events with session event window (5
seconds gap). It turned out that the session event window was never closed
even I sent a message with 10 minutes gap. After digging into the source
code, AbstractFetcher[1] that is responsible for sending watermark to
downstream calculates the min watermark of all partitions. Due to the fact
that we don't have data in partition 1, the watermark returned from
partition 1is always Long.MIN_VALUE therefore AbstractFetcher never fires
the watermark to downstream.

I want to know if this is expected behavior or a bug. If this is expected
behavior how do I avoid the delay of watermark firing when data is not
evenly distributed to all partitions?

This is the timestamp extractor I used

public class ExactTimestampExtractor implements
AssignerWithPeriodicWatermarks<SessionEvent> {

private long currentMaxTimestamp = Long.MIN_VALUE;

public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE
: currentMaxTimestamp - 1);

public long extractTimestamp(SessionEvent element, long
previousElementTimestamp) {
long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
if (eventStartTime > currentMaxTimestamp) {
currentMaxTimestamp = eventStartTime;

return eventStartTime;

and this is the Flink topo

// get input data
FlinkKafkaConsumer010<SessionEvent> consumer = new
new MyOwnSchema()
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

reduce(new Reducer(), new WindowFunction()).

//        // execute program
env.execute("a job");

I used the latest code in github


View raw message