you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured.
Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).

Long story short: you need to configure the watermark interval: env.getConfig.setAutoWatermarkInterval(100L);

Best, Fabian

I'm trying to run the following streaming program in my local flink 1.3.2 environment. The program compile and run without any errors but the print() call doesn't display anything. Once i stop the program i receive all aggregated data. Any ideas how to make it output regularly or when new data come/old data updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    public WC map(String value) throws Exception {
                        String[] row = value.split(",");
                        Timestamp timestamp = Timestamp.valueOf(row[2]);
                        return new WC(row[0], Long.valueOf(row[1]), timestamp);
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();

        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        Table table = tEnv.scan("WordCount")
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);


    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        public WC() {

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;

        public String toString() {
            return "WC " + word + " " + frequency + " " + dt.getTime();

Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04