flink-user mailing list archives

From chen <eric__...@126.com>
Subject Re: flink eventTime, lateness, maxoutoforderness
Date Sat, 23 Dec 2017 04:11:09 GMT

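Code showing the effect of maxOutOfOrderness:
        // Key by field 0, then fold each 5-second event-time window into one Row:
        // field 0 counts the elements, field 1 concatenates each element's field 1.
        dataStream.keyBy(row -> (String) row.getField(0))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(initRow(), new FoldFunction<Row, Row>() {
                    @Override
                    public Row fold(Row ret, Row o) throws Exception {
                        ret.setField(0, (int) ret.getField(0) + 1);
                        ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
                        return ret;
                    }
                });

        // In the watermark assigner: the watermark trails currentTime by 5000 ms.
        public Watermark getCurrentWatermark() {
            return new Watermark(currentTime - 5000);
        }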

1. Send data *WITHOUT* Thread.sleep(). The result looks like this:
        1,1483250636000|
        4,1483250640000|1483250642000|1483250641000|1483250643000|
        4,1483250649000|1483250648000|1483250645000|1483250647000|
        2,1483250650000|1483250653000|
        1,1483250658000|
        3,1483250661000|1483250662000|1483250663000|
        1,1483250667000|

2. Send data WITH Thread.sleep(). The result looks like this. Here we can see
the effect of maxOutOfOrderness: it delays the window computation, and the
result is emitted afterwards.
        1,1483250636000|
        2,1483250640000|1483250642000|
        3,1483250649000|1483250648000|1483250645000|
        2,1483250650000|1483250653000|
        1,1483250658000|
        3,1483250661000|1483250662000|1483250663000|
        1,1483250667000|
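
One possible reading of the difference between the two runs, offered as an assumption and not something the post confirms: without Thread.sleep() all elements may reach the window operator before any watermark is emitted, so nothing is late; with Thread.sleep() the watermark can advance between elements, so an element whose window already lies behind the watermark is dropped. The plain-Java sketch below (not Flink code) simulates that idea. The class name WatermarkSketch, the input order, and the treatment of currentTime as the largest timestamp seen so far are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WatermarkSketch {
    static final long WINDOW_MS = 5000L;               // mirrors Time.seconds(5)
    static final long MAX_OUT_OF_ORDERNESS_MS = 5000L; // mirrors currentTime - 5000

    /**
     * Groups timestamps into 5-second tumbling windows keyed by window start.
     * If watermarkPerElement is true, the watermark advances after every
     * element; otherwise no watermark arrives before the input ends.
     * Allowed lateness is assumed to be 0.
     */
    static Map<Long, List<Long>> run(long[] timestamps, boolean watermarkPerElement) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        long maxSeen = Long.MIN_VALUE;   // assumption: this is "currentTime"
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxSeen = Math.max(maxSeen, ts);
            long start = ts - (ts % WINDOW_MS);
            long lastMillis = start + WINDOW_MS - 1; // last timestamp in the window
            // An element is late, and dropped, once the watermark has
            // reached the last timestamp of its window.
            if (lastMillis > watermark) {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            }
            if (watermarkPerElement) {
                watermark = maxSeen - MAX_OUT_OF_ORDERNESS_MS;
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical arrival order: ...641000 arrives after ...650000.
        long[] input = {
            1483250636000L, 1483250640000L, 1483250642000L,
            1483250650000L, 1483250641000L
        };
        System.out.println("no watermark until end: " + run(input, false));
        System.out.println("watermark per element:  " + run(input, true));
    }
}
```

In this sketch the element 1483250641000 is kept in the first mode and dropped in the second, because by the time it arrives the watermark has already reached 1483250645000, past the end of the window starting at 1483250640000.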

I don't know how to explain eventTime, lateness, and maxOutOfOrderness.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
