flink-user mailing list archives

From Plamen Paskov <plamen.pas...@next-stream.com>
Subject Re: streamin Table API - strange behavior
Date Thu, 14 Dec 2017 16:25:30 GMT
Hi Fabian,

Thank you for your response! I don't think that is necessary, because
I already have this call:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

which does exactly what you describe: it sets the watermark interval to 200 ms.
I think I found the problem: it is the default event-time trigger attached to the window assigner.
According to the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html:
"*all the event-time window assigners have an EventTimeTrigger as default
trigger. This trigger simply fires once the watermark passes the end of
a window.*"
So all I have to do to trigger the computation is send an event that
falls into the "next" window.
The question now is: how can I set a trigger that fires at regular intervals (e.g. every 5 seconds)
using the Table API?
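
To make the firing rule above concrete, here is a minimal plain-Java sketch (no Flink dependency) of when the sliding windows from the quoted program would fire. It rests on assumptions: the class and method names are hypothetical, timestamps are milliseconds relative to 13:10:00, the watermark is taken as the largest timestamp seen minus the 10 s bound from the program, and a window is treated as fired once the watermark reaches its end minus 1 ms. Late-event handling and periodic watermark emission are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, not part of Flink: simulates event-time window firing.
public class WatermarkSketch {
    public static final long SIZE_MS = 10_000L;              // Slide.over("10.seconds")
    public static final long SLIDE_MS = 5_000L;              // .every("5.seconds")
    public static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L; // Time.seconds(10)

    // Assumed watermark rule: largest timestamp seen so far minus the bound.
    public static long watermark(long maxTimestampMs) {
        return maxTimestampMs - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Start times of all sliding windows [start, start + SIZE_MS) containing ts.
    public static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, SLIDE_MS);
        for (long start = lastStart; start > ts - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // A window fires once the watermark reaches its last included millisecond.
    public static boolean hasFired(long windowStart, long watermarkMs) {
        return watermarkMs >= windowStart + SIZE_MS - 1;
    }

    // Start times (ascending) of the non-empty windows that have fired
    // after all the given events have been seen.
    public static List<Long> firedWindowStarts(long[] eventTimestampsMs) {
        List<Long> fired = new ArrayList<>();
        if (eventTimestampsMs.length == 0) {
            return fired;
        }
        TreeSet<Long> starts = new TreeSet<>();
        long maxTs = Long.MIN_VALUE;
        for (long ts : eventTimestampsMs) {
            starts.addAll(windowStarts(ts));
            maxTs = Math.max(maxTs, ts);
        }
        long wm = watermark(maxTs);
        for (long start : starts) {
            if (hasFired(start, wm)) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // The four sample events: 13:10:01 .. 13:10:04.
        long[] sample = {1_000L, 2_000L, 3_000L, 4_000L};
        System.out.println("fired after sample input: " + firedWindowStarts(sample));

        // Same events plus one at 13:10:15.
        long[] withLaterEvent = {1_000L, 2_000L, 3_000L, 4_000L, 15_000L};
        System.out.println("fired after later event:  " + firedWindowStarts(withLaterEvent));
    }
}
```

Under these assumptions the four sample events alone fire nothing, because the watermark stays 10 seconds behind the newest timestamp. An event merely in the next window is therefore not enough in this model: the window ending at 13:10:05 only fires once an event stamped 13:10:15 or later arrives.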


On 14.12.2017 17:57, Fabian Hueske wrote:
> Hi,
>
> you are using a BoundedOutOfOrdernessTimestampExtractor to generate 
> watermarks.
> The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark 
> assigner and only generates watermarks if a watermark interval is 
> configured.
> Without watermarks, the query cannot "make progress" and only computes 
> its result when the program is closed (sources emit a MAX_LONG 
> watermark when being canceled).
>
> Long story short: you need to configure the watermark interval: 
> env.getConfig.setAutoWatermarkInterval(100L);
>
> Best, Fabian
>
> 2017-12-14 16:30 GMT+01:00 Plamen Paskov <plamen.paskov@next-stream.com>:
>
>     Hi,
>
>     I'm trying to run the following streaming program in my local
>     Flink 1.3.2 environment. The program compiles and runs without any
>     errors, but the print() call doesn't display anything. Once I stop
>     the program, I receive all the aggregated data. Any ideas how to make
>     it emit output regularly, or whenever new data arrives or old data is updated?
>
>     package flink;
>
>     import org.apache.flink.api.common.functions.MapFunction;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.streaming.api.TimeCharacteristic;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>     import org.apache.flink.streaming.api.windowing.time.Time;
>     import org.apache.flink.table.api.Table;
>     import org.apache.flink.table.api.java.Slide;
>     import org.apache.flink.table.api.java.StreamTableEnvironment;
>
>     import java.sql.Timestamp;
>
>
>     public class StreamingJob {
>          public static void main(String[] args) throws Exception {
>              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>              StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
>
>
>              SingleOutputStreamOperator<WC> input = env
>                      .socketTextStream("localhost", 9000, "\n")
>                      .map(new MapFunction<String, WC>() {
>                          @Override public WC map(String value) throws Exception {
>                              String[] row = value.split(",");
>                              Timestamp timestamp = Timestamp.valueOf(row[2]);
>                              return new WC(row[0], Long.valueOf(row[1]), timestamp);
>                          }
>                      })
>                      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
>                          @Override public long extractTimestamp(WC element) {
>                              return element.dt.getTime();
>                          }
>                      });
>
>
>              tEnv.registerDataStream("WordCount", input,"word, frequency, dt.rowtime");
>
>              Table table = tEnv.scan("WordCount")
>                      .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
>                      .groupBy("w, word")
>                      .select("word, frequency.sum as frequency, w.start as dt");
>
>              DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
>              result.print();
>
>              env.execute();
>          }
>
>          public static class WC {
>              public String word;
>              public long frequency;
>              public Timestamp dt;
>
>              public WC() {
>              }
>
>              public WC(String word, long frequency, Timestamp dt) {
>                  this.word = word;
>                  this.frequency = frequency;
>                  this.dt = dt;
>              }
>
>              @Override public String toString() {
>                  return "WC " + word + " " + frequency + " " + dt.getTime();
>              }
>          }
>     }
>
>
>     Sample input:
>
>     hello,1,2017-12-14 13:10:01
>     ciao,1,2017-12-14 13:10:02
>     hello,1,2017-12-14 13:10:03
>     hello,1,2017-12-14 13:10:04
>
>
>     Thanks
>
>

