flink-user mailing list archives

From: Fabian Hueske <fhue...@gmail.com>
Subject: Re: Strategies for Complex Event Processing with guaranteed data consistency
Date: Fri, 13 Jan 2017 21:02:39 GMT

Hi Kat,

I did not understand the difference between a case and a trace.
If I got it right, the goal of your first job is to assemble the individual
events into cases. Is a case here the last event for a case-id or all
events of a case-id?
If a case is the collection of all events (which I assume), how does it
differ from a trace, which is also a list of events (if I got it right)?

In any case, I think your first job can also be solved without a session
window (which is quite complex internally).
There are two options:
1) Use a global window [1] with a custom trigger that fires for each
arriving record. A global window never ends, which would be OK since
your cases do not end either.
2) Use a MapFunction with key-partitioned operator state [2]. The map
function would simply update the state for every new event and emit a new
result.
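
To make option 2 concrete, here is a minimal plain-Java sketch of the
update-and-emit pattern. It is a model only, not Flink API code: a HashMap
stands in for the key-partitioned state, and the names CaseAssembler and
onEvent are made up for illustration. In a real job the state would live in
Flink's keyed state and be included in checkpoints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of option 2 (hypothetical names, not the Flink API).
// The map plays the role of key-partitioned state: one event list per case-id.
class CaseAssembler {
    private final Map<String, List<String>> eventsByCaseId = new HashMap<>();

    // Called once per arriving event: update the state for the event's key
    // and return ("emit") a read-only snapshot of the updated case.
    List<String> onEvent(String caseId, String event) {
        List<String> events =
                eventsByCaseId.computeIfAbsent(caseId, k -> new ArrayList<>());
        events.add(event);
        return Collections.unmodifiableList(new ArrayList<>(events));
    }
}
```

Every call returns the full, current picture of one case, so a downstream
sink can simply overwrite the previous version for that case-id.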

Regarding your concerns about losing data when writing to Kafka: Flink's
KafkaProducer provides at-least-once guarantees, which means that data
might be written more than once in case of a failure but won't be lost. If
the Kafka topic is partitioned by case-id and you only need the last record
per case-id, Kafka's log compaction should give you upsert semantics.
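
The following plain-Java sketch illustrates why duplicates from
at-least-once writes are harmless when only the last record per key counts.
It is a model under stated assumptions, not the Kafka API: the class name
CompactedTopicModel is made up, each record is a {case-id, snapshot} pair,
and replayed records are assumed to be rewritten in their original order.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical names, not the Kafka API) of what a reader
// sees once a topic has been compacted: only the latest value per key.
class CompactedTopicModel {
    static Map<String, String> compact(List<String[]> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] r : records) {
            // r[0] = case-id (key), r[1] = case snapshot (value);
            // a later record for the same key replaces the earlier one.
            latest.put(r[0], r[1]);
        }
        return latest;
    }
}
```

Writing the tail of the log a second time after a failure leaves the
compacted view unchanged, which is the upsert behaviour described above.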

Regarding your question "Is using state in this way a somewhat standard
practice, or is state intended more for recovery?":
Many streaming applications require state for their semantics (just like
yours), i.e., they need to buffer data and wait for more data to arrive. In
order to guarantee consistent result semantics of an application, the state
must not be lost and must be recovered in case of a failure. So state is not
intended for recovery, but recovery is needed to guarantee application
semantics.
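
As a rough illustration of that relationship, the plain-Java sketch below
models an operator whose state (event counts per case-id) is needed for its
result, plus a checkpoint that makes the state recoverable. It is not
Flink's checkpointing API; RecoveryModel and its methods are made-up names,
and the input list stands in for a replayable source such as a Kafka topic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical names, not Flink's checkpointing API).
class RecoveryModel {
    Map<String, Integer> counts = new HashMap<>(); // application state
    int position = 0;                               // next input index to read

    private Map<String, Integer> checkpointCounts = new HashMap<>();
    private int checkpointPosition = 0;

    // Consume input[position .. upTo) and update the per-key counts.
    void process(List<String> input, int upTo) {
        while (position < upTo) {
            counts.merge(input.get(position), 1, Integer::sum);
            position++;
        }
    }

    // Store a copy of the state together with the input position it reflects.
    void checkpoint() {
        checkpointCounts = new HashMap<>(counts);
        checkpointPosition = position;
    }

    // After a failure, go back to the last checkpoint: state and input
    // position are reset together, so replayed events are counted once.
    void restore() {
        counts = new HashMap<>(checkpointCounts);
        position = checkpointPosition;
    }
}
```

Processing two events, checkpointing, processing two more, restoring, and
then reading to the end gives the same counts as a run without a failure.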

As I said before, I did not get the difference between cases and traces, so
I cannot really comment on the job to analyze traces.

Hope this helps,
Fabian

2017-01-13 11:04 GMT+01:00 Kathleen Sharp <kathleen.sharp@signavio.com>:

> I have been playing around with Flink for a few weeks to try to
> ascertain whether or not it meets our use cases, and also what best
> practices we should be following. I have a few questions I would
> appreciate answers to.
> Our scenario is that we want to process a lot of event data into
> cases. A case is an in-order sequence of events; this event data could
> be quite old. We never know when a case is complete, so we just want
> to have the most up to date picture of what a case looks like.
> The in-order sequence of events of a case is called the trace. Many
> cases could have an identical trace. We would like to construct these
> traces, and do some aggregations on those (case count, average/min/max
> life-cycle time).
> We then have further downstream processing we will do on a case, some
> of which would require additional inputs, either from side-inputs or
> somehow joining data sources.
> We don’t really care about event time at the moment, because we just
> want to build cases and traces with all the data we have received.
> The end results should be available for our web front end via rest api.
> Based on the above I have the following idea for a first implementation:
> Kafka source -> key by case id -> session window with rocks db state
> backend holding case for that key -> postgres sink
> The reason for a session window is that, as I mentioned above, we just
> want to build a group with all the data we have received into kafka up
> until that point in time. We would experiment with what this gap time
> should be, and in future it might be specific to the type of group,
> but for the start a naive approach is acceptable. I think this could
> be better than just doing it, say, every 10 minutes because we really
> don’t know yet the frequency of the data received. Also, some inputs
> to kafka come directly from a CSV upload, so we will get “firehose”
> periods, and periods of nothing.
> In short: I think what we have closely matches session behaviour.
> We also have to implement a postgres sink that is capable of doing
> upserts. The reason for postgres is to service the rest front end.
> We then have to build our traces and can see three options for it:
> 1) The most obvious solution would be to use a kafka sink for the
> keyed case stream, and to do the trace aggregations in a downstream
> flink job with this kafka topic as a source. However, I have some
> concerns over losing any data (i.e. how do we know whether or not an
> event has been successfully pushed into the kafka stream).
> 2) Another approach might be to use some other type of sink (perhaps
> postgres), and to use this as a source for the traces job. This would
> help us guarantee data consistency.
> 3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?),
> so:
> Keyed cases stream -> broadcast -> key by tracehash with rocks db
> state backend holding trace for that tracehash -> perform
> aggregations -> postgres sink
> Is broadcast an option here? How costly is it?
> Which of these approaches (or any other), would you recommend?
> -------------------------------------
> Another question regarding the state:
> As we never know when a case is complete this means that the rocksdb
> backend could grow infinitely (!). Obviously we would need to get a
> bit smarter here.
> Is using state in this way a somewhat standard practice, or is state
> intended more for recovery?
> Managing growing state: I found some discussion regarding how to clear
> state here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
> which references https://issues.apache.org/jira/browse/FLINK-3946
> Thanks,
> Kat
