flink-user mailing list archives

From Piotr Nowojski <pi...@ververica.com>
Subject Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter
Date Thu, 13 Feb 2020 12:12:08 GMT

As Kostas has pointed out, the operator and UDF APIs are not thread safe, and Flink always
calls them from the same single Task thread. This also includes checkpointing state.
Also as Kostas pointed out, the easiest way would be to try to use AsyncWaitOperator. If that’s
not possible, you can implement your custom logic based on its code.

>  So, in the end,
> the `ProcessElement1` method is basically forwarding the events to this
> library and registering a callback so that, when a match is detected, the
> CoProcessFunction can emit an output event. For achieving this, the callback
> relies on a reference to the `out: Collector[T]` parameter in
> `ProcessElement1`.

To achieve this, i.e. to emit from a different thread:

Pre Flink 1.10

In the past (before Flink 1.10, so including Flink 1.9), multi-threaded operators were supposed
to acquire the so-called “checkpointingLock”. You can hold a reference to the output collector,
but before emitting something, you have to acquire `checkpointingLock`. Note that if you acquire
it and don’t release it for some period of time, the whole Task will be blocked from making any
progress.
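
A minimal sketch of that locking pattern, written as a plain Java simulation rather than real
Flink code. Everything in it is an assumption made for illustration: `Output` stands in for
the collector, `checkpointingLock` is an ordinary `Object` instead of the lock Flink hands
out, and the single-threaded executor plays the role of the third party library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LockedEmitSketch {
    /** Hypothetical stand-in for the output collector; not thread safe on its own. */
    interface Output<T> { void collect(T record); }

    static List<String> run(int inputCount) {
        final Object checkpointingLock = new Object();   // assumption: stands in for Flink's lock
        final List<String> emitted = new ArrayList<>();  // deliberately not thread safe
        final Output<String> out = emitted::add;
        final ExecutorService library = Executors.newSingleThreadExecutor();

        for (int i = 0; i < inputCount; i++) {
            final int event = i;
            // "Task thread": in this simulation it touches the output only under the lock.
            synchronized (checkpointingLock) {
                out.collect("in-" + event);
            }
            // Forward the event to the "library"; its callback runs on another thread.
            library.submit(() -> {
                if (event % 3 == 0) {                     // pretend every third event matches
                    synchronized (checkpointingLock) {    // acquire the lock before emitting
                        out.collect("match-" + event);
                    }                                     // and release it right away
                }
            });
        }

        library.shutdown();
        try {
            library.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (checkpointingLock) {
            return new ArrayList<>(emitted);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(9));
    }
}
```

The callback holds the lock only around the `collect` call, so the simulated task thread is
never blocked for longer than a single emit.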

Flink 1.10+

In Flink 1.10, `checkpointingLock` was deprecated, and it will be removed in Flink 1.11. It is
replaced by registering asynchronous runnable callbacks, “mails”, that are executed by the task
thread. So if you want to:
a) emit results produced by a custom thread, or
b) modify the operator’s state as a result of some work done by a custom thread,
then in both cases this has to be done inside the “mail” action. For example, the pattern
for a) is:

1. External thread creates record R1 to emit
2. External thread creates a “mail” to emit record R1, and it enqueues it into the mailbox
3. Task's thread picks up the mail and executes its code, and that code emits the
record R1 from the Task thread
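
The three steps above can be sketched roughly as follows. This is a plain Java simulation of
the idea, not Flink's mailbox implementation: the `BlockingQueue<Runnable>` mailbox, the
`poison` end marker and the "every third event matches" rule are all assumptions made up for
the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MailboxEmitSketch {
    static List<String> run(int inputCount) {
        // Hypothetical mailbox: a queue of actions that only the task thread executes.
        final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        final List<String> emitted = new ArrayList<>();  // touched by the task thread only
        final Thread taskThread = Thread.currentThread();
        final Runnable poison = () -> { };               // marker: no more mails will arrive

        Thread external = new Thread(() -> {
            for (int i = 0; i < inputCount; i++) {
                if (i % 3 == 0) {                        // pretend every third event matches
                    final String r1 = "match-" + i;      // 1. external thread creates record R1
                    mailbox.add(() -> {                  // 2. and enqueues a mail that emits it
                        if (Thread.currentThread() != taskThread) {
                            throw new IllegalStateException("mail ran outside the task thread");
                        }
                        emitted.add(r1);                 // 3. the emit happens on the task thread
                    });
                }
            }
            mailbox.add(poison);
        });
        external.start();

        try {
            // Task thread: pick up mails one by one and run them until the marker shows up.
            while (true) {
                Runnable mail = mailbox.take();
                if (mail == poison) {
                    break;
                }
                mail.run();
            }
            external.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(9));
    }
}
```

The external thread never touches `emitted` directly. It only hands over a closure, so no
lock is needed around the emit itself.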

For both of those patterns, please take a look at the AsyncWaitOperator code in the respective
Flink versions. Just keep in mind that if you implement it using `checkpointingLock`, this
will no longer work in Flink 1.11.


> On 13 Feb 2020, at 10:56, Salva Alcántara <salcantaraphd@gmail.com> wrote:
> I still need to get into the AsyncWaitOperator, but after taking a look at
> the Async I/O API, it seems that the normal use case is when you expect a
> result for each element in the input stream, so you register a callback
> together with a timeout for each input element. This is not exactly what my
> use case requires. In particular, when I send an event to the third party
> library, I might get a result...or not. The library is used for detecting
> certain patterns, so it is not as when you are querying a database, where
> you expect a result within a given time frame for each input element. In my
> case, it is more the other way around, most of the time you will not be
> expecting any outcome (think of anomaly detection). What I need is a way to
> collect the result (if any) from my third party library in my
> ProcessFunction, knowing that these outcomes will be exceptional compared
> with the cardinality of the input stream. After giving some extra thoughts,
> I don't know if the Async I/O pattern really suits my needs...
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
