flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sendoh <unicorn.bana...@gmail.com>
Subject Last event in event time window is not output
Date Wed, 24 May 2017 17:00:29 GMT
Hi Flink users,

We have a unit test to test event time window aggregation, but when the job
finishes, the last event is not output because the Flink job finishes before
the watermark proceeds, as there is no next event.

Does anyone have similar issue and have a solution?

The code is like:
env.fromElements(TestData.events("2017-05-20T19:34:17.097Z", "997"),
                TestData.events("2017-05-20T20:34:17.097Z", "998"),
                TestData.events("2017-05-20T20:38:17.097Z", "999"));


DataStream<JsonNode> testResult = source.assignTimestampsAndWatermarks(new
EventWatermark())
                .keyBy(new KeyByID())
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .trigger(PurgingTrigger.of(EventTimeTrigger103.create()))
                .allowedLateness(Time.minutes(Long.MAX_VALUE))
                .fold(null, new AggFoldFunction());

        Iterator<JsonNode> javaObj = DataStreamUtils.collect(testResult);

        int count = 0;
        while (javaObj.hasNext()) {
            JsonNode current = javaObj.next();
            System.out.println(current);
            count++;
        }
        Assert.assertEquals(3, count);

The watermark is simply as:
public class EventWatermark implements
AssignerWithPeriodicWatermarks<JsonNode> {

    private final long maxTimeLag = 5000;

    private long currentMaxTimestamp;
    public transient static DateTimeFormatter parseFromTimeFormatter =
ISODateTimeFormat.dateTimeParser();

    @Override
    public long extractTimestamp(JsonNode element, long
previousElementTimestamp) {
        long occurredAtLong;
        try {
            occurredAtLong =
DateTime.parse(element.get("metadata").get("occurred_at").asText(),
parseFromTimeFormatter).getMillis();
        }
        catch(IllegalArgumentException ie) {
            throw new IllegalArgumentException(element.asText());
        }

        if(occurredAtLong > currentMaxTimestamp){
            currentMaxTimestamp = occurredAtLong;
            }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {

        return new Watermark(currentMaxTimestamp - maxTimeLag);

    }
}

Best,

Sendoh



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Last-event-in-event-time-window-is-not-output-tp13305.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Mime
View raw message