flink-user mailing list archives

From: Kostas Kloudas <k.klou...@data-artisans.com>
Subject: Re: Issue with CEP library
Date: Thu, 28 Sep 2017 08:38:33 GMT
Hi Ajay,

I will look a bit more into the issue.

But in the meantime, could you run your job with a parallelism of 1, to see if the results are as expected?
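For example, something along these lines (assuming the environment is created the usual way):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1); // a single task instance, so every key goes through the same operator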

Also, could you change the pattern, for example to check only for the "start" element, to see if all keys pass through?
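A start-only version of your pattern could look like this (a sketch reusing your OverLowThreshold condition):

    Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> startOnly =
            Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
                    .where(new OverLowThreshold()); // drop the followedBy("end") stage for this test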

As for the code, you apply keyBy(0) to the cepMap stream twice, which is redundant and introduces unnecessary overhead. You could remove the second keyBy to also see the impact.
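In other words, something like:

    // the stream is already keyed by home_id; hand it to CEP directly
    PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
            CEP.pattern(cepMapByHomeId, cep1); // instead of CEP.pattern(cepMapByHomeId.keyBy(0), cep1)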


> On Sep 28, 2017, at 2:57 AM, Ajay Krishna <ajaykrishna@gmail.com> wrote:
> Hi, 
> I've only been working with Flink for the past 2 weeks on a project, and I am trying to use the CEP library on sensor data. I am using Flink version 1.3.2. Flink has a Kafka source; I am using the Kafka 0.9 consumer. I am running Flink on a 3-node AWS cluster with 8 GB of RAM per node, running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 TaskManagers and 4 task slots.
> What I observe is the following. The input to Kafka is a JSON string, and when parsed on the Flink side it looks like this:
> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
> I use a Tuple8 to capture the parsed data. The first field is home_id. The time characteristic is set to EventTime, and I have an AscendingTimestampExtractor using the timestamp field (a sketch of that setup follows the snippet below). Parallelism for the execution environment is set to 4. I have a rather simple event that I am trying to capture:
>             DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId = cepMap.keyBy(0);
>             //cepMapByHomeId.print();
>             Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>                     Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                             .where(new OverLowThreshold())
>                             .followedBy("end")
>                             .where(new OverHighThreshold());
>             PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream = CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
>             DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts = patternStream.select(new PackageCapturedEvents());
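> For completeness, the event-time setup is roughly like this (a simplified sketch, not my exact code; parsedStream stands for the parsed Kafka input):
>
>             env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>             DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMap = parsedStream
>                     .assignTimestampsAndWatermarks(
>                             new AscendingTimestampExtractor<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>() {
>                                 @Override
>                                 public long extractAscendingTimestamp(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> element) {
>                                     return element.f1.getTime(); // the Date field carries the event timestamp
>                                 }
>                             });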
> The pattern checks if the 7th field in the Tuple8 goes over 12 and then over 16. The output of the pattern looks like this:
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
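> The two conditions are essentially threshold checks on field f6, the 7th field (a sketch; my actual classes may differ slightly):
>
>             public static class OverLowThreshold
>                     extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>                 @Override
>                 public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>                     return event.f6 > 12.0f; // over the low threshold
>                 }
>             }
>
>             public static class OverHighThreshold
>                     extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>                 @Override
>                 public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>                     return event.f6 > 16.0f; // over the high threshold
>                 }
>             }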
> On the Kafka producer side, I am trying to send simulated data for around 100 homes, so the home_id goes from 0 to 100, and the input is keyed by home_id. I have about 10 partitions in Kafka. The producer just loops through a CSV file with a delay of about 100 ms between two rows. The data is exactly the same for all 100 CSV files except for the home_id and the lat/long information. The timestamp is incremented in steps of 1 second. I start multiple processes to simulate data from different homes.
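> The producer loop is essentially the following (a hypothetical sketch, not my actual script; the topic name, file name, and serializer settings are placeholders):
>
>             // inside a main(String[] args) throws Exception
>             Properties props = new Properties();
>             props.put("bootstrap.servers", "localhost:9092");
>             props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>             props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>             String homeId = "101"; // one process per simulated home
>             try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>                 for (String row : Files.readAllLines(Paths.get("home_" + homeId + ".csv"))) {
>                     producer.send(new ProducerRecord<>("sensors", homeId, row)); // keyed by home_id
>                     Thread.sleep(100); // ~100 ms between rows
>                 }
>             }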
> Flink completely misses capturing events for a large subset of the input data. I barely see events for about 4-5 of the home_id values. I print the stream both before and after applying the pattern, and I see all home_ids before but only a tiny subset after. Since the data is exactly the same for every home, I expect all home_ids to be captured and written to my sink, which is Cassandra in this case. I've looked through all the available docs and examples but cannot seem to find a fix for the problem.
> I would really appreciate some guidance on how to understand and fix this.
> Thank you,
> Ajay
