flink-user mailing list archives

From Juergen Donnerstag <juergen.donners...@gmail.com>
Subject Re: FlinkCEP questions - architecture
Date Fri, 21 Feb 2020 10:04:34 GMT
thanks a lot
Juergen

On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <kkloudas@gmail.com> wrote:

> Hi Juergen,
>
> I will reply to your questions inline. As a general comment, I would
> suggest also having a look at [3] so that you have an idea of some of
> the alternatives.
> With that said, here come the answers :)
>
> 1) We receive files every day, which are exports from some database
> tables, containing ONLY changes from the day. Most tables have
> modify-cols. Even though they are files, because they contain
> changes only, I believe the file records should be considered events
> in Flink terminology. Is that assumption correct?
>
> -> Yes. I think your assumption is correct.
>
> 2) The records within the DB export files are NOT in chronological
> order, and we cannot change the export. Our use case is a "complex
> event processing" case (FlinkCEP) with rules like "KeyBy(someKey) If
> first A, then B, then C within 30 days, then do something". Does that
> work with FlinkCEP even though the events/records are not in
> chronological order within the file? The files are 100 MB to 20 GB in
> size. Do I need to sort the files first before CEP processing?
>
> -> FlinkCEP also works in event time, and the re-ordering can be done
> by Flink.
>
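The re-ordering mentioned in 2) can be pictured as a buffer that releases events once a watermark has passed them. The following is a minimal plain-Python sketch of that idea, not Flink API code; the function name `reorder_by_event_time` and the parameter `max_out_of_orderness` are made up for illustration.

```python
import heapq


def reorder_by_event_time(events, max_out_of_orderness):
    """Yield (timestamp, payload) pairs sorted by timestamp.

    The watermark trails the largest timestamp seen so far by
    max_out_of_orderness; buffered events at or below it are released.
    """
    buffer = []      # min-heap ordered by timestamp
    max_seen = None
    for seq, (ts, payload) in enumerate(events):
        # seq breaks ties so payloads are never compared with each other
        heapq.heappush(buffer, (ts, seq, payload))
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - max_out_of_orderness
        while buffer and buffer[0][0] <= watermark:
            t, _, p = heapq.heappop(buffer)
            yield t, p
    # End of input: flush whatever is still buffered, in timestamp order.
    while buffer:
        t, _, p = heapq.heappop(buffer)
        yield t, p
```

If the records inside a file are shuffled arbitrarily, the bound has to span the whole period the file covers, so most of a file may sit in the buffer at once. An event that arrives later than the bound allows is released immediately in this sketch and therefore comes out of order.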
> 3) Occasionally some crazy people manually "correct" DB records
> within the database and manually trigger a re-export of ALL of the
> changes for that respective day (e.g. last week's Tuesday).
> Consequently we receive a correction file: same filename, but with
> "_1" appended. All filenames include the date (of the original
> export). What are the options to handle that case (besides telling
> the DB admins not to, which we already did)? Regular checkpoints and
> re-processing all files since then? What happens to the CEP state?
> Will it be checkpointed as well?
>
> -> If you require re-processing, then I would say that your best
> option is what you described. The other option would be to keep
> everything in Flink state until you are sure that no more corrections
> will come. In this case, you have to somehow issue the "correction" in
> a way that the downstream system can understand what to correct and
> how. Keep in mind that this may be an expensive operation because
> everything has to be kept in state for longer.
>
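The "re-process all files since then" option in 3) needs a way to decide which files to replay. A small plain-Python sketch follows; the naming scheme `<table>_<YYYY-MM-DD>[_<n>].csv`, the function name `replay_plan`, and the idea that a higher numeric suffix supersedes a lower one are all assumptions, since the thread only says that "_1" is appended and that the filename contains the date.

```python
import re
from datetime import date

# Hypothetical naming scheme: <table>_<YYYY-MM-DD>[_<n>].csv
NAME = re.compile(
    r"^(?P<table>\w+?)_(?P<day>\d{4}-\d{2}-\d{2})(?:_(?P<rev>\d+))?\.csv$"
)


def replay_plan(filenames):
    """Return the files to re-process, starting at the earliest corrected day.

    For each (table, day) only the highest revision is kept.
    """
    latest = {}            # (table, day) -> (revision, filename)
    corrected_days = []
    for name in filenames:
        m = NAME.match(name)
        if m is None:
            continue       # not an export file under the assumed scheme
        day = date.fromisoformat(m["day"])
        rev = int(m["rev"] or 0)
        if rev > 0:
            corrected_days.append(day)
        key = (m["table"], day)
        if key not in latest or rev > latest[key][0]:
            latest[key] = (rev, name)
    if not corrected_days:
        return []
    start = min(corrected_days)
    return [name for (_, day), (_, name) in sorted(latest.items()) if day >= start]
```

The job would then be restored from a checkpoint or savepoint taken before the earliest corrected day and fed the returned files in order.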
> 4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?
>
> -> The only thing to consider is the size of your state. Time is not
> necessarily an issue. If your state for these 180 days is a couple of
> MBs, then you have no problem. If it increases fast, then you have to
> provision your cluster accordingly.
>
> 5) We also have CEP rules that must fire if, after a start sequence
> matched, the remaining sequence did NOT occur within a configured
> window. E.g. if A, then B, but C did not occur within 30 days since A.
> Is that supported by FlinkCEP? I couldn't find a working example.
>
> -> You can have a look at [1] for the supported pattern combinations
> and you can also look at [2] for some tests of different pattern
> combinations.
>
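The rule in 5) ("A, then B, but no C within 30 days since A") can be sketched as a small per-key state machine driven by event time. This is plain Python for illustration only, not FlinkCEP code; `detect_missing_c` and `final_watermark` are made-up names, and the input is assumed to be already sorted by timestamp. The FlinkCEP documentation also has a section on handling timed-out partial patterns, which looks like the closest built-in mechanism.

```python
def detect_missing_c(events, window, final_watermark):
    """Return [(key, a_timestamp)] where A and B occurred but C did not
    follow within `window` time units of A.

    events: iterable of (timestamp, key, kind), sorted by timestamp.
    final_watermark: how far event time has advanced after the last event.
    """
    state = {}     # key -> {"a": timestamp of A, "b_seen": bool}
    alerts = []

    def expire(now):
        # Close every partial match whose window has run out.
        for key in [k for k, s in state.items() if now > s["a"] + window]:
            s = state.pop(key)
            if s["b_seen"]:
                alerts.append((key, s["a"]))

    for ts, key, kind in events:
        expire(ts)
        s = state.get(key)
        if kind == "A" and s is None:
            state[key] = {"a": ts, "b_seen": False}
        elif kind == "B" and s is not None:
            s["b_seen"] = True
        elif kind == "C" and s is not None and s["b_seen"]:
            del state[key]   # sequence completed in time, nothing to report
    expire(final_watermark)
    return alerts
```

Partial matches that are still inside their window when the input ends are not reported, because C could still arrive.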
> 6) We expect 30-40 CEP rules. How can we estimate the required storage
> size for the temporary CEP state? Is there some sort of formula
> considering number of rules, number of records per file or day, record
> size, window, number of records matched per sequence, number of keyBy
> grouping keys, ...?
>
> -> In FlinkCEP, each pattern becomes a single operator. This means
> that you will have 30-40 operators in your job graph, each with its
> own state. This can become heavy, but once again it depends on your
> workload. I cannot give an estimate because in CEP, in order to
> guarantee correct ordering of events in an unordered stream, the
> library sometimes has to keep more records in state than will be
> presented at the end.
>
> Have you considered going with a solution based on ProcessFunction and
> broadcast state? This will also allow you to have a more dynamic
> set-up where patterns can be added at runtime, and it will allow you to
> do any optimizations specific to your workload ;) For a discussion on
> this, check [3]. In addition, it will allow you to "multiplex" many
> patterns into a single operator, thus potentially minimizing the
> number of copies of the state you keep.
>
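The "multiplexing" idea can be sketched as one matcher that evaluates every rule against a single per-key event buffer, with rules that can be added while it runs. This is plain Python rather than Flink code; the class `MultiRuleMatcher` and its methods are hypothetical, rules are simplified to "these event kinds in this order within a window", and events are assumed to arrive in timestamp order per key. In a Flink job the rules would arrive on a broadcast stream and the buffer would live in keyed state, as discussed in [3].

```python
from collections import defaultdict


class MultiRuleMatcher:
    def __init__(self):
        self.rules = {}                    # rule_id -> (sequence, window)
        self.buffers = defaultdict(list)   # key -> [(ts, kind)], shared by all rules

    def on_rule(self, rule_id, sequence, window):
        """Add or replace a rule at runtime."""
        self.rules[rule_id] = (tuple(sequence), window)

    def on_event(self, ts, key, kind):
        """Buffer the event and return the ids of rules it completes."""
        buf = self.buffers[key]
        buf.append((ts, kind))
        matched = [
            rule_id
            for rule_id, (sequence, window) in self.rules.items()
            if kind == sequence[-1] and self._ends_match(buf, sequence, ts - window)
        ]
        # Keep only events that the longest window could still need.
        horizon = ts - max((w for _, w in self.rules.values()), default=0)
        self.buffers[key] = [e for e in buf if e[0] >= horizon]
        return matched

    @staticmethod
    def _ends_match(buf, sequence, earliest):
        # Walk backwards from the newest event, matching the sequence in reverse.
        i = len(sequence) - 1
        for ts, kind in reversed(buf):
            if ts < earliest:
                break
            if kind == sequence[i]:
                i -= 1
                if i < 0:
                    return True
        return False
```

All rules share one copy of the buffered events per key, which is the saving the answer above hints at; the price is that matching semantics have to be written by hand.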
> 7) I can imagine that for debugging reasons it'd be good if we were
> able to query the temporary CEP state. What is the (CEP) schema used
> to persist the CEP state and how can we query it? And does such a
> query work on the whole cluster or only per node (e.g. because of
> shuffle and nodes responsible only for a portion of the events)?
>
> -> Unfortunately the state in CEP is not queryable, so I am not
> sure you can inspect it at runtime.
>
> 8) I understand state is stored per node. What happens if I want to
> add or remove nodes? Will the state still be found, despite it being
> stored on another node? I read that I need to be equally careful when
> changing rules. Or is that a different issue?
>
> -> Rescaling a Flink job is not done automatically. You need to take a
> savepoint and then relaunch your job with a different parallelism.
> Updating a rule is not supported in CEP, as changing a rule would
> imply that (potentially) the state should change. But what you could
> do is take a savepoint, remove the old pattern and add a new one (the
> updated one) and tell Flink to ignore the state of the previous
> operator (as said earlier each CEP pattern is translated to an
> operator).
>
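On the "will the state still be found" part of 8): as far as the Flink documentation describes it, keyed state is organised in key groups. A key's group depends only on the key and the job's maximum parallelism, and on restore from a savepoint each parallel instance takes over a contiguous range of key groups. The plain-Python sketch below mirrors that assignment; the function names are made up, and `zlib.crc32` is only a stand-in for the hash Flink applies to the key.

```python
import zlib


def key_group(key, max_parallelism):
    # Stand-in hash for illustration; Flink uses its own hash of the key.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index(group, max_parallelism, parallelism):
    # Which parallel instance owns this key group at the given parallelism.
    return group * parallelism // max_parallelism


def groups_per_subtask(max_parallelism, parallelism):
    owners = {}
    for group in range(max_parallelism):
        index = operator_index(group, max_parallelism, parallelism)
        owners.setdefault(index, []).append(group)
    return owners
```

With a maximum parallelism of 128, key group 70 belongs to instance 1 at parallelism 2 and to instance 2 at parallelism 4. The key group itself does not change, so the state for a key is found again after relaunching with a different parallelism, as long as the maximum parallelism stays the same.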
> 9) How does garbage collection of temp CEP state work, or will it stay
> forever? For tracing/investigation reasons I can imagine that purging
> it at the earliest possible time is not always the best option. Maybe
> 30 days later or so.
>
> -> CEP cleans up state after the time horizon (specified with the
> .within() clause) expires.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries
> you filter first on the "smallest" attributes. CEP rules form a
> sequence, hence that approach will not work. Is that an issue at all?
> What are practical limits on the CEP temp state storage engine?
>
> -> Such optimizations are not supported out of the box. I would
> recommend going with the broadcast state approach in [3].
>
> 11) Occasionally we need to process about 200 files at once. Can I
> speed things up by processing all files in parallel on multiple nodes,
> despite their sequence (CEP use case)? This would only work if
> FlinkCEP in step 1 simply filters on all relevant events of a
> sequence, updates state, and in a step 2 - after the files are
> processed - evaluates whether the updated state matches the sequences.
>
> 12) Schema changes in the input files: Occasionally the DB source
> system schema is changed, and not always in a backwards-compatible way
> (new fields inserted in the middle), and the export will also have the
> field in the middle. This means that starting from a specific (file)
> date, I need to consider a different schema. This must also be handled
> when re-running files for the last month, because of corrections
> provided. And if the file format has changed somewhere in the middle ...
>
> -> This seems to be relevant for the "data cleaning" phase, before you
> send your data to CEP. In this case, if the schema changes, then I
> assume that you need to update your initial parsing logic, which means
> taking a savepoint and redeploying the updated jobGraph with the new
> input parsing logic (if I understand correctly).
>
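For the "data cleaning" phase in 12), one way to cope with re-runs that cross a schema change is to pick the column layout from the file date and map every record to one common shape before it reaches the pattern logic. A plain-Python sketch follows; the schema history, the column names, the `;` delimiter, and the function names are all hypothetical.

```python
import bisect
from datetime import date

# Hypothetical schema history: (valid-from date, column order in the file).
SCHEMAS = [
    (date(2019, 1, 1), ["id", "status", "modified"]),
    # From this date on, "region" was inserted in the middle.
    (date(2020, 2, 1), ["id", "region", "status", "modified"]),
]
VALID_FROM = [valid_from for valid_from, _ in SCHEMAS]


def columns_for(file_date):
    """Return the column order that was in force on file_date."""
    i = bisect.bisect_right(VALID_FROM, file_date) - 1
    if i < 0:
        raise ValueError(f"no schema known for {file_date}")
    return SCHEMAS[i][1]


def parse_line(line, file_date, delimiter=";"):
    """Parse one record into a dict that has the same keys for every schema."""
    columns = columns_for(file_date)
    values = line.rstrip("\n").split(delimiter)
    if len(values) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(values)}")
    record = dict(zip(columns, values))
    record.setdefault("region", None)   # field that older files do not have
    return record
```

Because the layout is chosen per file date, files from before and after the change can be replayed in the same run, and a new schema version only means appending an entry to the history and redeploying the parsing logic.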
> thanks a lot for your time and your help
>
> I hope the above helps!
>
> Cheers,
> Kostas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
> [2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
> [3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>
> On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
> <juergen.donnerstag@gmail.com> wrote:
> >
> > Hi,
> >
> > we're in the very early stages of evaluating options. I'm not a Flink
> > expert, but I did read some of the docs and watched videos. Could you
> > please help me understand if and how certain of our requirements are
> > covered by Flink (CEP)? Is this mailing list the right channel for
> > such questions?
> >
> > thanks a lot for your time and your help
> > Juergen
>
