flink-user mailing list archives

From "Kathula, Sandeep" <Sandeep_Kath...@intuit.com>
Subject Re: S3 Checkpointing taking long time with stateful operations
Date Wed, 24 Jun 2020 00:46:36 GMT
Hi Dawid,
We saw that backpressure is almost 0 for all our operators, but we still see lag increasing
when reading from the Kafka topics. When I take a savepoint and restart from it with checkpointing
disabled, the lag goes down. So we think there must be some problem with the checkpointing:
it takes around 2 minutes and we see a dip in processing during each checkpoint.

Thanks
Sandeep Kathula


From: Dawid Wysakowicz <dwysakowicz@apache.org>
Date: Monday, June 22, 2020 at 11:17 PM
To: "Kathula, Sandeep" <Sandeep_Kathula@intuit.com>, "user@flink.apache.org" <user@flink.apache.org>
Cc: "Vora, Jainik" <Jainik_Vora@intuit.com>, "Rosensweig, JD" <JD_Rosensweig@intuit.com>
Subject: Re: S3 Checkpointing taking long time with stateful operations


Hi Sandeep,

I am not sure if you received the message from Yun Tang. I think he made a good point there.
The problem might be that the operators take too much time processing regular records, which
delays processing of the checkpoint barriers. If that's the case, you might want to try increasing
the parallelism of the slow operators or revisit your processing logic.
At-least-once checkpoint mode does not need to align barriers, so the longer end-to-end duration
is mainly because the checkpoint barriers cannot be processed by the operators soon enough.
An operator only starts its checkpoint after it has processed the checkpoint barrier, so I think
you need to check the back-pressure status of your job [1].
Back-pressure makes the checkpoint barriers move downstream more slowly in the network
channels.


[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#back-pressure
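
For the parallelism suggestion above, a minimal sketch of overriding the parallelism of a single
slow operator (the operator, topology, and parallelism values below are hypothetical, not taken
from the actual job):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(20); // job-wide default, matching the current setup

            env.fromElements("a", "b", "c")
                // hypothetical stand-in for the heavy per-record work
                .map(value -> value.toUpperCase())
                .name("slow-operator")
                .setParallelism(40) // give only the slow operator more subtasks
                .print();

            env.execute("parallelism-sketch");
        }
    }

Calling setParallelism() directly after an operator overrides the job-wide default for just that
operator, so only the heavy stage gets more subtasks.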

Best,

Dawid


On 22/06/2020 20:21, Kathula, Sandeep wrote:
Hi Congxian,
Thanks for the reply. I enabled debug logs and I see that it took more than a minute to receive
the barriers for a given checkpoint from all the task slots.
We are reading from multiple Kafka input topics. Is this the reason for this behavior? Or
do I need to change any settings related to RocksDB (we mainly observe this behavior in the
stateful operator that stores incremental state in RocksDB)?
We have 10 task managers, each with 2 task slots, 4 CPUs, and 20 GB memory, of which 12 GB is
heap. Parallelism is 20.

Please find the logs for barriers attached.

Any inputs on how to solve this?


Thanks
Sandeep Kathula
From: Congxian Qiu <qcx978132955@gmail.com>
Date: Saturday, June 20, 2020 at 7:19 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Cc: "Kathula, Sandeep" <Sandeep_Kathula@intuit.com>, "Vora, Jainik" <Jainik_Vora@intuit.com>,
"Rosensweig, JD" <JD_Rosensweig@intuit.com>
Subject: Re: S3 Checkpointing taking long time with stateful operations


Hi

From the description and the given figure, the end-to-end time for one task is longer than
$time{sync-snapshot} + $time{async-snapshot}.
For at-least-once mode, could you please enable the debug log to track the barrier
alignment process?
You can find debug log lines such as:
"Received barrier for checkpoint {} from channel {}"
"Received cancellation barrier for checkpoint {} "
"Received all barriers for checkpoint {}"

Best,
Congxian


Yun Tang <myasuka@live.com> wrote on Friday, June 19, 2020 at 11:48 AM:
Hi Sandeep

At-least-once checkpoint mode does not need to align barriers, so the longer end-to-end duration
is mainly because the checkpoint barriers cannot be processed by the operators soon enough.
An operator only starts its checkpoint after it has processed the checkpoint barrier, so I think
you need to check the back-pressure status of your job [1].
Back-pressure makes the checkpoint barriers move downstream more slowly in the network
channels.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#back-pressure
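
Besides the Web UI, the back-pressure status is also exposed through Flink's monitoring REST API
per job vertex. A rough sketch of polling it (the JobManager address, job ID, and vertex ID are
placeholders, and the response layout can differ between Flink versions):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class BackPressureCheck {
        public static void main(String[] args) throws Exception {
            // Placeholders: substitute the real JobManager host, job ID and vertex ID
            String jobManager = "http://localhost:8081";
            String jobId = "<job-id>";
            String vertexId = "<vertex-id>";

            URL url = new URL(jobManager + "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");

            // The JSON response reports an overall back-pressure level plus per-subtask
            // ratios; the first call may only trigger sampling, so polling can be needed.
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }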

Best
Yun Tang
________________________________
From: Kathula, Sandeep <Sandeep_Kathula@intuit.com>
Sent: Friday, June 19, 2020 9:19
To: user@flink.apache.org <user@flink.apache.org>
Cc: Vora, Jainik <Jainik_Vora@intuit.com>; Rosensweig, JD <JD_Rosensweig@intuit.com>
Subject: S3 Checkpointing taking long time with stateful operations


Hi,

We are running a stateful Flink application with RocksDB as the state backend, incremental
checkpoints enabled, and checkpoints written to S3.

  *   10 task managers, each with 2 task slots
  *   Checkpoint interval: 3 minutes
  *   Checkpointing mode: at-least-once processing
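
A minimal sketch of a setup along these lines (the S3 path is a placeholder and the real job's
state backend wiring may differ):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // checkpoint every 3 minutes in at-least-once mode (no barrier alignment)
            env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.AT_LEAST_ONCE);

            // RocksDB state backend with incremental checkpoints written to S3
            // ("s3://my-bucket/checkpoints" is a placeholder path)
            env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));

            // trivial placeholder pipeline so the sketch is self-contained
            env.fromElements(1, 2, 3).map(x -> x * 2).print();

            env.execute("checkpoint-config-sketch");
        }
    }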



After running the app for 2-3 days, we see that an end-to-end checkpoint takes almost 2 minutes,
with sync time at most 2 sec and async time at most 15 sec. Initially, when the state is small,
checkpointing takes 10-15 sec. As the checkpointing mode is at-least-once, the alignment duration
is 0. We see a dip in processing during this time and couldn't find out what the actual issue
is.



We also tried with remote HDFS for checkpointing but observed similar behavior.



We have a couple of questions:

  *   When the sync time is at most 2 sec and the async time is at most 15 sec, why does the
end-to-end checkpoint take almost 2 minutes?
  *   How can we reduce the checkpoint time?




Any help would be appreciated.





Thank you

Sandeep Kathula



