flink-user mailing list archives

From "theo.diefenthal@scoop-software.de" <theo.diefent...@scoop-software.de>
Subject Filter events based on future events
Date Tue, 10 Sep 2019 21:06:08 GMT
Hi there,

I have the following use case:

I get transaction logs from multiple servers. Each server writes its logs to its own Kafka
partition, so within each partition the elements are monotonically ordered by time.

Within the stream of transactions, we have some special events.
Let's call them A (roughly 1-10% of the events are of this type).

An A event can be followed later in time by an Anti-A event: an event that has all the same
attributes (username, faculty, ...) but differs in one boolean attribute marking it as an
anti event. Kind of a retraction.

Now I want to emit almost all events downstream (including the events that are neither A nor
Anti-A; let's simply call them B), preserving the monotonic order of events. There is just one
special case in which I want to filter out an element: if the stream has an A event followed by
an Anti-A event within one minute, only the Anti-A event shall go downstream, not A itself. But
if there is no such Anti-A event, A shall be emitted and shall
still be in timestamp order with respect to the other events.

I've been racking my brain over this and can't come up with a proper (performant) solution.
It seems obvious that in the end I need to buffer all records for 1 minute so that the order
can be preserved. But I have no idea how to implement this efficiently in Flink.

My thoughts so far:

1. I could give CEP a try. But in that CEP pattern I would need to write something like "match
all B events in any case, and also match A, but only if there is no Anti-A". Doesn't that
produce a lot of state? And are all B events considered in the breadth-first rule-matching
approach, i.e. tons of unnecessary comparisons against A? Any pseudo code on how I could do
this with CEP?

2. If I key data
by partition and all other attributes except the retract boolean, so that A and Anti-A always
fall into the same keyed stream but no other event does, the comparisons probably become much
simpler. But how much overhead does that add? Will Flink reshuffle the data even if the first
key component (the partition) stays the same? And can I get back to my "global" per-partition
order afterwards? Note that some events have exactly the same event-time timestamp, but I
still want them in their original order later on.

3. Could I work with session windows somehow? I would put A and Anti-A in the same session, and
when the window fires I would simply not collect the A event if there is an Anti-A. Would that
be more or
less overhead compared to CEP?

4. Do you have any other ideas on how to approach this? Sadly, I have no way to change the input
stream, so that part of the pipeline is fixed.

Best
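The following is a minimal, framework-free sketch of the one-minute buffering idea for a single partition (Java 16+). It is not Flink code. The names `RetractionBuffer`, `Event`, `Type` and `key` are hypothetical, and `key` stands for all attributes an A shares with its Anti-A. The sketch assumes non-decreasing timestamps, an inclusive one-minute window, and that an Anti-A cancels the most recent matching A. Records leave a FIFO queue only from its head, so events with equal timestamps keep their arrival order. Only records queued behind a still-undecided A actually have to wait.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch for ONE partition, outside of Flink. Assumptions (not from the original
// message): timestamps are non-decreasing, "within one minute" is inclusive, and an
// Anti-A cancels the most recent matching A that has not been cancelled yet.
public class RetractionBuffer {
    public enum Type { A, ANTI_A, B }

    // Hypothetical event model: `key` stands for all attributes an A shares with its
    // Anti-A (username, faculty, ...); the retract boolean is folded into `type`.
    public record Event(long timestamp, String key, Type type) {}

    // Mutable wrapper so that a buffered A can be cancelled in place.
    private static final class Slot {
        final Event event;
        boolean cancelled;

        Slot(Event event) {
            this.event = event;
        }
    }

    private final long windowMs;
    private final ArrayDeque<Slot> buffer = new ArrayDeque<>(); // FIFO, arrival order
    private final List<Event> output = new ArrayList<>();

    public RetractionBuffer(long windowMs) {
        this.windowMs = windowMs;
    }

    public void process(Event e) {
        if (e.type() == Type.ANTI_A) {
            // Walk backwards over at most windowMs of buffered history.
            Iterator<Slot> it = buffer.descendingIterator();
            while (it.hasNext()) {
                Slot s = it.next();
                if (e.timestamp() - s.event.timestamp() > windowMs) {
                    break; // everything further back is older than the window
                }
                if (!s.cancelled && s.event.type() == Type.A && s.event.key().equals(e.key())) {
                    s.cancelled = true; // the A is dropped, the Anti-A is kept
                    break;
                }
            }
        }
        buffer.addLast(new Slot(e));
        // An A more than windowMs older than the newest record can no longer be cancelled.
        releaseSettled(e.timestamp() - windowMs);
    }

    // End of input (in Flink terms, roughly a final watermark): emit the rest.
    public void flush() {
        releaseSettled(Long.MAX_VALUE);
    }

    public List<Event> output() {
        return List.copyOf(output);
    }

    private void releaseSettled(long threshold) {
        // Releasing only from the head keeps the original order, including ties.
        while (!buffer.isEmpty()) {
            Slot head = buffer.peekFirst();
            // Only an A that could still be cancelled has to wait; B and Anti-A are final.
            boolean undecidedA = head.event.type() == Type.A && !head.cancelled
                    && head.event.timestamp() >= threshold;
            if (undecidedA) {
                break;
            }
            buffer.pollFirst();
            if (!head.cancelled) {
                output.add(head.event);
            }
        }
    }

    public static void main(String[] args) {
        RetractionBuffer buf = new RetractionBuffer(60_000);
        buf.process(new Event(0, "alice", Type.A));
        buf.process(new Event(1_000, "bob", Type.B));
        buf.process(new Event(30_000, "alice", Type.ANTI_A));  // retracts A@0
        buf.process(new Event(40_000, "carol", Type.A));
        buf.process(new Event(200_000, "carol", Type.ANTI_A)); // too late, A@40000 is kept
        buf.flush();
        buf.output().forEach(System.out::println);
    }
}
```

In this sketch a pending A is only re-checked when the next record or `flush()` arrives. Inside Flink, that role would presumably be played by an event-time timer, with the queue held in state per Kafka partition. The backward scan on each Anti-A is linear in the number of buffered records; an index from `key` to the buffered A events would avoid it.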