flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: streaming Table API - strange behavior
Date Thu, 14 Dec 2017 17:10:42 GMT
Hi,

Yes, you are right. I forgot that the watermark interval is set by default when
enabling event time.

Also, your comment about triggering the window is correct. Technically, you
don't need a record that falls into the next window, just a watermark
that is past the window boundary.
In your case, watermarks only advance if the assigner sees more records, and
you'd need a record with a timestamp of at least 2017-12-14 13:10:15 (or
16), because the watermark assigner subtracts 10 seconds.
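The arithmetic behind the 13:10:15 figure can be checked with a small plain-Java model (no Flink dependency). It assumes the sample timestamps are read as UTC, 10-second windows that slide every 5 seconds (as in `Slide.over("10.seconds").every("5.seconds")`), a watermark equal to the largest timestamp seen minus 10 seconds, and that an event-time window fires once the watermark reaches its last millisecond (end minus 1 ms). The class and method names are hypothetical; the window-assignment loop is modeled on how sliding event-time windows are usually computed, not copied from Flink.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the watermark/window arithmetic; not Flink code.
class WatermarkMath {
    static final long MAX_OUT_OF_ORDERNESS = 10_000L; // Time.seconds(10) in the extractor
    static final long SIZE = 10_000L;                 // Slide.over("10.seconds")
    static final long SLIDE = 5_000L;                 // .every("5.seconds")
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses a sample timestamp as UTC so the result does not depend on the local zone.
    static long millis(String ts) {
        return LocalDateTime.parse(ts, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Renders epoch millis back as a UTC date-time for printing.
    static String show(long ms) {
        return Instant.ofEpochMilli(ms).atOffset(ZoneOffset.UTC).toLocalDateTime().toString();
    }

    // Bounded-out-of-orderness watermark: largest timestamp seen minus 10 s.
    static long watermark(long maxTimestampSeen) {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS;
    }

    // Start times of every sliding window [start, start + SIZE) that contains ts.
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, SLIDE);
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    // A window fires once the watermark reaches its last millisecond (end - 1).
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + SIZE - 1;
    }

    // Smallest record timestamp whose resulting watermark makes the window fire.
    static long requiredRecordTimestamp(long windowStart) {
        return windowStart + SIZE - 1 + MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        long first = millis("2017-12-14 13:10:01");
        for (long start : windowStarts(first)) {
            System.out.println("window [" + show(start) + ", " + show(start + SIZE)
                    + ") needs a record at or after " + show(requiredRecordTimestamp(start)));
        }
    }
}
```

The 13:10:01 record lands in two windows; the earlier one, [13:09:55, 13:10:05), fires only after a record stamped 13:10:14.999 or later arrives, which at second granularity is the 13:10:15 mentioned above.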
Given the current watermark assigner, there is no way other than sending
more records to trigger a window computation. You can implement a custom
assigner that also emits watermarks without data, but that would tie
the event-time watermarks to the clock of the generating machine, so
watermarks would no longer be purely data-driven.
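The custom-assigner idea can be sketched without Flink as a small state machine. Everything here is an assumption for illustration: the class name `IdleAwareWatermarks`, the `idleTimeout` parameter, and the rule that, after `idleTimeout` ms without records, the watermark advances at the rate of the local clock. In an actual Flink 1.3 job this logic would live in the `extractTimestamp` and `getCurrentWatermark` methods of an `AssignerWithPeriodicWatermarks` implementation.

```java
import java.util.function.LongSupplier;

// Hypothetical, Flink-free model of an assigner that keeps advancing the
// watermark from the local wall clock when no records arrive.
class IdleAwareWatermarks {
    private final long maxOutOfOrderness; // same role as Time.seconds(10) in the extractor
    private final long idleTimeout;       // assumed: how long to wait before trusting the clock
    private final LongSupplier clock;     // processing-time clock of this machine (millis)

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastRecordAt;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarks(long maxOutOfOrderness, long idleTimeout, LongSupplier clock) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
        this.clock = clock;
    }

    // Called per record; remembers the largest event time and when data was last seen.
    long onRecord(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastRecordAt = clock.getAsLong();
        return eventTimestamp;
    }

    // Called periodically; the returned watermark never moves backwards.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastEmitted; // no data seen yet
        }
        long idleFor = clock.getAsLong() - lastRecordAt;
        // Data-driven part plus the wall-clock time spent idle beyond the timeout.
        long candidate = maxTimestamp - maxOutOfOrderness + Math.max(0L, idleFor - idleTimeout);
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

With `maxOutOfOrderness = 10_000` and `idleTimeout = 5_000`, a single record at event time 100000 yields a watermark of 90000; after 8 s of silence on the local clock it has moved to 93000. The catch is the one described above: the watermark now depends on the machine's clock, so records that arrive after a long pause may already be behind the watermark and be treated as late.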

Best, Fabian

2017-12-14 17:25 GMT+01:00 Plamen Paskov <plamen.paskov@next-stream.com>:

> Hi Fabian,
>
> Thank you for your response! I don't think that's necessary, because
> I already have this call anyway:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> which does exactly what you say: it sets the watermark interval to 200 ms.
> I think I found the problem: it is the default event-time trigger attached to the
> window assigner.
> According to the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html:
> "*all the event-time window assigners have an EventTimeTrigger as default trigger.
> This trigger simply fires once the watermark passes the end of a window.*" All I have
> to do in order to trigger the computation is to send an event that falls into the "next" window.
> So the question now is: how can I set a trigger that fires at regular intervals (e.g. every
> 5 seconds) using the Table API?
>
>
> On 14.12.2017 17:57, Fabian Hueske wrote:
>
> Hi,
>
> you are using a BoundedOutOfOrdernessTimestampExtractor to generate
> watermarks.
> The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark
> assigner and only generates watermarks if a watermark interval is
> configured.
> Without watermarks, the query cannot "make progress" and only computes its
> result when the input ends (a source emits a Long.MAX_VALUE watermark when
> it finishes, e.g. when the socket is closed).
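The effect of the watermark interval can be illustrated with a plain-Java simulation (hypothetical names, no Flink dependency). It assumes one record arrives per second of wall-clock time, that every `intervalMillis` the runtime asks the assigner for its current watermark and forwards it only if it is larger than the last one, that an interval of 0 means it never asks, and that a finishing source adds a final `Long.MAX_VALUE` watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free simulation of periodic watermark emission.
class PeriodicWatermarkModel {
    // Record i arrives at wall-clock time i * 1000 ms.
    static List<Long> emittedWatermarks(long[] eventTimestamps, long intervalMillis,
                                        long maxOutOfOrderness, boolean inputEnds) {
        List<Long> emitted = new ArrayList<>();
        if (intervalMillis > 0 && eventTimestamps.length > 0) {
            long lastArrival = (eventTimestamps.length - 1) * 1000L;
            long lastEmitted = Long.MIN_VALUE;
            // Simulate ticks up to the first one at or after the last arrival;
            // later ticks would only repeat the same watermark.
            for (long tick = intervalMillis; tick < lastArrival + intervalMillis; tick += intervalMillis) {
                long maxTs = Long.MIN_VALUE;
                for (int i = 0; i < eventTimestamps.length && i * 1000L <= tick; i++) {
                    maxTs = Math.max(maxTs, eventTimestamps[i]);
                }
                long wm = maxTs - maxOutOfOrderness;
                if (wm > lastEmitted) { // only strictly increasing watermarks are forwarded
                    emitted.add(wm);
                    lastEmitted = wm;
                }
            }
        }
        if (inputEnds) {
            emitted.add(Long.MAX_VALUE); // end of input: every pending window can fire
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 2_000L, 3_000L, 4_000L};
        System.out.println("interval 0, running:  " + emittedWatermarks(ts, 0L, 10_000L, false));
        System.out.println("interval 0, finished: " + emittedWatermarks(ts, 0L, 10_000L, true));
        System.out.println("interval 200 ms:      " + emittedWatermarks(ts, 200L, 10_000L, false));
    }
}
```

With an interval of 0 the only watermark is the final `Long.MAX_VALUE`, which is why results appear only when the input ends. With a positive interval the watermark tracks the data, but it still trails the newest record by 10 seconds.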
>
> Long story short: you need to configure the watermark interval:
> env.getConfig().setAutoWatermarkInterval(100L);
>
> Best, Fabian
>
> 2017-12-14 16:30 GMT+01:00 Plamen Paskov <plamen.paskov@next-stream.com>:
>
>> Hi,
>>
>> I'm trying to run the following streaming program in my local Flink 1.3.2
>> environment. The program compiles and runs without any errors, but the print()
>> call doesn't display anything. Once I stop the program, I receive all the
>> aggregated data. Any ideas how to make it output results regularly, or whenever
>> new data arrives or existing data is updated?
>>
>> package flink;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.java.Slide;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import java.sql.Timestamp;
>>
>> public class StreamingJob {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>         StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
>>
>>
>>         SingleOutputStreamOperator<WC> input = env
>>                 .socketTextStream("localhost", 9000, "\n")
>>                 .map(new MapFunction<String, WC>() {
>>                     @Override
>>                     public WC map(String value) throws Exception {
>>                         String[] row = value.split(",");
>>                         Timestamp timestamp = Timestamp.valueOf(row[2]);
>>                         return new WC(row[0], Long.valueOf(row[1]), timestamp);
>>                     }
>>                 })
>>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
>>                     @Override
>>                     public long extractTimestamp(WC element) {
>>                         return element.dt.getTime();
>>                     }
>>                 });
>>
>>
>>         tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");
>>
>>         Table table = tEnv.scan("WordCount")
>>                 .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
>>                 .groupBy("w, word")
>>                 .select("word, frequency.sum as frequency, w.start as dt");
>>
>>         DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
>>         result.print();
>>
>>         env.execute();
>>     }
>>
>>     public static class WC {
>>         public String word;
>>         public long frequency;
>>         public Timestamp dt;
>>
>>         public WC() {
>>         }
>>
>>         public WC(String word, long frequency, Timestamp dt) {
>>             this.word = word;
>>             this.frequency = frequency;
>>             this.dt = dt;
>>         }
>>
>>         @Override
>>         public String toString() {
>>             return "WC " + word + " " + frequency + " " + dt.getTime();
>>         }
>>     }
>> }
>>
>>
>> Sample input:
>>
>> hello,1,2017-12-14 13:10:01
>> ciao,1,2017-12-14 13:10:02
>> hello,1,2017-12-14 13:10:03
>> hello,1,2017-12-14 13:10:04
>>
>>
>> Thanks
>>
>
>
>
