flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ajay (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-7782) Flink CEP not recognizing pattern
Date Mon, 09 Oct 2017 14:05:00 GMT
Ajay created FLINK-7782:
---------------------------

             Summary: Flink CEP not recognizing pattern
                 Key: FLINK-7782
                 URL: https://issues.apache.org/jira/browse/FLINK-7782
             Project: Flink
          Issue Type: Bug
            Reporter: Ajay


I am using flink version 1.3.2. Flink has a kafka source. I am using KafkaSource9. I am running
Flink on a 3 node AWS cluster with 8G of RAM running Ubuntu 16.04. From the flink dashboard,
I see that I have 2 Taskmanagers & 4 Task slots

What I observe is the following. The input to Kafka is a json string and when parsed on the
flink side, it looks like this


{code:java}
(101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
{code}

I use a Tuple8 to capture the parsed data. The first field is home_id. The time characteristic
is set to EventTime and I have an AscendingTimestampExtractor using the timestamp field. I
have parallelism for the execution environment is set to 4. I have a rather simple event that
I am trying to capture


{code:java}
DataStream<Tuple8<Integer,Date,String,String,Float,Float,Float, Float>> cepMapByHomeId
= cepMap.keyBy(0);

            //cepMapByHomeId.print();

            Pattern<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>, ?>
cep1 =
                            Pattern.<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>>begin("start")
                                            .where(new OverLowThreshold())
                                            .followedBy("end")
                                            .where(new OverHighThreshold());


            PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float,
Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);


            DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>>
alerts = patternStream.select(new PackageCapturedEvents());


{code}
The pattern checks if the 7th field in the tuple8 goes over 12 and then over 16. The output
of the pattern is like this


{code:java}
(201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
{code}



On the Kafka producer side, I am trying send simulated data for around 100 homes, so the home_id
would go from 0-100 and the input is keyed by home_id. I have about 10 partitions in kafka.
The producer just loops going through a csv file with a delay of about 100 ms between 2 rows
of the csv file. The data is exactly the same for all 100 of the csv files except for home_id
and the lat & long information. The timestamp is incremented by a step of 1 sec. I start
multiple processes to simulate data form different homes.

THE PROBLEM:

Flink completely misses capturing events for a large subset of the input data. I barely see
the events for about 4-5 of the home_id values. I do a print before applying the pattern and
after and I see all home_ids before and only a tiny subset after. Since the data is exactly
the same, I expect all homeid to be captured and written to my sink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message