flink-user mailing list archives

From Monika Hristova <monik...@sbtech.com>
Subject RE: Constant backpressure on flink job
Date Tue, 07 May 2019 12:54:35 GMT
Hello Dawid,

Thank you for your response.
What I actually noticed is that the first checkpoint starts failing with the exception "Checkpoint
expired before completing". It keeps failing, and after an hour or so the backpressure
occurs. This is my job graph (please see the attached file JobGraph_1) with the problematic
operators marked in red. Once the backpressure sets in, the operators marked in red are those
showing high backpressure (JobGraph_2).

Best Regards,

From: Dawid Wysakowicz <dwysakowicz@apache.org>
Sent: Thursday, April 25, 2019 11:14 AM
To: Monika Hristova <monika.h@sbtech.com>; user@flink.apache.org
Subject: Re: Constant backpressure on flink job

Hi Monika,

I would start by identifying the operator that causes the backpressure. You can find more
information on how to monitor backpressure in the docs[1]. You might also be interested in
Seth's (cc'ed) webinar[2], where he also talks about how to debug backpressure.



[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html#monitoring-back-pressure

[2] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
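Besides the web UI, the same backpressure samples can be pulled from Flink's monitoring REST API (the `/jobs/<id>/vertices/<id>/backpressure` endpoint in the 1.8 docs). A minimal sketch; the host, port, and IDs are placeholders, and the sample response layout is an assumption based on the docs:

```python
import json
from urllib.request import urlopen

def worst_backpressure_ratio(payload):
    """Return the highest per-subtask backpressure ratio from a
    /jobs/<jobid>/vertices/<vertexid>/backpressure response."""
    return max(s["ratio"] for s in payload.get("subtasks", []))

def fetch_backpressure(host, job_id, vertex_id):
    # Hypothetical IDs; substitute real ones from GET /jobs and GET /jobs/<jobid>.
    url = f"http://{host}:8081/jobs/{job_id}/vertices/{vertex_id}/backpressure"
    with urlopen(url) as resp:
        return json.loads(resp.read())

# Interpreting an assumed sample response:
sample = {"status": "ok", "backpressure-level": "high",
          "subtasks": [{"subtask": 0, "ratio": 0.05},
                       {"subtask": 1, "ratio": 0.68}]}
print(worst_backpressure_ratio(sample))  # 0.68
```

Polling this per vertex makes it easy to pin down which operator is backpressured first, rather than eyeballing the UI.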
On 22/04/2019 17:44, Monika Hristova wrote:

We are experiencing regular backpressure (at least once a week). I would like to ask how we
can fix it.

Currently our configurations are:
vi /usr/lib/flink/conf/flink-conf.yaml
# Settings applied by Cloud Dataproc initialization action
jobmanager.rpc.address: bonusengine-prod-m-0
jobmanager.heap.mb: 4096
jobmanager.rpc.port: 6123
taskmanager.heap.mb: 4096
taskmanager.memory.preallocate: false
taskmanager.numberOfTaskSlots: 8
#taskmanager.network.numberOfBuffers: 21952     # legacy deprecated
taskmanager.network.memory.fraction: 0.9
taskmanager.network.memory.min: 67108864
taskmanager.network.memory.max: 1073741824
taskmanager.memory.segment-size: 65536
parallelism.default: 52
web.port: 8081
web.timeout: 120000
heartbeat.interval: 10000
heartbeat.timeout: 100000
yarn.application-attempts: 10
high-availability: zookeeper
high-availability.zookeeper.quorum: bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
high-availability.zookeeper.path.root: /flink
#high-availability.zookeeper.storageDir: hdfs:///flink/recovery     # legacy deprecated
high-availability.storageDir: hdfs:///flink/recovery
fs.hdfs.hadoopconf: /etc/hadoop/conf
state.backend: rocksdb
state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
state.savepoints.dir: hdfs:///bonusengine/savepoints
metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.port: 8125
zookeeper.sasl.disable: true
yarn.reallocate-failed: true
yarn.maximum-failed-containers: 32
web.backpressure.refresh-interval: 60000
web.backpressure.num-samples: 100
web.backpressure.delay-between-samples: 50
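For what it's worth, with these settings the network buffer pool is already pinned at its cap: the fraction-based share is clamped between `taskmanager.network.memory.min` and `.max`. A quick sanity check of the arithmetic (using `taskmanager.heap.mb` as an illustrative base; the base value Flink actually uses differs, so this only shows the clamping behaviour):

```python
def network_memory_bytes(total_bytes, fraction, min_bytes, max_bytes):
    """Clamp fraction * total between the configured min and max,
    mirroring taskmanager.network.memory.{fraction,min,max}."""
    return min(max(int(total_bytes * fraction), min_bytes), max_bytes)

total = 4096 * 1024 * 1024          # illustrative base only
net = network_memory_bytes(total, 0.9, 67108864, 1073741824)
buffers = net // 65536              # taskmanager.memory.segment-size
print(net, buffers)                 # 1073741824 16384
```

So raising the fraction beyond this point changes nothing; the 1 GiB max (16384 segments of 64 KiB) is the binding limit.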

This runs on a Hadoop HA cluster: masters -> 8 vCPUs, 7.2 GB RAM; slaves -> 16 vCPUs, 60 GB RAM,
with the YARN configuration in the attached file.

We have one YARN session started, in which 8 jobs run. Three of them consume the same
source (Kafka), which is where the backpressure shows up, but only one of the three experiences it.
The state of the job is around 20 MB, and checkpointing is configured as follows:
checkpointing.interval=300000
checkpointing.pause_between_checkpointing=240000       # makes sure the given value in ms of progress happens between checkpoints
checkpointing.timeout=60000                            # checkpoints have to complete within the value in ms or are discarded
checkpointing.max_concurrent_checkpoints=1             # allows the given number of checkpoints to be in progress at the same time
checkpointing.externalized_checkpoints.enabled=true    # enables/disables externalized checkpoints, which are retained after job cancellation
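One way to see why the timeout bites: a checkpoint triggered at time t must complete by t + timeout or it expires, and after a successful completion the next trigger waits for both the interval and the min pause. A small sketch of that schedule (plain arithmetic, a simplification of Flink's coordinator, not Flink code):

```python
def checkpoint_schedule(trigger_ms, duration_ms,
                        interval_ms=300_000, pause_ms=240_000,
                        timeout_ms=60_000):
    """Return (expired, next_trigger_ms) for one checkpoint attempt."""
    if duration_ms > timeout_ms:
        # "Checkpoint expired before completing" -- assumed here to
        # simply retry on the next interval tick.
        return True, trigger_ms + interval_ms
    completed_ms = trigger_ms + duration_ms
    # Next trigger honours both the interval and the min pause.
    return False, max(trigger_ms + interval_ms, completed_ms + pause_ms)

print(checkpoint_schedule(0, 90_000))   # (True, 300000) -- 90 s > 60 s timeout
print(checkpoint_schedule(0, 45_000))   # (False, 300000)
```

With a 60 s timeout, any checkpoint whose end-to-end duration (including alignment time, which backpressure inflates) exceeds 60 s expires, exactly the failure mode described above.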

The checkpointing pause was increased and the timeout was reduced on multiple occasions, because
the job kept failing, unable to recover from backpressure. RocksDB is configured as the state backend.
The problem keeps reproducing even with a one-minute timeout. I would also like to point out
that the ideal checkpoint interval for that job would be 2 minutes.
I would like to ask what the problem might be, or at least how to trace it?

Best Regards,
Monika Hristova
