flink-user mailing list archives

From Pasquale Vazzana <P.Vazz...@mwam.com>
Subject RE: Subtask much slower than the others when creating checkpoints
Date Tue, 15 Jan 2019 12:08:25 GMT
I can send you some debug logs and the execution plan; can I use your personal email? There
might be sensitive info in the logs.

Incoming and outgoing records are fairly evenly distributed across subtasks, with similar but alternating
loads. When the checkpoint is triggered, the load drops to nearly zero, all the fetch requests
sent to Kafka (2.0.1) time out, and often the clients disconnect from the brokers.
Both source topics have 30 partitions each; they get keyed, connected and co-processed.
I am checkpointing with EOS. As I said, I've tried all the state backends with either DELETE_ON_CANCELLATION
or RETAIN_ON_CANCELLATION. I assume that using the MemoryStateBackend and DELETE_ON_CANCELLATION should
remove any possibility of disk/IO congestion, am I wrong?
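
To make the setup concrete, the job is shaped roughly like this (topic names, the key extraction and the process function body are placeholders rather than our real code, and the checkpoint interval is just illustrative):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class SketchJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once checkpointing (the 10s interval is illustrative)
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // In-memory backend, to take disk/IO out of the picture
        env.setStateBackend(new MemoryStateBackend());

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092");
        kafkaProps.setProperty("group.id", "sketch");

        // Two 30-partition source topics (names are placeholders)
        DataStream<String> left =
                env.addSource(new FlinkKafkaConsumer<>("topic-a", new SimpleStringSchema(), kafkaProps));
        DataStream<String> right =
                env.addSource(new FlinkKafkaConsumer<>("topic-b", new SimpleStringSchema(), kafkaProps));

        // Keyed, connected and co-processed, as described above
        left.keyBy(SketchJob::extractKey)
            .connect(right.keyBy(SketchJob::extractKey))
            .process(new CoProcessFunction<String, String, String>() {
                @Override
                public void processElement1(String value, Context ctx, Collector<String> out) {
                    out.collect(value); // the real matching/enrichment logic lives here
                }

                @Override
                public void processElement2(String value, Context ctx, Collector<String> out) {
                    out.collect(value);
                }
            })
            .addSink(new DiscardingSink<>());

        env.execute("sketch");
    }

    // Placeholder: however the key is actually extracted from the payload
    private static String extractKey(String value) {
        return value.split(",", 2)[0];
    }
}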

Pasquale

From: Till Rohrmann <trohrmann@apache.org>
Sent: 15 January 2019 10:33
To: Pasquale Vazzana <P.Vazzana@mwam.com>
Cc: Bruno Aranda <baranda@apache.org>; user <user@flink.apache.org>
Subject: Re: Subtask much slower than the others when creating checkpoints

Same here Pasquale, the logs on DEBUG log level could be helpful. My guess would be that the
respective tasks are overloaded or that there is some resource congestion (network, disk, etc.).

You should see in the web UI the number of incoming and outgoing events. It would be good
to check that the events are similarly sized and can be computed in roughly the same time.

Cheers,
Till

On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana <P.Vazzana@mwam.com<mailto:P.Vazzana@mwam.com>>
wrote:
I have the same problem, even more impactful. Some subtasks stall forever quite consistently.
I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't help.
The backend doesn't seem to make any difference: I've tried the Memory, FS and RocksDB backends,
but nothing changes. I've also tried changing the storage medium (local spinning disk, SAN or mounted
fs), but nothing helps.
Parallelism is the only thing that mitigates the stalling: with parallelism 1 everything works,
but as I increase it everything degrades; 10 makes it very slow and 30 freezes it.
It's always one or two subtasks: most of them complete the checkpoint in a few milliseconds, but
there is always at least one that stalls for minutes until it times out. The checkpoint alignment
seems to be a problem.
I've been wondering whether some Kafka partitions were empty, but there is not much data skew
and the keyBy uses the same key strategy as the Kafka partitions; I've also tried using murmur2
for hashing, but it didn't help either.
The subtask that seems to be causing problems is a CoProcessFunction.
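
For completeness, the murmur2 attempt looked roughly like this (it reuses the hash from the Kafka client's Utils class so the keyBy groups records the same way the producer partitions them; the key extraction is a placeholder, not our real logic):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.kafka.common.utils.Utils;

// Key records by the Kafka partition their key would hash to,
// using Kafka's default murmur2-based partitioning.
public class Murmur2PartitionKeySelector implements KeySelector<String, Integer> {

    private final int numPartitions;

    public Murmur2PartitionKeySelector(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public Integer getKey(String value) {
        byte[] keyBytes = value.split(",", 2)[0].getBytes(StandardCharsets.UTF_8); // placeholder key extraction
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

Used as stream.keyBy(new Murmur2PartitionKeySelector(30)) on both inputs before connecting them.
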
I am going to debug Flink, but since I'm relatively new to it, it might take a while, so any
help will be appreciated.

Pasquale


From: Till Rohrmann <trohrmann@apache.org<mailto:trohrmann@apache.org>>
Sent: 08 January 2019 17:35
To: Bruno Aranda <baranda@apache.org<mailto:baranda@apache.org>>
Cc: user <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Subtask much slower than the others when creating checkpoints

Hi Bruno,

there are multiple reasons why one of the subtasks can take longer for checkpointing. It looks
as if there is not much data skew since the state sizes are relatively equal. It also looks
as if the individual tasks all start the checkpointing at the same time, which indicates
that there mustn't be a lot of back-pressure in the DAG (or all tasks were equally back-pressured).
This narrows the problem cause down to the asynchronous write operation. One potential problem
could be if the external system to which you write your checkpoint data has some kind of I/O
limit/quota. Maybe the sum of write accesses depletes the maximum quota you have. You could
try whether running the job with a lower parallelism solves the problems.

For further debugging it could be helpful to get access to the logs of the JobManager and
the TaskManagers on DEBUG log level. It could also be helpful to learn which state backend
you are using.
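
(If it helps, turning on DEBUG is usually just a matter of changing the root logger in conf/log4j.properties on the JobManager and TaskManager machines and restarting them, e.g.

log4j.rootLogger=DEBUG, file

or, more selectively, something like log4j.logger.org.apache.flink=DEBUG; this assumes the default log4j setup that ships with Flink.)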

Cheers,
Till

On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda <baranda@apache.org<mailto:baranda@apache.org>>
wrote:
Hi,

We are using Flink 1.6.1 at the moment and we have a streaming job configured to create a
checkpoint every 10 seconds. Looking at the checkpointing times in the UI, we can see that
one subtask is much slower creating the checkpoint, at least in its "End to End Duration", and
this seems to be caused by a longer "Checkpoint Duration (Async)".

For instance, in the attached screenshot, while most of the subtasks take half a second, one
(and it is always one) takes 2 seconds.

But we have worse problems. We have seen cases where the checkpoint times out for one task:
while most take one second, the outlier takes more than 5 minutes (which is the max time we
allow for a checkpoint). This can happen if there is back pressure. We only allow one checkpoint
at a time as well.
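
For reference, the checkpointing side of our setup is roughly the following (variable names aside, these are the settings described above):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(10_000L);                                // checkpoint every 10 seconds
env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);  // time out after 5 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        // only one checkpoint at a time
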
Why could one subtask take more time? This job reads from Kafka partitions and hashes by key,
and we don't see any major data skew between the partitions. Does one partition do more work?

We do have a cluster of 20 machines, in EMR, with TMs that have multiple slots (in legacy
mode).

Is this something that could have been fixed in a more recent version?

Thanks for any insight!

Bruno


This e-mail and any attachments are confidential to the addressee(s) and may contain information
that is legally privileged and/or confidential. Please refer to http://www.mwam.com/email-disclaimer-uk
for important disclosures regarding this email. If we collect and use your personal data we
will use it in accordance with our privacy policy, which can be reviewed at https://www.mwam.com/privacy-policy
.

Marshall Wace LLP is authorised and regulated by the Financial Conduct Authority. Marshall
Wace LLP is a limited liability partnership registered in England and Wales with registered
number OC302228 and registered office at George House, 131 Sloane Street, London, SW1X 9AT.
If you are receiving this e-mail as a client, or an investor in an investment vehicle, managed
or advised by Marshall Wace North America L.P., the sender of this e-mail is communicating
with you in the sender's capacity as an associated person of and on behalf of Marshall Wace
North America L.P., which is registered with the US Securities and Exchange Commission as
an investment adviser.
