flink-user mailing list archives

From Oytun Tez <oy...@motaword.com>
Subject Re: FlinkCEP questions - architecture
Date Fri, 21 Feb 2020 16:08:13 GMT
Amazing content, thanks for asking and answering.

On Fri, Feb 21, 2020 at 5:04 AM Juergen Donnerstag <
juergen.donnerstag@gmail.com> wrote:

> thanks a lot
> Juergen
>
> On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <kkloudas@gmail.com>
> wrote:
>
>> Hi Juergen,
>>
>> I will reply to your questions inline. As a general comment, I would
>> suggest also having a look at [3] so that you have an idea of some of
>> the alternatives.
>> With that said, here come the answers :)
>>
>> 1) We receive files every day, which are exports from some database
>> tables, containing ONLY changes from the day. Most tables have
>> modify-cols. Even though they are files, because they contain
>> changes only, I believe the file records should be considered events
>> in Flink terminology. Is that assumption correct?
>>
>> -> Yes. I think your assumption is correct.
>>
>> 2) The records within the DB export files are NOT in chronological
>> order, and we cannot change the export. Our use case is a "complex event
>> processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
>> A, then B, then C within 30 days, then do something". Does that work
>> with FlinkCEP even though the events/records are not in chronological
>> order within the file? The files are 100MB to 20GB in size. Do I need
>> to sort the files first before CEP processing?
>>
>> -> FlinkCEP also works in event time, and the re-ordering can be done
>> by Flink.
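To make "re-ordering in event time" concrete, here is a minimal plain-Python sketch of the general idea. It is not the Flink API. Events are buffered and released in timestamp order once a watermark (highest timestamp seen minus an allowed disorder bound) has passed them. The `(timestamp, payload)` record shape and the `max_out_of_orderness` parameter are assumptions for illustration. For export files with no ordering at all, the bound would presumably have to cover the full time span of a file.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

Event = Tuple[int, str]  # (event_time, payload); hypothetical record shape


def reorder_by_event_time(events: Iterable[Event],
                          max_out_of_orderness: int) -> Iterator[Event]:
    """Yield events in event-time order, tolerating bounded disorder."""
    buffer: List[Event] = []      # min-heap ordered by event time
    max_seen = float("-inf")      # highest event time observed so far
    for event in events:
        heapq.heappush(buffer, event)
        max_seen = max(max_seen, event[0])
        watermark = max_seen - max_out_of_orderness
        # Events at or below the watermark can no longer be overtaken,
        # provided the disorder really stays within the bound.
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)
    # End of input: flush the remainder, still in timestamp order.
    while buffer:
        yield heapq.heappop(buffer)
```

An event that arrives later than the bound allows would be emitted out of order by this sketch. A real pipeline has to decide whether to drop or side-route such late records.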
>>
>> 3) Occasionally some crazy people manually "correct" DB records
>> within the database and manually trigger a re-export of ALL of the
>> changes for that respective day (e.g. last week's Tuesday).
>> Consequently we receive a correction file. Same filename but "_1"
>> appended. All filenames include the date (of the original export).
>> What are the options to handle that case (besides telling the DB
>> admins not to, which we did already)? Regular checkpoints and
>> re-process all files since then? What happens to the CEP state? Will
>> it be checkpointed as well?
>>
>> -> If you require re-processing, then I would say that your best
>> option is what you described. The other option would be to keep
>> everything in Flink state until you are sure that no more corrections
>> will come. In this case, you have to somehow issue the "correction" in
>> a way that the downstream system can understand what to correct and
>> how. Keep in mind that this may be an expensive operation because
>> everything has to be kept in state for longer.
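For the "re-process all files since then" option, the bookkeeping could look roughly like the sketch below. It assumes a hypothetical naming scheme (`export_YYYY-MM-DD.csv`, with corrections as `export_YYYY-MM-DD_1.csv`). The real file names are not given in this thread, so the regular expression and function names are illustrative only.

```python
import re
from datetime import date
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical naming scheme; adjust the pattern to the real export names.
FILENAME = re.compile(r"^export_(\d{4})-(\d{2})-(\d{2})(?:_(\d+))?\.csv$")


def parse(name: str) -> Tuple[date, int]:
    """Return (export date, revision); revision 0 is the original export."""
    m = FILENAME.match(name)
    if m is None:
        raise ValueError(f"unexpected file name: {name}")
    year, month, day, rev = m.groups()
    return date(int(year), int(month), int(day)), int(rev or 0)


def latest_per_day(names: Iterable[str]) -> Dict[date, str]:
    """Keep, for each export date, only the file with the highest revision."""
    best: Dict[date, Tuple[int, str]] = {}
    for name in names:
        day, rev = parse(name)
        if day not in best or rev > best[day][0]:
            best[day] = (rev, name)
    return {day: name for day, (_, name) in best.items()}


def reprocess_from(names: Iterable[str]) -> Optional[date]:
    """Earliest export date that has a correction, i.e. where a replay starts."""
    corrected = [day for day, rev in map(parse, names) if rev > 0]
    return min(corrected) if corrected else None
```

The replay itself would then restart from a checkpoint or savepoint taken before that date and feed in the files chosen by `latest_per_day`.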
>>
>> 4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?
>>
>> -> The only thing to consider is the size of your state. Time is not
>> necessarily an issue. If your state for these 180 days is a couple of
>> MBs, then you have no problem. If it increases fast, then you have to
>> provision your cluster accordingly.
>>
>> 5) We also have CEP rules that must fire if, after a start sequence
>> matched, the remaining sequence did NOT occur within a configured window.
>> E.g. If A, then B, but C did not occur within 30 days since A. Is that
>> supported by FlinkCEP? I couldn't find a working example.
>>
>> -> You can have a look at [1] for the supported pattern combinations
>> and you can also look at [2] for some tests of different pattern
>> combinations.
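The rule from question 5 ("A, then B, but no C within 30 days since A") can be pictured as a small per-key state machine driven by event time. The sketch below is plain Python, independent of the FlinkCEP API, and only shows the logic. The class and method names are hypothetical, and events are assumed to arrive already ordered by event time.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

DAY = 24 * 60 * 60  # seconds


@dataclass
class Partial:
    start: int            # event time of A
    seen_b: bool = False  # whether B has followed A


class AbsenceDetector:
    """Fire when A is followed by B but no C arrives within `window` of A."""

    def __init__(self, window: int = 30 * DAY) -> None:
        self.window = window
        self.partials: Dict[str, Partial] = {}   # one open sequence per key
        self.alerts: List[Tuple[str, int]] = []  # (key, event time of A)

    def on_event(self, key: str, kind: str, ts: int) -> None:
        self._expire(key, ts)
        p = self.partials.get(key)
        if kind == "A" and p is None:
            self.partials[key] = Partial(start=ts)
        elif kind == "B" and p is not None:
            p.seen_b = True
        elif kind == "C" and p is not None and p.seen_b:
            del self.partials[key]  # full A, B, C sequence: nothing to report

    def on_watermark(self, watermark: int) -> None:
        """Time has advanced to `watermark`: close every overdue sequence."""
        for key in list(self.partials):
            self._expire(key, watermark)

    def _expire(self, key: str, now: int) -> None:
        p = self.partials.get(key)
        if p is not None and now - p.start > self.window:
            if p.seen_b:  # A and B happened, C did not arrive in time
                self.alerts.append((key, p.start))
            del self.partials[key]
```

Note that the alert can only fire once time is known to have passed the window, which is why the sketch needs an explicit `on_watermark` call in addition to the events themselves.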
>>
>> 6) We expect 30-40 CEP rules. How can we estimate the required storage
>> size for the temporary CEP state? Is there some sort of formula
>> considering number of rules, number of records per file or day, record
>> size, window, number of records matched per sequence, number of keyBy
>> grouping keys, ...?
>>
>> -> In FlinkCEP, each pattern becomes a single operator. This means
>> that you will have 30-40 operators in your job graph, each with its
>> own state. This can become heavy, but once again it depends on your
>> workload. I cannot give an estimate because in CEP, in order to
>> guarantee correct ordering of events in an unordered stream, the
>> library sometimes has to keep more records in state than will be
>> presented at the end.
>>
>> Have you considered going with a solution based on ProcessFunction and
>> broadcast state? This will also allow you to have a more dynamic
>> set-up where patterns can be added at runtime and it will allow you to
>> do any optimizations specific to your workload ;) For a discussion on
>> this, check [3]. In addition, it will allow you to "multiplex" many
>> patterns into a single operator thus potentially minimizing the amount
>> of copies of the state you keep.
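The "multiplexing" idea can be sketched in plain Python as a single matcher that holds all rules and tracks per-key progress for each of them. Here `add_rule` stands in for a rule arriving on the broadcast side at runtime. All names are hypothetical, rules are reduced to simple ordered sequences of event kinds, and only progress counters are stored. A real implementation would likely need to keep the matched events too.

```python
from typing import Dict, List, Sequence, Tuple


class MultiRuleMatcher:
    """One matcher evaluating many sequence rules over per-key progress."""

    def __init__(self) -> None:
        # rule id -> (ordered event kinds, time window)
        self.rules: Dict[str, Tuple[List[str], int]] = {}
        # (key, rule id) -> (index of next expected step, time of first step)
        self.progress: Dict[Tuple[str, str], Tuple[int, int]] = {}

    def add_rule(self, rule_id: str, sequence: Sequence[str], window: int) -> None:
        if not sequence:
            raise ValueError("a rule needs at least one step")
        self.rules[rule_id] = (list(sequence), window)

    def remove_rule(self, rule_id: str) -> None:
        self.rules.pop(rule_id, None)
        for k in [k for k in self.progress if k[1] == rule_id]:
            del self.progress[k]

    def on_event(self, key: str, kind: str, ts: int) -> List[str]:
        """Feed one event (in event-time order); return ids of completed rules."""
        matched: List[str] = []
        for rule_id, (sequence, window) in self.rules.items():
            step, start = self.progress.get((key, rule_id), (0, ts))
            if step > 0 and ts - start > window:
                step, start = 0, ts  # window expired: start over
            if kind == sequence[step]:
                step += 1
            if step == len(sequence):
                matched.append(rule_id)
                self.progress.pop((key, rule_id), None)
            elif step > 0:
                self.progress[(key, rule_id)] = (step, start)
            else:
                self.progress.pop((key, rule_id), None)
        return matched
```

Because every rule sees the same incoming event inside one operator, the input does not have to be copied once per pattern, which is the saving the reply hints at.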
>>
>> 7) I can imagine that for debugging reasons it'd be good if we were
>> able to query the temporary CEP state. What is the (CEP) schema used
>> to persist the CEP state and how can we query it? And does such query
>> work on the whole cluster or only per node (e.g. because of shuffle
>> and nodes responsible only for a portion of the events).
>>
>> -> Unfortunately, the state in CEP is not queryable, thus I am not
>> sure if you can inspect it at runtime.
>>
>> 8) I understand state is stored per node. What happens if I want to
>> add or remove nodes? Will the state still be found, despite it being
>> stored in another node? I read that I need to be equally careful when
>> changing rules? Or is that a different issue?
>>
>> -> Rescaling a Flink job is not done automatically. You need to take a
>> savepoint and then relaunch your job with a different parallelism.
>> Updating a rule is not supported in CEP, as changing a rule would
>> imply that (potentially) the state should change. But what you could
>> do is take a savepoint, remove the old pattern and add a new one (the
>> updated one) and tell Flink to ignore the state of the previous
>> operator (as said earlier each CEP pattern is translated to an
>> operator).
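On whether state is "still found" after changing the number of nodes: Flink groups keyed state into key groups, and when a job is restored from a savepoint with a different parallelism, whole key groups are reassigned to the new subtasks. The plain-Python sketch below only illustrates that idea. The max parallelism of 128 is an assumed setting, and `crc32` is a stand-in for the hash Flink actually applies.

```python
import zlib

MAX_PARALLELISM = 128  # assumed; must stay fixed for the lifetime of the state


def key_group(key: str) -> int:
    """Map a key to a key group; independent of the current parallelism."""
    # crc32 is a stand-in: any stable hash demonstrates the principle.
    return zlib.crc32(key.encode("utf-8")) % MAX_PARALLELISM


def subtask_for(group: int, parallelism: int) -> int:
    """Each parallel subtask owns a contiguous range of key groups."""
    return group * parallelism // MAX_PARALLELISM


def owner(key: str, parallelism: int) -> int:
    return subtask_for(key_group(key), parallelism)
```

A key's group never changes, so after rescaling from 4 to 6 subtasks the state for that key travels with its key group to whichever subtask now owns it.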
>>
>> 9) How does garbage collection of temp CEP state work, or will it stay
>> forever? For tracing/investigation reasons I can imagine that purging
>> it at the earliest possible time is not always the best option. Maybe
>> 30 days later or so.
>>
>> -> CEP cleans up state after the time horizon (specified with the
>> .within() clause) expires.
>>
>> 10) Are there strategies to minimize temp CEP state? In SQL queries
>> you filter first on the "smallest" attributes. CEP rules form a
>> sequence. Hence that approach will not work. Is that an issue at all?
>> What are practical limits on the CEP temp state storage engine?
>>
>> -> Such optimizations are not supported out of the box. I would
>> recommend to go with the Broadcast state approach in [3].
>>
>> 11) Occasionally we need to process about 200 files at once. Can I
>> speed things up by processing all files in parallel on multiple nodes,
>> despite their sequence (CEP use case)? This would only work if
>> FlinkCEP in step 1 simply filters on all relevant events of a
>> sequence, updates state, and in a step 2 - after the files are
>> processed - evaluates the updated state if that meets the sequences.
>>
>> 12) Schema changes in the input files: Occasionally the DB source
>> system schema is changed, and not always in a backwards-compatible way
>> (new fields inserted in the middle), and the export will also have the
>> field in the middle. This means that starting from a specific (file)
>> date, I need to consider a different schema. This must also be handled
>> when re-running files for the last month, because of corrections
>> provided. And if the file format has changed somewhere in the middle ...
>>
>> -> This seems to be relevant for the "data cleaning" phase, before you
>> send your data to CEP. In this case, if the schema changes, then I
>> assume that you need to update your initial parsing logic, which means
>> taking a savepoint and redeploying the updated jobGraph with the new
>> input parsing logic (if I understand correctly).
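One way to keep the parsing step replayable across schema changes is to select the schema by the export date of each file. The sketch below assumes CSV input and a hypothetical schema history. Neither the file format nor the column names are given in this thread.

```python
import csv
import io
from datetime import date
from typing import Dict, Iterator, List, Tuple

# Hypothetical schema history, sorted by the first export date it applies to.
SCHEMAS: List[Tuple[date, List[str]]] = [
    (date(2019, 1, 1), ["id", "status", "modified_at"]),
    # A field inserted in the middle, valid from this export date onwards.
    (date(2020, 2, 1), ["id", "region", "status", "modified_at"]),
]


def schema_for(export_date: date) -> List[str]:
    """Return the column list in force on the given export date."""
    applicable = [cols for start, cols in SCHEMAS if start <= export_date]
    if not applicable:
        raise ValueError(f"no schema known for {export_date}")
    return applicable[-1]  # latest schema that had started by that date


def parse_export(export_date: date, text: str) -> Iterator[Dict[str, str]]:
    """Parse one export file's content using the schema valid on its date."""
    columns = schema_for(export_date)
    for row in csv.reader(io.StringIO(text)):
        if not row:
            continue  # skip blank lines
        if len(row) != len(columns):
            raise ValueError(f"expected {len(columns)} fields, got {len(row)}")
        yield dict(zip(columns, row))
```

With the schema keyed by date, re-running last month's files after a correction picks the right column layout for each file without redeploying different parsers.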
>>
>> thanks a lot for your time and your help
>>
>> I hope the above helps!
>>
>> Cheers,
>> Kostas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
>> [2]
>> https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
>> [3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>>
>> On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
>> <juergen.donnerstag@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > we're in very early stages evaluating options. I'm not a Flink expert,
>> but did read some of the docs and watched videos. Could you please help me
>> understand if and how certain of our reqs are covered by Flink (CEP). Is
>> this mailing list the right channel for such questions?
>> >
>> > thanks a lot for your time and your help
>> > Juergen
>>
--
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oytun@motaword.com

      <https://www.motaword.com/blog>
