flink-user mailing list archives

From Jayesh Patel <jpa...@keywcorp.com>
Subject RE: assignTimestampsAndWatermarks not working as expected
Date Thu, 04 May 2017 16:45:01 GMT
I figured out what was wrong: there was a silly mistake on my side. There is
nothing wrong with the code here, but please do let me know if you see
anything wrong with my approach.

Thank you.

From: Jayesh Patel 
Sent: Thursday, May 04, 2017 10:00 AM
To: 'user@flink.apache.org' <user@flink.apache.org>
Subject: assignTimestampsAndWatermarks not working as expected

Can anybody see what's wrong with the following code? I am using Flink 1.2
and have tried running it both in Eclipse (local mode) and on a 3-node
cluster, and in neither case does it behave as expected.

The idea is to have a custom source collect messages from a JMS topic (for
now I have a fake source that generates some out-of-order messages whose
event time is delayed by no more than 5 seconds). The source does not call
collectWithTimestamp() or emitWatermark().
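
Since MockSource is not shown, here is a hypothetical plain-Java sketch of the kind of generator described: each event time trails a steadily advancing clock by a random delay of at most 5 seconds, so events arrive out of order but never more than 5 s behind the largest event time seen so far. FakeEvent, OutOfOrderGenerator and their methods are invented for illustration; this is not the original MockSource and it does not use Flink's SourceFunction API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for the events a mock source might emit:
// an event time in epoch milliseconds plus a payload.
final class FakeEvent {
    final long eventTimeMillis;
    final String payload;

    FakeEvent(long eventTimeMillis, String payload) {
        this.eventTimeMillis = eventTimeMillis;
        this.payload = payload;
    }
}

final class OutOfOrderGenerator {
    // Each event time trails an advancing clock by a random delay in
    // [0, maxDelayMillis]. Because the clock never moves backwards, no event is
    // more than maxDelayMillis behind the largest event time emitted before it.
    static List<FakeEvent> generate(int count, long startMillis, long stepMillis,
                                    int maxDelayMillis, long seed) {
        Random random = new Random(seed);
        List<FakeEvent> events = new ArrayList<>();
        long clock = startMillis;
        for (int i = 0; i < count; i++) {
            clock += stepMillis;
            long delay = random.nextInt(maxDelayMillis + 1); // 0..maxDelayMillis inclusive
            events.add(new FakeEvent(clock - delay, "event-" + i));
        }
        return events;
    }

    // Largest amount by which an event falls behind the maximum event time
    // seen before it; 0 means the events arrived in order.
    static long maxLatenessMillis(List<FakeEvent> events) {
        long worst = 0;
        long maxSeen = Long.MIN_VALUE;
        for (FakeEvent e : events) {
            if (e.eventTimeMillis < maxSeen) {
                worst = Math.max(worst, maxSeen - e.eventTimeMillis);
            }
            maxSeen = Math.max(maxSeen, e.eventTimeMillis);
        }
        return worst;
    }

    public static void main(String[] args) {
        List<FakeEvent> events = generate(20, 1_000_000L, 500L, 5_000, 7L);
        for (FakeEvent e : events) {
            System.out.println(e.payload + " @ " + e.eventTimeMillis);
        }
        System.out.println("max lateness (ms): " + maxLatenessMillis(events));
    }
}
```

The fixed seed keeps runs reproducible, which makes it easier to compare what the source emitted with what reaches the timestamp extractor.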

The messages (events) include the event time. To allow for late or
out-of-order messages, I use assignTimestampsAndWatermarks() with a
BoundedOutOfOrdernessTimestampExtractor, whose extractTimestamp() method
retrieves the event time from the event.
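
The rule behind a bounded-out-of-orderness extractor can be modelled in plain Java: remember the largest timestamp returned by extractTimestamp() and report a watermark equal to that maximum minus the configured bound (5 seconds here). Flink asks a periodic assigner for its current watermark at the auto-watermark interval (set to 2000 ms in the job). The class and method names below are hypothetical; this is a sketch of the rule, not Flink's source code.

```java
// Plain-Java model of a bounded-out-of-orderness watermark; not Flink code.
final class BoundedOutOfOrdernessTracker {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessTracker(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Chosen so the first watermark is Long.MIN_VALUE ("no event-time progress yet").
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Records an element's event time and returns it, like extractTimestamp().
    long onElement(long eventTimeMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimeMillis);
        return eventTimeMillis;
    }

    // Watermark = largest event time seen so far minus the allowed out-of-orderness.
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    // A watermark t says no more elements with timestamp <= t are expected,
    // so such an element counts as late.
    boolean isLate(long eventTimeMillis) {
        return eventTimeMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessTracker tracker = new BoundedOutOfOrdernessTracker(5_000L);
        for (long ts : new long[] {10_000L, 7_000L, 12_000L}) {
            tracker.onElement(ts);
            System.out.println("after " + ts + " -> watermark " + tracker.currentWatermark());
        }
        System.out.println("6000 late? " + tracker.isLate(6_000L));
    }
}
```

For example, after elements stamped 10 000, 7 000 and 12 000 ms the watermark is 7 000 ms, so an element stamped 6 000 ms arriving afterwards would count as late, while the earlier 7 000 ms element was still on time when it arrived.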

When I run this job, I get neither the printout from the extractTimestamp()
method nor the output of logTuples.print() or stampedLogs.print().
When running in the local environment (Eclipse), I do see the printouts from
the fake source (MockSource, not shown here), but I don't even get those
when running on my 3-node cluster with a parallelism of 3.

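import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// MockSource, ParseEvent, Message and CEFEvent are my own classes (not shown here).
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(2000); // just for debugging, didn't affect the behavior

    // Raw messages from the (fake) source, parsed into (key, event) tuples.
    DataStream<Message> logs = env.addSource(new MockSource());
    DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new ParseEvent());
    logTuples.print();

    // Assign event-time timestamps and watermarks that tolerate up to
    // 5 seconds of out-of-orderness.
    DataStream<Tuple2<String, CEFEvent>> stampedLogs = logTuples.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, CEFEvent>>(Time.seconds(5)) {
                private static final long serialVersionUID = 1L;

                @Override
                public long extractTimestamp(Tuple2<String, CEFEvent> element) {
                    // This is how to extract the timestamp from the event
                    long eventTime = element.f1.getEventStartTime().toInstant().toEpochMilli();
                    System.out.println("returning event time " + eventTime);
                    return eventTime;
                }
            });

    stampedLogs.print();
    env.execute("simulation");
}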
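
With a parallelism of 3, the timestamp extractor in the job above typically runs as three parallel instances, each emitting its own watermarks. An operator that consumes several of those streams advances its event time only to the minimum of the latest watermark received on each input, so one instance that sees no data can hold event time back. This does not affect print(), which ignores event time, but it matters once event-time windows are added. A plain-Java model of that rule, with hypothetical names (not Flink's implementation):

```java
import java.util.Arrays;

// Plain-Java model (not Flink code) of how an operator with several inputs
// derives its event-time clock: the minimum of the latest watermark per input.
final class MultiInputWatermark {
    private final long[] channelWatermarks;

    MultiInputWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Records a watermark from one input; a smaller value never moves it back.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator's event time cannot get ahead of its slowest input.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MultiInputWatermark op = new MultiInputWatermark(3);
        op.onWatermark(0, 5_000L);
        op.onWatermark(1, 2_000L);
        System.out.println("input 2 idle: " + op.currentWatermark());
        op.onWatermark(2, 9_000L);
        System.out.println("all inputs active: " + op.currentWatermark());
    }
}
```

In the usage above, event time stays at Long.MIN_VALUE until the third input reports a watermark, and then advances only to 2 000 ms, the slowest input.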

Thank you,

Jayesh

