flink-user mailing list archives

From Sam Huang <sam.hu...@reflektion.com>
Subject Event-time tumbling window doesn't fire- Flink 1.2.0, Kafka-0.8_2.10
Date Tue, 28 Feb 2017 23:13:52 GMT
Hi,

I'm using Flink 1.2.0 to read from Kafka-0.8.1.1_2.10.

I have written a Flink streaming job that creates an event-time tumbling
window and then computes some stats. However, the window function is never
called. I used the debug watermark code and noticed that no watermark is
generated. If I read from a file instead, only one watermark is generated.
Here is my code (reading from Kafka):

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);

    // Read from kafka (it works as the following print statement works)
    DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
    // jsonEventStream.print();

    jsonEventStream
        .flatMap(new strToTupleFlatMapFunImpl())
        .assignTimestampsAndWatermarks(getRawJsonTimestampsAndWatermarksAssigner())
        .flatMap(new jsonToTupleListFlatMapFunImpl())
        .keyBy(0, 1, 2)
        .timeWindow(Time.seconds(60))
        .allowedLateness(Time.seconds(10))
        // The reduce function gets called, but the window function does not.
        .reduce(new ReduceFunImpl(), new WindowFunImpl())
        .addSink(new InfluxDBSink(INFLUXDB_DB));

    env.execute();
}

private static BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>
        getRawJsonTimestampsAndWatermarksAssigner() {
    return new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(
            Time.seconds(WINDOW_LATENESS)) {
        @Override
        public long extractTimestamp(Tuple2<String, Long> tuple) {
            return tuple.f1;
        }
    };
}


public static StreamExecutionEnvironment createExecutionEnvironment(String[] args)
        throws IOException {
    ParameterTool params = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(params);
    //env.getConfig().setAutoWatermarkInterval(1000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    return env;
}
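
For reference, this is a minimal sketch of how I understand the firing
condition for the window above. It is plain Java and does not use any Flink
classes; WatermarkSketch, BoundedOutOfOrdernessModel, windowStart and
windowFires are hypothetical names made up for illustration. It assumes
timestamps are non-negative epoch milliseconds, the window has no offset, and
WINDOW_LATENESS is 10 seconds (an assumed value for the example).

```java
final class WatermarkSketch {

    /** Models a bounded-out-of-orderness assigner: watermark = max timestamp seen - bound. */
    static final class BoundedOutOfOrdernessModel {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampSeen;

        BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
            this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
        }

        /** Records the timestamp of one event (epoch milliseconds). */
        void observe(long timestampMs) {
            maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        }

        /** The watermark trails the largest timestamp seen by the configured bound. */
        long currentWatermark() {
            return maxTimestampSeen - maxOutOfOrdernessMs;
        }
    }

    /** Start of the tumbling window containing the timestamp (non-negative, no offset). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** An event-time window fires once the watermark reaches its last millisecond. */
    static boolean windowFires(long windowStartMs, long windowSizeMs, long watermarkMs) {
        long windowMaxTimestamp = windowStartMs + windowSizeMs - 1;
        return watermarkMs >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long windowSizeMs = 60_000L;   // matches Time.seconds(60)
        long boundMs = 10_000L;        // assumed value of WINDOW_LATENESS, in milliseconds
        BoundedOutOfOrdernessModel model = new BoundedOutOfOrdernessModel(boundMs);

        long[] eventTimestamps = {1_000L, 59_000L, 70_000L};
        for (long ts : eventTimestamps) {
            model.observe(ts);
            boolean fires = windowFires(windowStart(1_000L, windowSizeMs),
                    windowSizeMs, model.currentWatermark());
            System.out.println("event=" + ts
                    + " watermark=" + model.currentWatermark()
                    + " firstWindowFires=" + fires);
        }
    }
}
```

Under these assumptions the first window [0, 60000) only fires once an event
with a timestamp of at least 69999 ms has been seen, so if the watermark never
advances, the window function is never invoked.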


Any help will be appreciated.

Thank you,

Sam
