flink-issues mailing list archives

From "Seth Wiesman (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5706) Implement Flink's own S3 filesystem
Date Mon, 27 Feb 2017 19:55:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886417#comment-15886417

Seth Wiesman commented on FLINK-5706:

[~StephanEwen] To expand on our conversation from the mailing list.

As I mentioned before, EMR handles S3 consistency by building a [consistent view|http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html]
using a DynamoDB table. The core idea is that we should be able to trust that Flink is in
a consistent state, so when a file system operation such as a rename or a delete is performed,
we should first check that the requested file exists and, if not, wait for it. However, the
file may actually not exist for some external reason, or it may take S3 longer than we are
willing to wait to become consistent, since S3's consistency delay has no upper bound.

public void delete(String path) throws Exception {
    // Retry until the file becomes visible, then delete; give up after numPasses.
    for (int i = 0; i < numPasses; i++) {
        if (exists(path)) {
            doDelete(path); // delegate to the underlying S3 delete
            return;
        }
        Thread.sleep(backoffMillis);
    }
    throw new FileNotFoundException(
        "File either does not exist or took too long to become consistent: " + path);
}

This is as far as I went with my implementation, and it seems to work for most cases, but it
does contain two core issues:

1) We are not able to differentiate between inconsistent files and missing files. If Flink
is running in real time, we are probably hitting a consistency delay; but what if the
user restarts from a checkpoint in the past? In that case the files may genuinely not exist,
leaving Flink in an inconsistent state, which breaks the core invariant of this solution.
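One way to tell the two cases apart, following the EMRFS consistent-view idea, is to consult a strongly consistent index of the files Flink itself wrote: if the index has no record, fail fast; if it does, keep waiting. A minimal sketch, with all interface and method names hypothetical (not Flink's actual API):

```java
import java.io.FileNotFoundException;

// Sketch only: "MetadataStore" stands in for a strongly consistent index of
// files Flink itself has written (EMR's consistent view keeps such an index
// in DynamoDB). All names here are hypothetical.
public class ConsistentExistenceCheck {

    /** Strongly consistent record of files Flink wrote. */
    interface MetadataStore {
        boolean contains(String path);
    }

    /** Eventually consistent store, e.g. S3. */
    interface ObjectStore {
        boolean exists(String path);
    }

    private final MetadataStore metadata;
    private final ObjectStore objects;
    private final int numPasses;
    private final long backoffMillis;

    ConsistentExistenceCheck(MetadataStore metadata, ObjectStore objects,
                             int numPasses, long backoffMillis) {
        this.metadata = metadata;
        this.objects = objects;
        this.numPasses = numPasses;
        this.backoffMillis = backoffMillis;
    }

    // Fail fast when the index has no record (truly missing, e.g. after
    // restoring an old checkpoint); otherwise wait out the inconsistency.
    void awaitVisible(String path) throws Exception {
        if (!metadata.contains(path)) {
            throw new FileNotFoundException("No record of file: " + path);
        }
        for (int i = 0; i < numPasses; i++) {
            if (objects.exists(path)) {
                return;
            }
            Thread.sleep(backoffMillis);
        }
        throw new FileNotFoundException(
            "File took too long to become consistent: " + path);
    }
}
```

With this split, a restore from an old checkpoint fails immediately with "no record" instead of burning the full retry budget on a file that will never appear.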

2) Certain operations really do take too long to become consistent, causing the entire pipeline
to slow down. Take the bucketing sink as an example. On checkpoint, the current in-progress
file is [renamed to pending | https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L563].
This works because S3 has read-after-write consistency for new objects. But after the checkpoint is complete
the [pending file is again renamed to complete | https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L655].
S3 renames do not have consistency guarantees and so this will eventually become inconsistent.
The problem I ran into was that even with the updated fs implementation there would inevitably
be files that took upwards of several minutes to become consistent. Eventually this led me
to write a custom sink specifically for S3, one that understands what S3 is capable of. Ultimately
I believe this shows that for true S3 interop certain parts of this problem will bleed through
the file system abstraction. 

> Implement Flink's own S3 filesystem
> -----------------------------------
>                 Key: FLINK-5706
>                 URL: https://issues.apache.org/jira/browse/FLINK-5706
>             Project: Flink
>          Issue Type: New Feature
>          Components: filesystem-connector
>            Reporter: Stephan Ewen
> As part of the effort to make Flink completely independent from Hadoop, Flink needs its
own S3 filesystem implementation. Currently Flink relies on Hadoop's S3a and S3n file systems.
> An S3 file system of our own can be implemented using the AWS SDK. As the basis of the implementation,
the Hadoop file system can be used (Apache licensed, so it should be okay to reuse some code as
long as we do proper attribution).

This message was sent by Atlassian JIRA
