flink-dev mailing list archives

From Nam-Luc Tran <namluc.t...@euranova.eu>
Subject Re: Playing with EventTime in DataStreams
Date Fri, 26 Feb 2016 10:40:59 GMT
Great, that did it, thanks Robert ;)
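Robert's fix works because, in the DataStream API, `assignTimestampsAndWatermarks` returns a new stream instead of modifying `input` in place. If that result is not kept, `timeWindowAll` is still applied to the original stream, which has no timestamp assigner. The plain-Java sketch below shows the same return-a-new-instance pattern with a hypothetical immutable `StreamModel` class. The class and its operator names are invented for illustration and are not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable stream description standing in for a DataStream (not Flink code).
public final class StreamModel {
    private final List<String> operators;

    private StreamModel(List<String> operators) {
        this.operators = Collections.unmodifiableList(new ArrayList<>(operators));
    }

    public static StreamModel source(String name) {
        List<String> ops = new ArrayList<>();
        ops.add(name);
        return new StreamModel(ops);
    }

    // Returns a NEW stream with the operator appended; 'this' is left unchanged.
    public StreamModel then(String operator) {
        List<String> ops = new ArrayList<>(operators);
        ops.add(operator);
        return new StreamModel(ops);
    }

    public List<String> operators() {
        return operators;
    }

    public static void main(String[] args) {
        StreamModel input = StreamModel.source("readTextFile");

        // Result discarded: 'input' still has no timestamp step.
        input.then("assignTimestamps");
        System.out.println(input.then("timeWindowAll").operators());

        // Reassigning, as suggested in the reply, keeps the new step.
        input = input.then("assignTimestamps");
        System.out.println(input.then("timeWindowAll").operators());
    }
}
```

The first `println` shows only the source and the window; the second one also contains the timestamp step, which mirrors the difference between the two snippets in Robert's reply.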

While I'm at it:
Sometimes results are returned correctly; other times the output of the job
(print or writeAsText) is completely empty, as if the job finished too quickly,
before the results could be written. One way of "forcing" results to appear is
to insert a "delay" in the source stream, e.g. with a FlatMap:

      // Workaround: sleep 1 ms per record before emitting it downstream
      @Override
      public void flatMap(String value, Collector<String> out)
            throws Exception {
         Thread.sleep(1);
         out.collect(value.toLowerCase());
      }

Am I missing anything here?
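One plausible explanation for the empty output, offered as an assumption rather than a confirmed diagnosis: `AscendingTimestampExtractor` emits watermarks periodically, and an event-time window only fires once a watermark passes its end. If the bounded file source finishes before a watermark covering the last windows has been emitted, those windows may never be evaluated, and slowing the source down with `Thread.sleep` gives the periodic watermark a chance to fire. The sketch below is a simplified, hypothetical model of that firing rule in plain Java; the class `WatermarkWindowModel` is made up for illustration and is not Flink's implementation. It assigns timestamps to 5-minute tumbling windows and fires a window only when the watermark reaches the window's last millisecond.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of event-time tumbling windows (not Flink code):
// a window [start, end) fires only once a watermark >= end - 1 arrives.
public class WatermarkWindowModel {

    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // 5-minute windows

    // window start -> number of buffered elements, ordered by start time
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private final List<String> fired = new ArrayList<>();

    // Assigns an element with the given event-time timestamp to its window.
    public void onElement(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        pending.merge(windowStart, 1, Integer::sum);
    }

    // Fires (and removes) every window whose last millisecond is <= watermark.
    public void onWatermark(long watermark) {
        while (!pending.isEmpty()) {
            long start = pending.firstKey();
            long end = start + WINDOW_SIZE_MS;
            if (end - 1 > watermark) {
                break; // this window and all later ones are still incomplete
            }
            fired.add("[" + start + ", " + end + ") count=" + pending.remove(start));
        }
    }

    public List<String> firedWindows() {
        return fired;
    }

    public int pendingWindows() {
        return pending.size();
    }

    public static void main(String[] args) {
        WatermarkWindowModel model = new WatermarkWindowModel();
        model.onElement(0L);        // window [0, 300000)
        model.onElement(60_000L);   // same window
        model.onElement(400_000L);  // window [300000, 600000)

        model.onWatermark(299_999L); // only the first window is complete
        System.out.println(model.firedWindows());
        System.out.println(model.pendingWindows());

        // If no further watermark arrives, the second window is never emitted.
        // A final watermark at Long.MAX_VALUE flushes it.
        model.onWatermark(Long.MAX_VALUE);
        System.out.println(model.pendingWindows());
    }
}
```

In this model only a final, sufficiently large watermark flushes the last window; whether the Flink version in use emits such a watermark when a file source finishes is worth checking before relying on a sleep-based workaround.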

Best regards,


2016-02-25 20:05 GMT+01:00 Robert Metzger <rmetzger@apache.org>:

> Hi,
>
> I had a similar issue recently.
> Instead of
>  input.assignTimestampsAndWatermarks
>
> you have to do:
>
>  input = input.assignTimestampsAndWatermarks
>
> On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.tran@euranova.eu>
> wrote:
>
> > Hello everyone,
> >
> > I am currently playing with streams whose timestamps are defined by
> > event time. I have the following code:
> >
> >       final StreamExecutionEnvironment env =
> >             StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >       env.getConfig().enableTimestamps(); //.setAutoWatermarkInterval(10000);
> >       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >       DataStream<String> input =
> >             env.readTextFile("file:///var/log/syslog");
> >       input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());
> >
> >       input.timeWindowAll(Time.minutes(5)).apply(
> >             new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
> >          @Override
> >          public void apply(TimeWindow window, Iterable<String> values,
> >                Collector<String> out) throws Exception {
> >             for (String t : values) {
> >                out.collect(t);
> >             }
> >          }
> >       }).print();
> >
> > (...)
> >
> > public static final class AssignTimestampFromLogEvent
> >       extends AscendingTimestampExtractor<String> {
> >    @Override
> >    public long extractAscendingTimestamp(String element,
> >          long previousElementTimestamp) {
> >       String date = element.substring(0, 15);
> >       SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
> >       Date ddate = null;
> >       try {
> >          ddate = sdf.parse(date);
> >       } catch (ParseException e) {
> >          e.printStackTrace();
> >       }
> >       return ddate.getTime();
> >    }
> > }
> >
> >
> > What I expect it to do is read the syslog, assign timestamps, and compute
> > 5-minute windows *based on the syslog event time*, as I've configured
> > the stream to do. However, it does not do that: the windows are computed
> > based on processing time.
> >
> > What am I missing here?
> >
> > Best regards,
> >
> > --
> >
> > *Nam-Luc TRAN*
> >
> > R&D Manager
> >
> > EURA NOVA
> >
> > (M) +32 498 37 36 23
> >
> > *euranova.eu <http://euranova.eu>*
> >
>
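The extractor in the original message returns `ddate.getTime()` even when parsing failed, so a malformed line turns into a `NullPointerException`. In addition, the syslog timestamp carries no year, so `SimpleDateFormat` fills in 1970, and BSD syslog pads single-digit days with a space ("Feb  6"). A more defensive variant could look like the following sketch, which uses `java.time` instead of `SimpleDateFormat`. The class `SyslogTimestamps`, its constructor parameters (an explicit year and time zone) and the choice to throw on bad input are assumptions for illustration; in the Flink job, one could call `extract` from `extractAscendingTimestamp`.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.util.Locale;

// Hypothetical helper (not Flink API): turns the leading BSD-syslog timestamp
// ("MMM d HH:mm:ss", day padded with a space) into epoch milliseconds.
public final class SyslogTimestamps {

    private final DateTimeFormatter formatter;
    private final ZoneId zone;

    // Syslog lines carry no year or zone, so both are supplied by the caller.
    public SyslogTimestamps(int year, ZoneId zone) {
        this.formatter = new DateTimeFormatterBuilder()
                .appendPattern("MMM d HH:mm:ss")
                .parseDefaulting(ChronoField.YEAR, year)
                .toFormatter(Locale.ENGLISH);
        this.zone = zone;
    }

    // Returns the event time in epoch millis; throws instead of returning null.
    public long extract(String line) {
        if (line.length() < 15) {
            throw new IllegalArgumentException("Too short for a syslog timestamp: " + line);
        }
        // "Feb  6 09:05:01" -> "Feb 6 09:05:01" so one pattern fits both day widths
        String stamp = line.substring(0, 15).replaceAll(" +", " ");
        try {
            return LocalDateTime.parse(stamp, formatter)
                    .atZone(zone)
                    .toInstant()
                    .toEpochMilli();
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Unparseable syslog timestamp: " + line, e);
        }
    }

    public static void main(String[] args) {
        SyslogTimestamps ts = new SyslogTimestamps(2016, ZoneOffset.UTC);
        System.out.println(ts.extract("Feb 26 10:40:59 host kernel: example"));
        System.out.println(ts.extract("Feb  6 09:05:01 host cron: example"));
    }
}
```

Failing loudly means a bad line stops the job with a message naming that line; skipping or defaulting such lines instead would be a separate design decision.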



