flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Biplob Biswas <revolutioni...@gmail.com>
Subject Queries regarding FlinkCEP
Date Fri, 02 Jun 2017 11:10:53 GMT
Hi ,

Thanks a lot for the help last time, I have a few more questions and I chose
to create a new topic as the problem in the previous topic was solved,
thanks to useful inputs from Flink Community. The questions are as follows

*1.* What time does the "within" operator works on "Event Time" or
"Processing Time", I am asking this as I wanted to know whether something
like the following would be captured or not.

MaxOutofOrderness is set to 10 mins, and "within" operator is specified for
5 mins. So if a first events event time is at 1:00  and the corresponding
next event is has an event time of 1:04 but it arrives in the system at
1:06. Would this still be processed and alert would be generated or not? 

*2.* What would happen if I don't have a key to specify, the way 2 events
are correlated is by using the ctx of the first event and matching some
different id. So, we can't group by some unique field. I tried a test run
without specifying a key and it apparently works. But how is the shuffling
done then in this case?

*3.* This is one of the major issue, So I could use Event Time with
ascending event time extractor for one of my kafka topic because its
behavior is consistent.  But when i added another topic to read from where
the events are not in ascending order, using ascending timestampextractor
gave me timestamp monotonicity violation. Then when I am using
BoundedOutOfOrdernessTimestampExtractor for the same, I am not getting any
warnings anymore but I am no more getting my alerts. 

If I go back to using processing time, then I am again getting alerts
properly. What could be the problem here?

*This is the code I am using:*

/public class CEPForBAM {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =

// configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new
            Arrays.asList("topic1", "topic_x", "topic_test"),
            new StringSerializerToEvent(),

BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {

      private static final long serialVersionUID = -7228487240278428374L;

      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();

    DataStream<BAMEvent> events = env.addSource(kafkaSource);

    // Input stream of monitoring events

/*    DataStream<BAMEvent> partitionedInput = events
            .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId);*/


    Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
            .where(new SimpleCondition<BAMEvent>() {
              private static final long serialVersionUID =

              public boolean filter(BAMEvent event) throws Exception {
            .where(new IterativeCondition<BAMEvent>() {
              private static final long serialVersionUID =

              public boolean filter(BAMEvent secondEvent, Context<BAMEvent>
ctx) throws Exception {

                  for (BAMEvent firstEvent :
ctx.getEventsForPattern("first")) {
                      return true;
                return false;

    PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);

    DataStream<Either&lt;String, String>> alerts = patternStream.select(new
PatternTimeoutFunction<BAMEvent, String>() {
      private static final long serialVersionUID = -8717561187522704500L;

      public String timeout(Map<String, List&lt;BAMEvent>> map, long l)
throws Exception {
        return "TimedOut: " + map.toString() + " @ " + l;

    }, new PatternSelectFunction<BAMEvent, String>() {
      private static final long serialVersionUID = 3144439966791408980L;

      public String select(Map<String, List&lt;BAMEvent>> pattern) throws
Exception {
        BAMEvent bamEvent = pattern.get("first").get(0);
        return "Matched Events: " + bamEvent.getEventId() + "_" +


    env.execute("CEP monitoring job");

Even when I am using Event Time, I am getting events from kafka as can be
shown from event.print()

View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

View raw message