flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Strategies for Complex Event Processing with guaranteed data consistency
Date Wed, 18 Jan 2017 10:49:22 GMT
Hi Kat,

thanks for the clarification about cases and traces.

Regarding the aggregation of traces: You can either do that in the same job
that constructs the cases or in a separate job that is decoupled from it by,
for instance, Kafka.
If I got your requirements right, you need a mechanism for retraction.
A case (id: 1, event-1, event-3) would result in a trace (event-1, event-3)
and go into the corresponding aggregates (e.g., increment a count by 1).
If the case with id: 1 receives another event (say event-2), its trace
changes to (event-1, event-3, event-2), so the counter of (event-1, event-3)
needs to be decreased and the counter of the new trace
(event-1, event-3, event-2) needs to be increased.
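
To make the counter arithmetic concrete, here is a minimal plain-Python
sketch (not Flink code). The names trace_counts and apply_record and the
"add"/"retract" record types are made up for illustration; they are not
part of any Flink API.

```python
from collections import Counter

# Hypothetical in-memory aggregate: trace (tuple of event names) -> number of cases.
trace_counts = Counter()


def apply_record(record_type, trace):
    """Apply an 'add' or 'retract' record to the per-trace case count."""
    if record_type == "add":
        trace_counts[trace] += 1
    elif record_type == "retract":
        trace_counts[trace] -= 1
        if trace_counts[trace] == 0:
            del trace_counts[trace]  # no case has this trace anymore
    else:
        raise ValueError(f"unknown record type: {record_type}")


# Case 1 is first seen with event-1 and event-3.
apply_record("add", ("event-1", "event-3"))

# event-2 arrives for case 1: retract the old trace, then add the new one.
apply_record("retract", ("event-1", "event-3"))
apply_record("add", ("event-1", "event-3", "event-2"))

print(dict(trace_counts))
```

After the update only the new trace is left, with a count of 1; the old
trace has been retracted back to zero and dropped.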

You can solve this by keeping the latest version of each case in keyed
state (keyBy(case-id).flatMap()). Whenever an update is received, you have
to send out a retraction record for the old trace and an update record for
the new trace. The aggregation would again be done on keyed state
(keyBy(trace-hash).map()).
You might want to write the result into some kind of datastore with a
primary key (trace-hash) to be able to update the results in place (updates
caused by retraction and update trace records). This could be a relational
DB (Postgres) or a compacted Kafka topic.
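
The two keyed stages and the upsert sink could be sketched roughly as
follows. This is a plain-Python simulation of the logic only, not Flink
code: CaseState, TraceAggregator and run are hypothetical names, dicts
stand in for keyed state and for the datastore, the trace tuple itself is
used as the key instead of a trace hash, and events are assumed to be
appended to a trace in arrival order.

```python
from collections import defaultdict


class CaseState:
    """Stage 1 (stands in for keyBy(case-id).flatMap()): latest trace per case."""

    def __init__(self):
        self.traces = {}  # case_id -> tuple of events in arrival order

    def on_event(self, case_id, event):
        old = self.traces.get(case_id)
        new = (old or ()) + (event,)
        self.traces[case_id] = new
        records = []
        if old is not None:
            records.append(("retract", old))  # undo the previous trace
        records.append(("add", new))          # count the updated trace
        return records


class TraceAggregator:
    """Stage 2 (stands in for keyBy(trace-hash).map()): case count per trace."""

    def __init__(self):
        self.counts = defaultdict(int)

    def on_record(self, kind, trace):
        self.counts[trace] += 1 if kind == "add" else -1
        return trace, self.counts[trace]  # (primary key, new value)


def run(events):
    """Feed (case_id, event) pairs through both stages into an upsert store."""
    cases, aggregator, store = CaseState(), TraceAggregator(), {}
    for case_id, event in events:
        for kind, trace in cases.on_event(case_id, event):
            key, count = aggregator.on_record(kind, trace)
            store[key] = count  # upsert: overwrite the row for this key
    return store


events = [
    (1, "event-1"), (1, "event-3"),
    (2, "event-1"), (2, "event-3"),
    (1, "event-2"),  # late event for case 1 changes its trace
]
store = run(events)
print(store)
```

With these five events the store ends with a count of 1 for
(event-1, event-3), 1 for (event-1, event-3, event-2), and 0 for the
intermediate trace (event-1,). Whether zero-count rows are kept or deleted
is a design choice for the sink; this sketch simply keeps them.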

Hope this helps.

Best, Fabian


2017-01-16 9:49 GMT+01:00 Kathleen Sharp <kathleen.sharp@signavio.com>:

> Hi Fabian,
>
> A case consists of all events sharing the same case id. This id is
> what we initially key the stream by.
>
> The order of these events is the trace.
>
> For example,
> caseid: case1, consisting of event1, event2, event3. Start time 11:00,
> end 11:05, run time 5 minutes
> caseid: case12, consisting of event1, event2, event3 Start time 11:00,
> end 11:15, run time 15 minutes
>
> These are 2 distinct cases, with the same trace (event1, event2,
> event3). This trace would have 2 occurrences with a min run time of 5
> minutes, max 15 and average 10.
>
> I have implemented your 2nd suggestion for the first job, I hope I
> have made the Traces clearer as I am still unsure of the best approach
> here.
>
> Thanks a lot,
> Kat
>
> On Fri, Jan 13, 2017 at 10:45 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > One thing to add: the Flink KafkaProducer provides at-least-once only if
> > flush-on-checkpoint is enabled [1].
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-
> >
> > 2017-01-13 22:02 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
> >>
> >> Hi Kat,
> >>
> >> I did not understand the difference between a case and a trace.
> >> If I got it right, the goal of your first job is to assemble the
> >> individual events into cases. Is a case here the last event for a
> >> case-id or all events of a case-id?
> >> If a case is the collection of all events (which I assume), what is the
> >> difference to a trace, which is also the list of events (if I got it
> >> right)?
> >>
> >> In any case, I think your first job can also be solved without a session
> >> window (which is quite complex internally).
> >> There are two options:
> >> 1) use a global window [1] with a custom trigger that triggers for each
> >> arriving record. A global window never ends, which would be OK since
> >> your cases do not end either.
> >> 2) use a MapFunction with key-partitioned operator state [2]. The map
> >> function would simply update the state for every new event and emit a
> >> new result.
> >>
> >> Regarding your concern about losing data when writing to Kafka: Flink's
> >> KafkaProducer provides at-least-once guarantees, which means that data
> >> might be written more than once in case of a failure but won't be lost.
> >> If the Kafka topic is partitioned by case-id and you only need the last
> >> record per case-id, Kafka's log compaction should give you upsert
> >> semantics.
> >>
> >> Regarding your question "Is using state in this way a somewhat standard
> >> practice, or is state intended more for recovery?":
> >> Many streaming applications require state for their semantics (just like
> >> yours), i.e., they need to buffer data and wait for more data to arrive.
> >> In order to guarantee consistent result semantics of an application, the
> >> state must not be lost and must be recovered in case of a failure. So
> >> state is not intended for recovery, but recovery is needed to guarantee
> >> application semantics.
> >>
> >> As I said before, I did not get the difference between cases and traces,
> >> so I cannot really comment on the job to analyze traces.
> >>
> >> Hope this helps,
> >> Fabian
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
> >> [2]
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
> >>
> >> 2017-01-13 11:04 GMT+01:00 Kathleen Sharp <kathleen.sharp@signavio.com>:
> >>>
> >>> I have been playing around with Flink for a few weeks to try to
> >>> ascertain whether or not it meets our use cases, and also what best
> >>> practices we should be following. I have a few questions I would
> >>> appreciate answers to.
> >>>
> >>>
> >>> Our scenario is that we want to process a lot of event data into
> >>> cases. A case is an in-order sequence of events; this event data could
> >>> be quite old. We never know when a case is complete, so we just want
> >>> to have the most up to date picture of what a case looks like.
> >>>
> >>>
> >>> The in-order sequence of events of a case is called the trace. Many
> >>> cases could have an identical trace. We would like to construct these
> >>> traces, and do some aggregations on those (case count, average/min/max
> >>> life-cycle time).
> >>>
> >>>
> >>> We then have further downstream processing we will do on a case, some
> >>> of which would require additional inputs, either from side inputs or
> >>> by somehow joining data sources.
> >>>
> >>>
> >>> We don’t really care about event time at the moment, because we just
> >>> want to build cases and traces with all the data we have received.
> >>>
> >>>
> >>> The end results should be available for our web front end via a REST API.
> >>>
> >>>
> >>> Based on the above I have the following idea for a first
> >>> implementation:
> >>>
> >>>
> >>> Kafka source -> key by case id -> session window with RocksDB state
> >>> backend holding case for that key -> Postgres sink
> >>>
> >>>
> >>> The reason for a session window is that, as I mentioned above, we just
> >>> want to build a group with all the data we have received into kafka up
> >>> until that point in time. We would experiment with what this gap time
> >>> should be, and in future it might be specific to the type of group,
> >>> but for the start a naive approach is acceptable. I think this could
> >>> be better than just doing it, say, every 10 minutes because we really
> >>> don’t know yet the frequency of the data received. Also, some inputs
> >>> to kafka come directly from a CSV upload, so we will get “firehose”
> >>> periods, and periods of nothing.
> >>>
> >>> In short: I think what we have closely matches session behaviour.
> >>>
> >>>
> >>> We also have to implement a Postgres sink that is capable of doing
> >>> upserts. The reason for Postgres is to serve the REST front end.
> >>>
> >>>
> >>> We then have to build our traces and can see three options for it:
> >>>
> >>>
> >>> 1) The most obvious solution would be to use a kafka sink for the
> >>> keyed case stream, and to do the trace aggregations in a downstream
> >>> flink job with this kafka topic as a source. However, I have some
> >>> concerns over losing any data (i.e. how do we know whether or not an
> >>> event has been successfully pushed into the kafka stream).
> >>>
> >>>
> >>> 2) Another approach might be to use some other type of sink (perhaps
> >>> postgres), and to use this as a source for the traces job. This would
> >>> help us guarantee data consistency.
> >>>
> >>>
> >>> 3) Or, to somehow re-merge the keyed cases stream (is this a
> >>> broadcast?), so:
> >>>
> >>> Keyed cases stream -> broadcast -> key by tracehash with RocksDB
> >>> state backend holding trace for that tracehash -> perform
> >>> aggregations -> Postgres sink
> >>>
> >>> Is broadcast an option here? How costly is it?
> >>>
> >>>
> >>> Which of these approaches (or any other), would you recommend?
> >>>
> >>>
> >>> -------------------------------------
> >>>
> >>> Another question regarding the state:
> >>>
> >>> As we never know when a case is complete, the RocksDB backend could
> >>> grow indefinitely (!). Obviously we would need to get a bit smarter
> >>> here.
> >>>
> >>>
> >>> Is using state in this way a somewhat standard practice, or is state
> >>> intended more for recovery?
> >>>
> >>> Managing growing state: I found some discussion regarding how to clear
> >>> state here
> >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
> >>> which references https://issues.apache.org/jira/browse/FLINK-3946
> >>>
> >>> Thanks,
> >>>
> >>> Kat
> >>
> >>
> >
>
>
>
