flink-issues mailing list archives

From "Steven Zhen Wu (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance
Date Mon, 26 Mar 2018 22:22:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414085#comment-16414085 ]

Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:21 PM:
-----------------------------------------------------------------

[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For
these, we configure a *static* 4-char random hex in the checkpoint path of each routing job
during deployment so that S3 writes *from many jobs* are spread across different S3 partitions.
That requires no change in Flink, just some deployment tooling automation. Also for this case,
we set `state.backend.fs.memory-threshold` to disable S3 writes from task managers. Only the job
manager writes a single uber metadata file (with state embedded).
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling,
we need S3 writes *from many operators* to be spread across different S3 partitions. That is where
we want to inject a *dynamic* 4-char random hex for each S3 write. That was my earlier proposal
of __ENTROPY___KEY__
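
The difference between the static and the dynamic variant can be sketched roughly as follows. This is only an illustration in Python, not Flink code: the marker string `_ENTROPY_KEY_`, the function names, and the example paths are assumptions made for the sketch.

```python
import secrets

# Hypothetical placeholder; the exact marker spelling is an assumption.
ENTROPY_MARKER = "_ENTROPY_KEY_"


def random_hex(n_chars: int = 4) -> str:
    """Return n_chars random lowercase hex characters."""
    return secrets.token_hex((n_chars + 1) // 2)[:n_chars]


def resolve_static(path_template: str) -> str:
    """Case 1: substitute once at deployment; all writes of the job share the result."""
    return path_template.replace(ENTROPY_MARKER, random_hex())


def resolve_dynamic(path_template: str, file_name: str) -> str:
    """Case 2: substitute again for every write, so each file gets its own prefix."""
    base = path_template.replace(ENTROPY_MARKER, random_hex())
    return f"{base}/{file_name}"


template = "s3://bucket/_ENTROPY_KEY_/flink/checkpoints"

# Static: one prefix per job, fixed by the deployment tooling.
job_checkpoint_dir = resolve_static(template)
print(job_checkpoint_dir)

# Dynamic: a fresh prefix for each state file written by an operator.
for name in ("chk-1/op-0", "chk-1/op-1", "chk-1/op-2"):
    print(resolve_dynamic(template, name))
```

With the dynamic variant the random component is not derivable from the file name, so the fully resolved path of each file has to be recorded (for example in the checkpoint metadata) to find it again.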

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to
the complete path (not just the prefix configured in flink-conf.yaml).
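
To illustrate why the hash has to cover the complete path, here is a small Python sketch. The key layout (`chk-42/op-N/state`), the choice of SHA-256, and the function names are assumptions for the example, not anything Flink does.

```python
import hashlib


def hex_hash(text: str, n_chars: int = 4) -> str:
    """First n_chars hex characters of the SHA-256 digest of text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:n_chars]


def key_hashing_prefix_only(configured_prefix: str, relative_path: str) -> str:
    """Hash only the configured prefix: every file gets the same leading component."""
    return f"{hex_hash(configured_prefix)}/{configured_prefix}/{relative_path}"


def key_hashing_complete_path(configured_prefix: str, relative_path: str) -> str:
    """Hash the complete path: the leading component varies from file to file."""
    full = f"{configured_prefix}/{relative_path}"
    return f"{hex_hash(full)}/{full}"


prefix = "flink/checkpoints"
# Hypothetical layout: one state file per operator within a single checkpoint.
files = [f"chk-42/op-{i}/state" for i in range(1000)]

prefix_only = {key_hashing_prefix_only(prefix, f).split("/")[0] for f in files}
complete = {key_hashing_complete_path(prefix, f).split("/")[0] for f in files}

print(len(prefix_only))  # 1: all files of the job land under one key prefix
print(len(complete))     # close to the number of files
```

Hashing only the configured prefix helps when many jobs each have their own prefix (case #1), but does nothing for a single job writing thousands of files (case #2).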

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never
take a savepoint for case #2 (single large-state job).

 


was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]

We run into S3 throttling issues in two kinds of scenarios.
 * Thousands of massively parallel, stateless routing jobs for the Keystone data pipeline. For
these, we configure a *static* 4-char random hex in the checkpoint path of each routing job
during deployment so that S3 writes *from many jobs* are spread across different S3 partitions.
That requires no change in Flink, just some deployment tooling automation.
 * A single large-state job with TBs of state written from thousands of operators. To avoid throttling,
we need S3 writes *from many operators* to be spread across different S3 partitions. That is where
we want to inject a *dynamic* 4-char random hex for each S3 write. That was my earlier proposal
of __ENTROPY___KEY__

I can see that the hash approach can work for both scenarios, but the hash needs to be applied to
the complete path (not just the prefix configured in flink-conf.yaml).

But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we can never
take a savepoint for case #2 (single large-state job).

 

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Jamie Grier
>            Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale jobs (those
> with many total tasks).  The issue is that we are writing all the checkpoint data under a
> common key prefix.  This is the worst case scenario for S3 performance since the key is used
> as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 and an
> internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code that allows
> me to "rewrite" paths.  For example say I have the checkpoint directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a pretty
> serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
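
The path rewrite proposed in the issue description could look roughly like this. It is a minimal Python sketch, not the Flink filesystem hook itself: the function name, the use of SHA-256, and the 8-character hash length are assumptions.

```python
import hashlib
from urllib.parse import urlsplit


def rewrite_checkpoint_path(path: str, n_chars: int = 8) -> str:
    """Insert a hash of the original path directly after the bucket name."""
    parts = urlsplit(path)  # scheme="s3", netloc=bucket, path="/flink/checkpoints"
    digest = hashlib.sha256(path.encode("utf-8")).hexdigest()[:n_chars]
    return f"{parts.scheme}://{parts.netloc}/{digest}{parts.path}"


original = "s3://bucket/flink/checkpoints"
print(rewrite_checkpoint_path(original))
# Same shape as s3://bucket/[HASH]/flink/checkpoints
```

Because the hash is derived from the original path, the rewrite is deterministic: the same configured directory always maps to the same rewritten location, so no extra bookkeeping is needed to find it again.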



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
