flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Playing with EventTime in DataStreams
Date Fri, 26 Feb 2016 10:49:53 GMT
Hi,
I think the problem is that the source finishes before the extractor has had a chance to emit
even a single watermark. The topology then shuts down, and the window operator does not emit
in-flight windows upon shutdown.

Cheers,
Aljoscha
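Aljoscha's explanation can be illustrated with a small plain-Java simulation. This is not Flink code, and `WatermarkFlushSketch` and its methods are hypothetical. A tumbling event-time window is emitted only once a watermark reaches its last timestamp. If the input ends before any watermark arrives, every window is therefore still buffered when the job shuts down. A final watermark of `Long.MAX_VALUE` at end of input flushes them all; newer Flink releases send exactly such a watermark when a bounded source finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation, NOT the Flink API: a tumbling event-time window
// operator that emits a window only when a watermark passes its last timestamp.
public class WatermarkFlushSketch {

    private final long size;                                           // window size in ms
    private final TreeMap<Long, List<String>> open = new TreeMap<>();  // window start -> buffered values
    private final List<String> emitted = new ArrayList<>();

    WatermarkFlushSketch(long size) {
        this.size = size;
    }

    // Buffer the value in the window containing its timestamp (timestamp >= 0 assumed).
    void processElement(String value, long timestamp) {
        long start = timestamp - (timestamp % size);
        open.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    // Emit and discard every window whose last timestamp (end - 1) is <= the watermark.
    void processWatermark(long watermark) {
        while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, List<String>> w = open.pollFirstEntry();
            emitted.add("[" + w.getKey() + ", " + (w.getKey() + size) + "): " + w.getValue());
        }
    }

    List<String> emitted() {
        return emitted;
    }

    int inFlight() {
        return open.size();
    }

    public static void main(String[] args) {
        WatermarkFlushSketch op = new WatermarkFlushSketch(300_000L); // 5-minute windows
        op.processElement("a", 10_000L);
        op.processElement("b", 400_000L);

        // Input ends before any watermark: nothing emitted, two windows still in flight.
        System.out.println(op.emitted().size() + " emitted, " + op.inFlight() + " in flight");
        // prints: 0 emitted, 2 in flight

        // A final watermark at Long.MAX_VALUE flushes every remaining window.
        op.processWatermark(Long.MAX_VALUE);
        System.out.println(op.emitted());
        // prints: [[0, 300000): [a], [300000, 600000): [b]]
    }
}
```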
> On 26 Feb 2016, at 11:40, Nam-Luc Tran <namluc.tran@euranova.eu> wrote:
> 
> Great, that did it, thanks Robert ;)
> 
> While I'm at it:
> Sometimes the results are returned correctly; sometimes the output of the job
> (print or writeAsText) is simply empty, as if the job finished too quickly
> for the results to be written. One way of "forcing" results to appear is
> to insert a delay in the source stream, as with a FlatMap:
> 
>      @Override
>      public void flatMap(String value, Collector<String> out)
>            throws Exception {
>         Thread.sleep(1); // slow the stream down by 1 ms per record
>         out.collect(value.toLowerCase());
>      }
> 
> Am I missing anything here?
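A likely reason the `Thread.sleep(1)` helps: with periodic watermarks, the extractor is asked for a watermark on a wall-clock timer (the auto-watermark interval). A source that reads the whole file before the first timer tick therefore never produces one. The plain-Java model below is not Flink code. The 200 ms interval, the 1000-record file, and the rule "watermark = highest timestamp seen - 1" are assumptions chosen to mirror an ascending-timestamp extractor.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model, NOT Flink code: periodic watermarks are requested on a
// wall-clock timer, so whether any are emitted depends on how long the source runs.
public class PeriodicWatermarkSketch {

    /**
     * arrivalMs[i]: processing time (ms) at which record i is read, ascending.
     * eventMs[i]:   event timestamp of record i, ascending.
     * Returns the watermarks produced by a timer firing every intervalMs
     * while the source is still running (up to the last arrival).
     */
    static List<Long> watermarks(long[] arrivalMs, long[] eventMs, long intervalMs) {
        List<Long> out = new ArrayList<>();
        if (arrivalMs.length == 0) {
            return out;
        }
        long end = arrivalMs[arrivalMs.length - 1]; // the source finishes here
        int next = 0;                               // first record not yet read
        long maxEvent = Long.MIN_VALUE;
        for (long tick = intervalMs; tick <= end; tick += intervalMs) {
            // Read every record that arrived up to this timer tick.
            while (next < arrivalMs.length && arrivalMs[next] <= tick) {
                maxEvent = Math.max(maxEvent, eventMs[next]);
                next++;
            }
            // Ascending timestamps: everything below the highest one seen is complete.
            if (maxEvent != Long.MIN_VALUE) {
                long wm = maxEvent - 1;
                if (out.isEmpty() || wm > out.get(out.size() - 1)) {
                    out.add(wm);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int n = 1000;
        long[] fast = new long[n];   // whole file read within about 5 ms
        long[] slow = new long[n];   // Thread.sleep(1) per record: about 1 s in total
        long[] events = new long[n]; // one log line per second of event time
        for (int i = 0; i < n; i++) {
            fast[i] = i / 200;
            slow[i] = i;
            events[i] = i * 1000L;
        }
        System.out.println("fast: " + watermarks(fast, events, 200)); // prints "fast: []"
        System.out.println("slow: " + watermarks(slow, events, 200)); // prints "slow: [199999, 399999, 599999, 799999]"
    }
}
```

Note that the delay only makes intermediate watermarks likely; the windows after the last watermark are still in flight when the source ends, which is the shutdown issue Aljoscha describes.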
> 
> Best regards,
> 
> 
> 2016-02-25 20:05 GMT+01:00 Robert Metzger <rmetzger@apache.org>:
> 
>> Hi,
>> 
>> I had a similar issue recently.
>> Instead of
>> input.assignTimestampsAndWatermarks
>> 
>> you have to do:
>> 
>> input = input.assignTimestampsAndWatermarks
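Robert's fix works because `assignTimestampsAndWatermarks` returns a new `DataStream` instead of modifying `input` in place. In the original code, the window was therefore built on the stream that never had the extractor applied. The same pitfall in self-contained form, using a hypothetical immutable `TinyStream` class rather than Flink's API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

public class ReassignSketch {

    // Hypothetical immutable stream type (NOT Flink's DataStream): every
    // transformation returns a new instance and leaves the receiver unchanged.
    static final class TinyStream {
        final List<String> elements;
        final ToLongFunction<String> timestamps; // null means "no timestamps assigned"

        TinyStream(List<String> elements, ToLongFunction<String> timestamps) {
            this.elements = elements;
            this.timestamps = timestamps;
        }

        TinyStream assignTimestamps(ToLongFunction<String> extractor) {
            return new TinyStream(elements, extractor);
        }

        boolean hasTimestamps() {
            return timestamps != null;
        }
    }

    public static void main(String[] args) {
        TinyStream input = new TinyStream(Arrays.asList("a", "bb"), null);

        input.assignTimestamps(String::length);         // result discarded: input is unchanged
        System.out.println(input.hasTimestamps());      // prints false

        input = input.assignTimestamps(String::length); // reassigned, as Robert suggests
        System.out.println(input.hasTimestamps());      // prints true
    }
}
```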
>> 
>> On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.tran@euranova.eu>
>> wrote:
>> 
>>> Hello everyone,
>>> 
>>> I am currently playing with streams whose timestamps are defined by
>>> event time. I have the following code:
>>> 
>>>      final StreamExecutionEnvironment env =
>>>            StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>      env.getConfig().enableTimestamps(); //.setAutoWatermarkInterval(10000);
>>>      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>>      DataStream<String> input = env.readTextFile("file:///var/log/syslog");
>>>      input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());
>>> 
>>>      input.timeWindowAll(Time.minutes(5)).apply(
>>>            new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
>>>         @Override
>>>         public void apply(TimeWindow window, Iterable<String> values,
>>>               Collector<String> out) throws Exception {
>>>            for(String t:values) {
>>>               out.collect(t);
>>>            }
>>>         }
>>>      }).print();
>>> 
>>> (...)
>>> 
>>> public static final class AssignTimestampFromLogEvent
>>>       extends AscendingTimestampExtractor<String> {
>>>   @Override
>>>   public long extractAscendingTimestamp(String element,
>>>         long previousElementTimestamp) {
>>>      String date = element.substring(0,15);
>>>      SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
>>>      Date ddate = null;
>>>      try {
>>>         ddate = sdf.parse(date);
>>>      } catch (ParseException e) {
>>>         e.printStackTrace();
>>>      }
>>>      return ddate.getTime();
>>>   }
>>> }
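The extractor above has two weaknesses that may be worth guarding against. First, `SimpleDateFormat` with the pattern `MMM dd HH:mm:ss` has no year field, so every parsed timestamp lands in 1970. Second, when `parse` fails, `ddate` stays `null` and `ddate.getTime()` throws a `NullPointerException`. Below is a sketch of a more defensive parse using `java.time`. It assumes the caller supplies the year and that logged times are UTC; both are assumptions, and the class name `SyslogTimestampSketch` is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Locale;

// Plain java.time sketch, not Flink code. Assumptions: the caller supplies the
// year (syslog lines omit it) and the logged times are interpreted as UTC.
public class SyslogTimestampSketch {

    static long parse(String line, int year) {
        if (line.length() < 15) {
            throw new IllegalArgumentException("too short for a syslog timestamp: " + line);
        }
        // "Feb  6 10:49:53": syslog pads single-digit days with a space, so collapse runs of spaces.
        String stamp = line.substring(0, 15).trim().replaceAll("\\s+", " ");
        DateTimeFormatter fmt = new DateTimeFormatterBuilder()
                .appendPattern("MMM d HH:mm:ss")
                .parseDefaulting(ChronoField.YEAR, year) // fill in the missing year
                .toFormatter(Locale.ENGLISH);            // English month abbreviations
        // Throws DateTimeParseException on malformed input instead of returning null.
        return LocalDateTime.parse(stamp, fmt).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(parse("Feb 26 10:49:53 myhost kernel: example", 2016));
    }
}
```

Inside the extractor, `extractAscendingTimestamp` could return `parse(element, year)`; a malformed line then fails with a parse error that names the input, rather than a `NullPointerException`.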
>>> 
>>> 
>>> What I expect it to do is read the syslog, assign timestamps, and compute
>>> 5-minute windows *based on the syslog event time*, as I've configured
>>> the stream to do. However, it does not do that and instead computes the
>>> windows based on processing time.
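For reference, the expected behaviour can be sketched in plain Java. This is not Flink code, and the class and method names are hypothetical. An event-time tumbling window is chosen from the record's own timestamp, essentially as `timestamp - (timestamp % size)`. The window a syslog line lands in therefore depends only on its logged time, not on when the job happens to read it. Timestamps here are milliseconds since midnight for readability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch, not Flink code: assign event timestamps to 5-minute tumbling windows.
public class TumblingWindowSketch {

    static final long FIVE_MINUTES = 5 * 60 * 1000L;

    // Start of the tumbling window of length `size` containing `timestamp` (timestamp >= 0 assumed).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Group timestamps by the start of their 5-minute window, in ascending order.
    static Map<Long, List<Long>> group(long[] timestamps) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, FIVE_MINUTES), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    // Milliseconds since midnight, for readable examples.
    static long hms(int h, int m, int s) {
        return ((h * 60L + m) * 60 + s) * 1000;
    }

    public static void main(String[] args) {
        long[] ts = { hms(10, 49, 53), hms(10, 50, 0), hms(10, 54, 59), hms(10, 55, 1) };
        group(ts).forEach((start, members) -> System.out.printf("%02d:%02d -> %d event(s)%n",
                start / 3_600_000, (start / 60_000) % 60, members.size()));
        // prints, one per line: 10:45 -> 1 event(s), 10:50 -> 2 event(s), 10:55 -> 1 event(s)
    }
}
```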
>>> 
>>> What am I missing here?
>>> 
>>> Best regards,
>>> 
>>> --
>>> 
>>> *Nam-Luc TRAN*
>>> 
>>> R&D Manager
>>> 
>>> EURA NOVA
>>> 
>>> (M) +32 498 37 36 23
>>> 
>>> *euranova.eu <http://euranova.eu>*
>>> 
>> 
> 

