Hi Elkhan,

Theo's advice is spot-on, you should use asyncIO with AsyncFunction.

AsyncIO is not performing any task asynchronously by itself though, so you should either use the async API of the library if existant or manage your own thread pool. The thread pool should be as large as your desired parallelism (4-8). AsyncIO then ensures that the results are emitted in the correct way (to have consistent state). AsyncIO internally uses a queue for the parallel tasks, which should be at least as large as the number of threads in your pool. You can decide if results should be published as fast as possible (UNORDERED) or in the order of arrival (ORDERED where a slow element stops faster elements from being published). For ORDERED, your queue length should be much longer for optimal performance. If your longest elements take 2 min but the average 30 sec, you want to have a queue that is 4 roughly times as big as the number of threads (no exact science here).

Multithreading within an operator is strongly discouraged in 1.6 and will even be impossible in the near future. The main reason is that it's really hard to reason about consistent state and have a consistent checkpoint. It's very easy to have duplicates are lost elements in such a setting.

On Thu, Apr 16, 2020 at 3:43 PM Theo Diefenthal <theo.diefenthal@scoop-software.de> wrote:

I think you could utilize AsyncIO in your case with just using a local thread pool [1].

Best regards

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

Von: "Elkhan Dadashov" <elkhan.dadashov@gmail.com>
An: "user" <user@flink.apache.org>
Gesendet: Donnerstag, 16. April 2020 10:37:55
Betreff: How to scale a streaming Flink pipeline without abusing parallelism for long computation tasks?

Hi Flink users,
I have a basic Flnk pipeline, doing flatmap.

inside flatmap, I get the input, path it to the client library to compute some result.

That library execution takes around 30 seconds to 2 minutes (depending on the input ) for producing the output from the given input ( it is time-series based long-running computation). 

As it takes the library long time to compute, the input payloads keep buffered, and if not given enough parallelism, the job will crash/restart. (java.lang.RuntimeException: Buffer pool is destroyed.)

Wanted to check what are other options for scaling Flink streaming pipeline without abusing parallelism for long-running computations in Flink operator?

Is multi-threading inside the operator recommended? ( even though the single input computation takes a long time, but I can definitely run 4-8 of them in parallel threads, instead of one by one, inside the same FlatMap operator.

1 core for each yarn slot ( which will hold 1 flatmap operator) seems too expensive. If we could launch more link operators with only 1 core, it could have been easier.

If anyone faced a similar issue please share your experience. I'm using Flink 1..6.3 version.



Arvid Heise | Senior Java Developer

Follow us @VervericaData


Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time


Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany


Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng