From: Juergen Donnerstag <juergen.donnerstag@gmail.com>
Date: Fri, 21 Feb 2020 11:04:34 +0100
Subject: Re: FlinkCEP questions - architecture
To: Kostas Kloudas
Cc: user@flink.apache.org
thanks a lot
Juergen
On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <kkloudas@gmail.com> wrote:
Hi Juergen,

I will reply to your questions inline. As a general comment I would
suggest to also have a look at [3] so that you have an idea of some of
the alternatives.
With that said, here come the answers :)

1) We receive files every day, which are exports from some database
tables, containing ONLY changes from the day. Most tables have
modify-cols. Even though they are files, because they contain
changes only, I believe the file records should be considered events in
Flink terminology. Is that assumption correct?

-> Yes. I think your assumption is correct.

2) The records within the DB export files are NOT in chronological order,
and we cannot change the export. Our use case is a "complex event
processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
A, then B, then C within 30 days, then do something". Does that work
with FlinkCEP even though the events/records are not in chrono order
within the file? The files are 100MB to 20GB in size. Do I need to
sort the files first before CEP processing?

-> Flink CEP also works in event time and the re-ordering can be done by Flink.
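To illustrate, here is a minimal sketch of wiring event time ahead of CEP with the Flink 1.10-era API. `DbChange`, its `getModifiedAt()`/`getKey()` accessors, and the one-day out-of-orderness bound are assumptions for illustration, not part of the thread:

```java
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeCepSetup {
    public static PatternStream<DbChange> apply(DataStream<DbChange> changes,
                                                Pattern<DbChange, ?> pattern) {
        // Assign event-time timestamps from the record's modify column and
        // emit watermarks that tolerate up to one day of in-file disorder.
        DataStream<DbChange> timestamped = changes.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<DbChange>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(DbChange c) {
                        return c.getModifiedAt(); // epoch millis
                    }
                });
        // In event time, CEP buffers elements until the watermark passes and
        // matches them in timestamp order, so the files need no pre-sorting.
        return CEP.pattern(timestamped.keyBy(DbChange::getKey), pattern);
    }
}
```

The out-of-orderness bound trades latency for completeness: records later than the bound are dropped or side-outputted, so it should cover the worst in-file disorder.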

3) Occasionally some crazy people manually "correct" DB records
within the database and manually trigger a re-export of ALL of the
changes for that respective day (e.g. last week's Tuesday).
Consequently we receive a correction file. Same filename but "_1"
appended. All filenames include the date (of the original export).
What are the options to handle that case (besides telling the DB
admins not to, which we did already). Regular checkpoints and
re-process all files since then? What happens to the CEP state? Will
it be checkpointed as well?

-> If you require re-processing, then I would say that your best
option is what you described. The other option would be to keep
everything in Flink state until you are sure that no more corrections
will come. In this case, you have to somehow issue the "correction" in
a way that the downstream system can understand what to correct and
how. Keep in mind that this may be an expensive operation because
everything has to be kept in state for longer.

4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not
necessarily an issue. If your state for these 180 days is a couple of
MBs, then you have no problem. If it increases fast, then you have to
provision your cluster accordingly.

5) We also have CEP rules that must fire if, after a start sequence
matched, the remaining sequence did NOT complete within a configured window.
E.g. If A, then B, but C did not occur within 30 days since A. Is that
supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations
and you can also look at [2] for some tests of different pattern
combinations.
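As a hedged sketch of question 5 (A then B, but no C within the window): one way FlinkCEP can express this is to declare the full A→B→C pattern with `.within()` and collect the timed-out partial matches through a side output. `Event`, its `getType()` accessor, and the tag name are illustrative assumptions:

```java
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class AbsenceSketch {
    static SimpleCondition<Event> typeIs(String t) {
        return new SimpleCondition<Event>() {
            @Override public boolean filter(Event e) { return t.equals(e.getType()); }
        };
    }

    public static DataStream<String> absences(DataStream<Event> keyed) {
        Pattern<Event, ?> abc = Pattern.<Event>begin("a").where(typeIs("A"))
                .followedBy("b").where(typeIs("B"))
                .followedBy("c").where(typeIs("C"))
                .within(Time.days(30));

        OutputTag<String> timedOut = new OutputTag<String>("timed-out"){};

        SingleOutputStreamOperator<String> matches = CEP.pattern(keyed, abc).flatSelect(
                timedOut,
                // Invoked for partial matches whose 30-day window expired.
                (PatternFlatTimeoutFunction<Event, String>) (match, ts, out) -> {
                    if (match.containsKey("b")) {   // A and B seen, C never arrived
                        out.collect("C missing after " + match.get("a").get(0));
                    }
                },
                // Invoked for complete A, B, C matches.
                (PatternFlatSelectFunction<Event, String>) (match, out) ->
                        out.collect("full match"));

        return matches.getSideOutput(timedOut); // the "C did NOT occur" stream
    }
}
```

The key idea is that "absence within a window" is the timeout of the corresponding presence pattern, so no separate negation operator is needed at the end of the sequence.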

6) We expect 30-40 CEP rules. How can we estimate the required storage
size for the temporary CEP state? Is there some sort of formula
considering number of rules, number of records per file or day, record
size, window, number of records matched per sequence, number of keyBy
grouping keys, ...

-> In FlinkCEP, each pattern becomes a single operator. This means
that you will have 30-40 operators in your job graph, each with its
own state. This can become heavy, but once again it depends on your
workload. I cannot give an estimate because in CEP, in order to
guarantee correct ordering of events in an unordered stream, the
library sometimes has to keep more records in state than will be
presented at the end.

Have you considered going with a solution based on a ProcessFunction and
broadcast state? This will also allow you to have a more dynamic
set-up where patterns can be added at runtime and it will allow you to
do any optimizations specific to your workload ;) For a discussion on
this, check [3]. In addition, it will allow you to "multiplex" many
patterns into a single operator, thus potentially minimizing the amount
of copies of the state you keep.
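A rough sketch of that broadcast-state alternative follows; `Rule`, `Event`, `Alert`, and the matching logic are placeholders, and the fleshed-out version is the one described in [3]:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class BroadcastRulesSketch {
    static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", String.class, Rule.class);

    public static DataStream<Alert> apply(DataStream<Event> events,
                                          DataStream<Rule> rules) {
        BroadcastStream<Rule> broadcastRules = rules.broadcast(RULES);
        return events
                .keyBy(Event::getKey)
                .connect(broadcastRules)
                .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Alert>() {
                    @Override
                    public void processElement(Event e, ReadOnlyContext ctx,
                                               Collector<Alert> out) throws Exception {
                        // Evaluate the event against every currently active rule.
                        for (Map.Entry<String, Rule> rule :
                                ctx.getBroadcastState(RULES).immutableEntries()) {
                            // placeholder: update per-key state, emit Alert on match
                        }
                    }

                    @Override
                    public void processBroadcastElement(Rule r, Context ctx,
                                                        Collector<Alert> out) throws Exception {
                        // Rules can be added or replaced at runtime.
                        ctx.getBroadcastState(RULES).put(r.getId(), r);
                    }
                });
    }
}
```

Because all rules live in one operator, the per-key event state is stored once instead of once per pattern, which is the "multiplexing" benefit mentioned above.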

7) I can imagine that for debugging reasons it'd be good if we were
able to query the temporary CEP state. What is the (CEP) schema used
to persist the CEP state and how can we query it? And does such query
work on the whole cluster or only per node (e.g. because of shuffle
and nodes responsible only for a portion of the events).

-> Unfortunately the state in CEP is not queryable, thus I am not
sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to
add or remove nodes? Will the state still be found, despite it being
stored in another node? I read that I need to be equally careful when
changing rules? Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a savepoint and then relaunch your job with a different parallelism.
Updating a rule is not supported in CEP, as changing a rule would
imply that (potentially) the state should change. But what you could
do is take a savepoint, remove the old pattern and add a new one (the
updated one) and tell Flink to ignore the state of the previous
operator (as said earlier each CEP pattern is translated to an
operator).
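A sketch of how that pattern swap can be prepared; the uid values and `RuleASelectFunction` are illustrative, but stable uids are what let Flink map savepoint state to operators when restoring with `flink run -s <savepointPath> --allowNonRestoredState`:

```java
// Fragment only: keyedEvents and patternA come from the surrounding job.
// A stable uid means this operator's state survives redeployments; a removed
// pattern's state is skipped when restoring with --allowNonRestoredState.
DataStream<Alert> ruleA = CEP.pattern(keyedEvents, patternA)
        .select(new RuleASelectFunction())
        .uid("cep-rule-a")   // keep stable across deployments
        .name("CEP rule A");
```

Without explicit uids, Flink derives them from the job topology, so adding or removing a pattern can silently change every operator's identity and break the restore.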

9) How does garbage collection of temp CEP state work, or will it stay
forever? For tracing/investigation reasons I can imagine that purging
it at the earliest possible time is not always the best option. Maybe
after 30 days or so.

-> CEP cleans up state after the time horizon (specified with the
.within() clause) expires.

10) Are there strategies to minimize temp CEP state? In SQL queries
you filter first on the "smallest" attributes. CEP rules form a
sequence. Hence that approach will not work. Is that an issue at all?
What are practical limits on the CEP temp state storage engine?

-> Such optimizations are not supported out of the box. I would
recommend to go with the Broadcast state approach in [3].

11) Occasionally we need to process about 200 files at once. Can I
speed things up by processing all files in parallel on multiple nodes,
despite their sequence (CEP use case)? This would only work if
FlinkCEP in step 1 simply filters on all relevant events of a
sequence, updates state, and in a step 2 - after the files are
processed - evaluates the updated state if that meets the sequences.

12) Schema changes in the input files: Occasionally the DB source
system schema is changed, and not always in a backwards compatible way
(insert new fields in the middle), and also the export will have the
field in the middle. This means that starting from a specific (file)
date, I need to consider a different schema. This must also be handled
when re-running files for the last month, because of corrections
provided. And if the file format has changed somewhere in the middle ...

-> This seems to be relevant for the "data cleaning" phase, before you
send your data to CEP. In this case, if the schema changes, then I
assume that you need to update your initial parsing logic, which means
taking a savepoint and redeploying the updated job graph with the new
input parsing logic (if I understand correctly).
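For instance, the parsing logic could select a column layout based on the export date encoded in the filename, so corrected re-runs of old files still parse with the schema that was valid on their original date. This is purely illustrative; the class name, cutover date, and column names are assumptions:

```java
import java.time.LocalDate;
import java.util.List;

public class SchemaRouter {
    // Assumed date on which the source system inserted the new column.
    static final LocalDate CUTOVER = LocalDate.of(2020, 1, 15);

    // Ordered column names to use when parsing a file exported on fileDate.
    public static List<String> columnsFor(LocalDate fileDate) {
        if (fileDate.isBefore(CUTOVER)) {
            return List.of("id", "amount", "modified_at");           // old layout
        }
        return List.of("id", "currency", "amount", "modified_at");   // field added in the middle
    }
}
```

Keying the layout off the original export date (not the processing date) is what makes correction files for pre-cutover days parse correctly.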

thanks a lot for your time and your help

I hope the above helps!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
[3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html

On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
<juergen.donnerstag@gmail.com> wrote:
>
> Hi,
>
> we're in very early stages evaluating options. I'm not a Flink expert, but did read some of the docs and watched videos. Could you please help me understand if and how certain of our requirements are covered by Flink (CEP). Is this mailing list the right channel for such questions?
>
> thanks a lot for your time and your help
> Juergen