flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: S3 for state backend in Flink 1.4.0
Date Wed, 31 Jan 2018 13:48:50 GMT
Hi Edward,

The problem here is that readTextFile() and writeAsText() use the Flink FileSystem abstraction
underneath, which picks up the s3 filesystem from the jar you copied out of opt/. The BucketingSink,
on the other hand, uses the Hadoop FileSystem abstraction directly, meaning that there has to be
a Hadoop FileSystem implementation for s3 on the classpath for this to work.
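
To make the difference described above concrete, here is a toy model in plain Java. It is
only a sketch: it is not Flink's or Hadoop's actual lookup code, and the class name, the two
maps and the handler strings are all made up for illustration. It just shows that two
independent scheme registries can give different answers for the same s3:// path.

```java
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: neither Flink's nor Hadoop's real lookup code. */
public class SchemeLookupSketch {

    /** Returns the handler registered for the URI's scheme, or fails if none is registered. */
    public static String resolve(Map<String, String> registry, String uri) throws IOException {
        String scheme = URI.create(uri).getScheme();
        String impl = registry.get(scheme);
        if (impl == null) {
            // Same wording as the exception in the stack trace from this thread.
            throw new IOException("No FileSystem for scheme: " + scheme);
        }
        return impl;
    }

    public static void main(String[] args) throws IOException {
        // Stands in for the Flink FileSystem abstraction (hypothetical handler name).
        Map<String, String> flinkSide = new HashMap<>();
        flinkSide.put("s3", "handler-from-flink-s3-fs-hadoop");

        // Stands in for the Hadoop FileSystem abstraction: nothing registered for s3.
        Map<String, String> hadoopSide = new HashMap<>();
        hadoopSide.put("file", "local-handler");

        String uri = "s3://flink-test/data.txt";
        System.out.println("Flink side: " + resolve(flinkSide, uri));
        try {
            resolve(hadoopSide, uri);
        } catch (IOException e) {
            System.out.println("Hadoop side: " + e.getMessage());
        }
    }
}
```

In this model, registering an s3 entry in the second map is the counterpart of putting a
Hadoop FileSystem implementation for s3 on the classpath.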

Also, the BucketingSink currently has some shortcomings when used with eventually consistent
file systems, such as S3. We are planning to solve those problems after releasing 1.5, and
there is also an open PR that provides an alternative sink that works with those kinds of
file systems: https://github.com/apache/flink/pull/4607

Best,
Aljoscha

> On 31. Jan 2018, at 14:01, Edward Rojas <edward.rojascl@gmail.com> wrote:
> 
> Hi,
> 
> We are having a similar problem when trying to use Flink 1.4.0 with IBM
> Object Storage for reading and writing data. 
> 
> We followed
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html
> and the suggestion on https://issues.apache.org/jira/browse/FLINK-851.
> 
> We copied the flink-s3-fs-hadoop jar from the opt/ folder to the lib/ folder
> and added the following configuration to flink-conf.yaml:
> 
> s3.access-key: <ACCESS_KEY>
> s3.secret-key: <SECRET_KEY>
> s3.endpoint: s3.us-south.objectstorage.softlayer.net 
> 
> With this we can read from IBM Object Storage without any problem when using
> env.readTextFile("s3://flink-test/flink-test.txt");
> 
> But we are having problems when trying to write.
> We are using a Kafka consumer to read from the bus, doing some
> processing, and then saving some data to Object Storage.
>
> When using stream.writeAsText("s3://flink-test/data.txt").setParallelism(1);
> the file is created, but only when the job finishes (or we stop it). We
> need to save the data without stopping the job, so we are trying to use a
> Sink.
> 
> But when using a BucketingSink, we get the error: 
> java.io.IOException: No FileSystem for scheme: s3
> 	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> 
> 
> Do you have any idea how we could make it work using a Sink?
> 
> Thanks,
> Regards,
> 
> Edward
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

