flink-user mailing list archives

From Biplob Biswas <revolutioni...@gmail.com>
Subject Queries regarding FlinkCEP
Date Fri, 02 Jun 2017 11:10:53 GMT
Hi,

Thanks a lot for the help last time. I have a few more questions, and I
chose to create a new topic because the problem in the previous topic was
solved, thanks to useful input from the Flink community. The questions are
as follows:

*1.* Which notion of time does the "within" operator work on, "Event Time"
or "Processing Time"? I am asking because I want to know whether something
like the following would be captured or not.

MaxOutOfOrderness is set to 10 minutes, and the "within" operator is
specified for 5 minutes. Suppose the first event has an event time of 1:00,
and the corresponding next event has an event time of 1:04 but arrives in
the system at 1:06. Would this still be processed, and would an alert be
generated?
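
To make the scenario concrete, here is a minimal plain-Java sketch of the
arithmetic involved. It does not use Flink itself. It assumes that a
bounded-out-of-orderness extractor emits a watermark equal to the highest
timestamp seen so far minus the bound, that an element counts as late once
the watermark has reached its timestamp, and that "within" compares event
timestamps when Event Time is used. The class and method names are
hypothetical and only serve the illustration.

```java
import java.time.Duration;

public class WithinScenarioSketch {

    // Watermark under bounded out-of-orderness (assumption):
    // highest event timestamp seen so far minus the allowed bound.
    public static long watermark(long maxSeenTimestampMs, long maxOutOfOrdernessMs) {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    // Assumption: an element is late once the watermark has reached its timestamp.
    public static boolean isLate(long eventTimestampMs, long currentWatermarkMs) {
        return eventTimestampMs <= currentWatermarkMs;
    }

    // Assumption: in Event Time, "within" looks at the difference between
    // event timestamps, not at arrival (processing) times.
    public static boolean insideWindow(long firstTs, long secondTs, long withinMs) {
        return secondTs - firstTs < withinMs;
    }

    public static void main(String[] args) {
        long minute = Duration.ofMinutes(1).toMillis();
        long first = 60 * minute;             // event time 1:00
        long second = 64 * minute;            // event time 1:04, arrives at 1:06
        long maxOutOfOrderness = 10 * minute; // bound from the question
        long within = 5 * minute;             // "within" from the question

        // Suppose timestamps up to 1:06 were already seen when "second" arrives,
        // so the watermark stands at 0:56.
        long wm = watermark(66 * minute, maxOutOfOrderness);

        System.out.println("late=" + isLate(second, wm));                          // late=false
        System.out.println("insideWindow=" + insideWindow(first, second, within)); // insideWindow=true
    }
}
```

Under these assumptions the second event is neither late nor outside the
5-minute window, which is the behavior I would hope for. Whether FlinkCEP
actually behaves this way is exactly what I am asking.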

*2.* What happens if I don't have a key to specify? The two events are
correlated by using the ctx of the first event and matching on a different
id, so we can't group by a unique field. I tried a test run without
specifying a key and it apparently works. But how is the shuffling done in
this case?

*3.* This is one of the major issues. I could use Event Time with an
ascending timestamp extractor for one of my Kafka topics because its
behavior is consistent. But when I added another topic to read from, where
the events are not in ascending order, the ascending timestamp extractor
gave me timestamp monotonicity violations. When I use
BoundedOutOfOrdernessTimestampExtractor instead, I no longer get any
warnings, but I also no longer get my alerts.

If I go back to using processing time, I get alerts properly again. What
could be the problem here?
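
One thing that may be worth checking, sketched here as an assumption and
not as a confirmed diagnosis: when several inputs (topics or partitions)
feed one operator, the operator's watermark is the minimum over its inputs.
If one input never produces a watermark, the combined watermark would not
advance, and anything waiting on Event Time would never fire. The plain-Java
sketch below only models that minimum; the class and method names are
hypothetical.

```java
import java.util.Arrays;

public class MinWatermarkSketch {

    // Placeholder for "no watermark emitted yet" on an input.
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    // Combined watermark modeled as the minimum over all input watermarks.
    public static long combined(long... perInputWatermarks) {
        return Arrays.stream(perInputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        long topicA = 3_600_000L; // input that is receiving events
        long topicB = 3_660_000L; // second input that is receiving events

        // Both inputs advance: the combined watermark follows the slower one.
        System.out.println(combined(topicA, topicB)); // 3600000

        // A third input with no events holds the combined watermark back.
        System.out.println(combined(topicA, topicB, NO_WATERMARK) == NO_WATERMARK); // true
    }
}
```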

*This is the code I am using:*

public class CEPForBAM {


  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    System.out.println(env.getStreamTimeCharacteristic());
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(10000);

    // configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
            Arrays.asList("topic1", "topic_x", "topic_test"),
            new StringSerializerToEvent(),
            props);

    kafkaSource.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {

      private static final long serialVersionUID = -7228487240278428374L;

      @Override
      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });

    DataStream<BAMEvent> events = env.addSource(kafkaSource);

    // Input stream of monitoring events


/*    DataStream<BAMEvent> partitionedInput = events
            .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId);*/

    events.print();
    //partitionedInput.print();

    Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
            .where(new SimpleCondition<BAMEvent>() {
              private static final long serialVersionUID = 1390448281048961616L;

              @Override
              public boolean filter(BAMEvent event) throws Exception {
                return event.getEventName().equals(ReadEventType.class.getSimpleName());
              }
            })
            .followedBy("second")
            .where(new IterativeCondition<BAMEvent>() {
              private static final long serialVersionUID = -9216505110246259082L;

              @Override
              public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

                if (secondEvent.getEventName().equals(StatusChangedEventType.class.getSimpleName())) {
                  for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
                    if (secondEvent.getCorrelationID().contains(firstEvent.getEventId()))
                      return true;
                  }
                }
                return false;
              }
            })
            .within(Time.minutes(10));

    PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);


    DataStream<Either<String, String>> alerts = patternStream.select(
        new PatternTimeoutFunction<BAMEvent, String>() {
      private static final long serialVersionUID = -8717561187522704500L;

      @Override
      public String timeout(Map<String, List<BAMEvent>> map, long l) throws Exception {
        return "TimedOut: " + map.toString() + " @ " + l;
      }

    }, new PatternSelectFunction<BAMEvent, String>() {
      private static final long serialVersionUID = 3144439966791408980L;

      @Override
      public String select(Map<String, List<BAMEvent>> pattern) throws Exception {
        BAMEvent bamEvent = pattern.get("first").get(0);
        return "Matched Events: " + bamEvent.getEventId() + "_" + bamEvent.getEventName();
      }
    });

    alerts.print();

    env.execute("CEP monitoring job");
  }
}


Even when I am using Event Time, I am receiving events from Kafka, as shown
by events.print().


