flink-user mailing list archives

From Biplob Biswas <revolutioni...@gmail.com>
Subject No Alerts with FinkCEP
Date Fri, 26 May 2017 12:25:55 GMT

I started exploring Flink CEP a day ago and thought I could use it to
build a simple event processor. For that I looked into the CEP examples by
Till and some other articles.

Now I have two questions that I would like to ask:

*Part 1:*

I came up with the following piece of code, but it is not working as
expected:

///**************** Main ******************///

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
            new SimpleStringSchema(),

    DataStream<String> input = env.addSource(consumer);
    LOG.info("About to process events");
    DataStream<ReadEventType> events =
                    //.map(s -> s.f1)
                    .map(new MapStringToRRE())


    DataStream<ReadEventType> partitionedInput = events
            .keyBy((KeySelector<ReadEventType, String>) value ->

    Pattern<ReadEventType, ?> pattern =
            .where(event -> event.getFormat() == FormatType.FILE)
            .where(event -> event.getFormat() == FormatType.FILE)

    PatternStream<ReadEventType> patternStream =
            CEP.pattern(partitionedInput, pattern);

    DataStream<String> alerts =
patternStream.select((PatternSelectFunction<ReadEventType, String>)


    env.execute("CEP monitoring job");

///*********** Alert Function returning just concat of txn id

  private static String createAlert(Map<String, ReadEventType> pattern) {
    return pattern.get("first").getTransactionItem().getUid() + " " +

///******************* properties for kafka **************///

  private static Properties getDefaultProperties(Properties prop){
    prop.put("group.id", "FlinkCEP");
    prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    prop.put("zookeeper.connect", ZKEEPER);
    prop.put("auto.offset.reset", "earliest");
    return prop;

As my Kafka topic only sends me events with format type FILE, I was
expecting to see multiple alerts being raised. But that is not the case; I am
not getting any alerts at the moment.

Can anyone point out what I am doing wrong?

*Part 2:*

Also, my main aim in using CEP is to read from different topics and raise an
alert if a first event is *not* followed by a second event within a given
time interval. How can I achieve this with FlinkCEP? So far I can only see
how to raise an alert when 2 events do follow each other within a time
interval.
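
To make the behaviour I am after concrete, here is a minimal plain-Java sketch. It does not use the FlinkCEP API at all; it only shows the logic of alerting when a first event gets no matching second event within the interval. The class name, the `Event` type, the "first"/"second" kinds, the millisecond timestamps and the `nowMs` parameter are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; no Flink or FlinkCEP API is used here. */
final class MissingFollowUpSketch {

    /** Hypothetical event: a transaction id, a kind ("first" or "second"), a timestamp in ms. */
    static final class Event {
        final String txnId;
        final String kind;
        final long timestampMs;

        Event(String txnId, String kind, long timestampMs) {
            this.txnId = txnId;
            this.kind = kind;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Returns the transaction ids whose "first" event was not followed by a
     * "second" event within timeoutMs. Assumes events are sorted by timestamp;
     * nowMs is the time up to which the input is considered complete.
     */
    static List<String> findMissingFollowUps(List<Event> events, long timeoutMs, long nowMs) {
        // txnId -> timestamp of a "first" event that is still waiting for its "second"
        Map<String, Long> pendingFirst = new LinkedHashMap<>();
        List<String> alerts = new ArrayList<>();

        for (Event e : events) {
            // Time has advanced to e.timestampMs: alert on everything that waited too long.
            expire(pendingFirst, e.timestampMs, timeoutMs, alerts);
            if ("first".equals(e.kind)) {
                pendingFirst.put(e.txnId, e.timestampMs);
            } else if ("second".equals(e.kind)) {
                pendingFirst.remove(e.txnId); // followed up in time, so no alert
            }
        }
        // Flush whatever is still overdue at the end of the input.
        expire(pendingFirst, nowMs, timeoutMs, alerts);
        return alerts;
    }

    /** Moves every pending entry older than timeoutMs (relative to currentMs) into alerts. */
    private static void expire(Map<String, Long> pending, long currentMs,
                               long timeoutMs, List<String> alerts) {
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (currentMs - entry.getValue() > timeoutMs) {
                alerts.add(entry.getKey());
                it.remove();
            }
        }
    }
}
```

For example, with a 5000 ms timeout, a "first" event for transaction A that gets its "second" event 3000 ms later should produce no alert, while a "first" event for transaction B that never gets a "second" event should produce one once more than 5000 ms have passed. What I am looking for is the FlinkCEP equivalent of this.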

Thanks & Regards,

View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
