flink-user mailing list archives

From Rune Skou Larsen <...@trifork.com>
Subject Re: Checkpoints very slow with high backpressure
Date Mon, 24 Apr 2017 09:08:51 GMT
Sorry, I can't help you, but we're also experiencing slow checkpointing
when there is backpressure from the sink.

I tried HDFS, S3, and RocksDB state backends, but to no avail:
checkpointing always times out under backpressure.

Can we somehow reduce Flink's internal buffer sizes, so checkpointing 
with backpressure becomes faster?
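A rough way to see why buffer sizes matter: Flink's checkpoint barriers travel in-band with the records, so under backpressure a barrier has to wait behind whatever is already queued in the network buffers. The sketch below is a crude back-of-the-envelope estimate, assuming the whole buffer pool of one TaskManager is full and drains only at the sink's rate. The buffer counts, the 32 KiB segment size and the 1 MiB/s sink throughput are hypothetical numbers, not measurements from our job. In Flink 1.2 the pool is sized by `taskmanager.network.numberOfBuffers` and the buffer size by `taskmanager.memory.segment-size` (worth double-checking against the 1.2 configuration docs).

```java
class InFlightEstimate {
    // Upper bound on the bytes that can sit in one TaskManager's network buffer pool
    // (assumes every buffer is full, roughly what sustained backpressure produces).
    static long inFlightBytes(int numberOfBuffers, int segmentSizeBytes) {
        return (long) numberOfBuffers * segmentSizeBytes;
    }

    // Seconds for a barrier queued behind those bytes to get through,
    // assuming they drain only at the bottleneck's throughput.
    static double drainSeconds(long bytes, double bytesPerSecond) {
        return bytes / bytesPerSecond;
    }

    public static void main(String[] args) {
        int segmentSizeBytes = 32 * 1024;          // hypothetical 32 KiB buffers
        double sinkBytesPerSecond = 1024 * 1024;   // hypothetical 1 MiB/s at the database sink
        for (int buffers : new int[] {2048, 512}) {
            long bytes = inFlightBytes(buffers, segmentSizeBytes);
            System.out.printf("%d buffers -> %d MiB in flight, ~%.0f s to drain%n",
                    buffers, bytes / (1024 * 1024), drainSeconds(bytes, sinkBytesPerSecond));
        }
    }
}
```

With these made-up numbers it prints 64 MiB / ~64 s for 2048 buffers and 16 MiB / ~16 s for 512. The model ignores that every network exchange between operators has its own buffers, so the real barrier delay across several hops can be larger.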

- Rune


Our current setup (improvement suggestions welcome!):

Flink 1.2.0, YARN on AWS EMR, 1 master + 3 slaves, m4.xlarge

program_parallelism: 12
taskmanagers: 6
slotsPerTaskManager: 4
taskmanager_heap_mb: 4096
jobmanager_heap_mb: 1024
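The slot arithmetic behind these settings, as a small sketch. It assumes every operator runs at parallelism 12 and that, with Flink's default slot sharing, the job therefore occupies 12 slots; it also assumes YARN spreads the 6 TaskManagers evenly over the 3 worker nodes (each m4.xlarge has 4 vCPUs and 16 GiB of memory).

```java
class ClusterSizing {
    // Total task slots offered by the cluster.
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Slots left unused, assuming default slot sharing, where a job occupies
    // as many slots as its highest operator parallelism.
    static int idleSlots(int totalSlots, int jobParallelism) {
        return Math.max(0, totalSlots - jobParallelism);
    }

    public static void main(String[] args) {
        int taskManagers = 6, slotsPerTaskManager = 4, parallelism = 12;
        int workerNodes = 3, taskManagerHeapMb = 4096;

        int slots = totalSlots(taskManagers, slotsPerTaskManager);
        System.out.println("total slots: " + slots);                        // 24
        System.out.println("idle slots: " + idleSlots(slots, parallelism)); // 12

        // Assumes the TaskManagers are spread evenly over the worker nodes.
        int tmsPerNode = taskManagers / workerNodes;
        System.out.println("TaskManagers per node: " + tmsPerNode);                   // 2
        System.out.println("slots per node: " + tmsPerNode * slotsPerTaskManager);    // 8
        System.out.println("TM heap per node (MB): " + tmsPerNode * taskManagerHeapMb); // 8192
    }
}
```

So half of the 24 slots sit idle, and each 4-vCPU node hosts 8 slots and 8 GB of TaskManager heap.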

Basic program structure:

1) Read a batch from Kinesis

2) Split the batch and shuffle it using a custom partitioner (consistent hashing)

3) Enrich using an external REST service

4) Write to a database (this step is the bottleneck)
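The partitioner itself isn't shown here, so the following is only a generic sketch of consistent hashing for step 2, not our actual code: each partition is placed on a hash ring many times ("virtual nodes"), and a key goes to the first partition at or after its own hash. The MD5-based hash and the 100 virtual nodes per partition are arbitrary choices for illustration. In Flink, logic like this could be wrapped in an `org.apache.flink.api.common.functions.Partitioner<K>` (whose `partition(K key, int numPartitions)` receives the partition count) and applied with `DataStream#partitionCustom`; a real wrapper would need a ring built for that `numPartitions`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

class ConsistentHashRing {
    // Ring position -> partition index.
    private final TreeMap<Long, Integer> ring = new TreeMap<>();

    ConsistentHashRing(int numPartitions, int virtualNodesPerPartition) {
        for (int p = 0; p < numPartitions; p++) {
            for (int v = 0; v < virtualNodesPerPartition; v++) {
                ring.put(hash("partition-" + p + "#" + v), p);
            }
        }
    }

    // First virtual node clockwise from the key's hash, wrapping around the ring.
    int partitionFor(String key) {
        Map.Entry<Long, Integer> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // 64-bit hash taken from the first 8 bytes of MD5 (always available in Java SE).
    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashRing ring = new ConsistentHashRing(12, 100);
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> partition " + ring.partitionFor(key));
        }
    }
}
```

The point of the ring over plain `hash % n` is stability: growing from 12 to 13 partitions only moves the keys that land on the new partition's arcs (roughly 1/13 of them), while the rest keep their partition.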

On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
> I'm sorry, guys, if you received multiple copies of this mail. I kept
> trying to send it yesterday, but it looks like the mailing list was stuck
> and didn't dispatch it until now. Sorry for the disturbance.
> On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y.marzougui@mindlytix.com> wrote:
>     Hi all,
>     I have a streaming pipeline as follows:
>     1 - read a folder continuously from HDFS
>     2 - filter duplicates (using keyBy(x -> x) and keeping a state per
>     key indicating whether it has been seen)
>     3 - schedule some future actions on the stream using a
>     ProcessFunction and processing-time timers (elements are kept in a
>     MapState)
>     4 - write results back to HDFS using a BucketingSink.
>     I am using RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit:
>     9fb074c).
>     Currently the source contains just one file of 1 GB, so that's the
>     maximum state the job might hold. I noticed that the
>     backpressure on operators #1 and #2 is high, and the split
>     reader has only read 60 MB of the 1 GB source file. I
>     suspect this is because the ProcessFunction is slow (on purpose).
>     However, it looks like this has affected the checkpoints, which fail
>     after the timeout (set to 2 hours); see attached screenshot.
>
>     In the JobManager logs I keep getting warnings:
>     2017-04-23 19:32:38,827 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 8 from 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
>     Is the high backpressure the cause of the checkpoints being so
>     slow? If so, is there a way to disable the backpressure mechanism,
>     since the records will be buffered in the RocksDB state anyway,
>     which is backed by disk?
>     Thank you.
>     Best,
>     Yassine
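Steps 2 and 3 of Yassine's pipeline can be modeled outside Flink to show where elements pile up while the ProcessFunction is deliberately slow. This is a single-process sketch under stated assumptions, not Flink code: a `HashSet` stands in for the per-key "seen" state, and a `TreeMap` from fire time to elements stands in for the MapState plus processing-time timers; the class name and the fixed delay are hypothetical. In Flink itself, step 2 would typically use a keyed `ValueState<Boolean>`, and step 3 would call `ctx.timerService().registerProcessingTimeTimer(...)` in `processElement` and emit in `onTimer`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

class DedupAndScheduleModel {
    private final Set<String> seen = new HashSet<>();                    // stands in for per-key "seen" state
    private final TreeMap<Long, List<String>> pending = new TreeMap<>(); // stands in for MapState + timers
    private final long delayMs;                                          // hypothetical fixed scheduling delay

    DedupAndScheduleModel(long delayMs) {
        this.delayMs = delayMs;
    }

    // Step 2: drop elements seen before; step 3: schedule first occurrences for later.
    void onElement(String element, long nowMs) {
        if (!seen.add(element)) {
            return; // duplicate, filtered out
        }
        pending.computeIfAbsent(nowMs + delayMs, t -> new ArrayList<>()).add(element);
    }

    // Fire every scheduled element whose time has come, in time order.
    List<String> advanceTo(long nowMs) {
        List<String> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() <= nowMs) {
            fired.addAll(pending.pollFirstEntry().getValue());
        }
        return fired;
    }

    // Number of elements still waiting for their timer.
    int pendingCount() {
        int n = 0;
        for (List<String> batch : pending.values()) {
            n += batch.size();
        }
        return n;
    }

    public static void main(String[] args) {
        DedupAndScheduleModel model = new DedupAndScheduleModel(1000);
        model.onElement("a", 0);
        model.onElement("b", 10);
        model.onElement("a", 20);                 // duplicate, ignored
        System.out.println(model.pendingCount()); // 2
        System.out.println(model.advanceTo(500)); // []
        System.out.println(model.advanceTo(1010)); // [a, b]
    }
}
```

In this model the `pending` map grows for as long as timers are outstanding, which corresponds to the keyed state a checkpoint has to snapshot; records still queued in network buffers upstream are not part of it.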

Venlig hilsen/Best regards *Rune Skou Larsen*

Trifork Public A/S · Dyssen 1 · DK-8200 Aarhus N · Denmark · Phone +45 3160 2497 · Skype: rsltrifork · Twitter: RuneSkouLarsen
