flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Plamen Paskov <plamen.pas...@next-stream.com>
Subject streamin Table API - strange behavior
Date Thu, 14 Dec 2017 15:30:06 GMT
Hi,

I'm trying to run the following streaming program in my local flink 
1.3.2 environment. The program compile and run without any errors but 
the print() call doesn't display anything. Once i stop the program i 
receive all aggregated data. Any ideas how to make it output regularly 
or when new data come/old data updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


public class StreamingJob {
     public static void main(String[] args)throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);


         SingleOutputStreamOperator<WC> input = env
                 .socketTextStream("localhost",9000,"\n")
                 .map(new MapFunction<String, WC>() {
                     @Override public WC map(String value)throws Exception {
                         String[] row = value.split(",");
                         Timestamp timestamp = Timestamp.valueOf(row[2]);
                         return new WC(row[0], Long.valueOf(row[1]), timestamp);
                     }
                 })
                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10))
{
                     @Override public long extractTimestamp(WC element) {
                         return element.dt.getTime();
                     }
                 });


         tEnv.registerDataStream("WordCount", input,"word, frequency, dt.rowtime");

         Table table = tEnv.scan("WordCount")
                 .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                 .groupBy("w, word")
                 .select("word, frequency.sum as frequency, w.start as dt");DataStream<Tuple2<Boolean,
WC>> result = tEnv.toRetractStream(table, WC.class);
         result.print();

         env.execute();
     }

     public static class WC {
         public Stringword;
         public long frequency;
         public Timestampdt;

         public WC() {
         }

         public WC(String word,long frequency, Timestamp dt) {
             this.word = word;
             this.frequency = frequency;
             this.dt = dt;
         }

         @Override public String toString() {
             return "WC " +word +" " +frequency +" " +dt.getTime();
         }
     }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks


Mime
View raw message