Hi David,

The average size of each file is around 30 KB, and I have a checkpoint interval of 5 minutes. Some files are as small as 1 KB; because of checkpointing, some files are merged into one big file of around 300 MB.

With 120 million files and 4 TB, if the rate of transfer is 300 files per minute, it is taking weeks to write to S3.
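
As a rough back-of-the-envelope check of these figures, here is a minimal sketch. It assumes one S3 write per message (an upper bound, since some messages are merged into larger files at each checkpoint) and a constant write rate. The class and method names are made up for illustration.

```java
import java.time.Duration;

public class TransferEstimate {
    /** Time needed to write totalFiles at a constant rate of filesPerMinute. */
    public static Duration timeToWrite(long totalFiles, long filesPerMinute) {
        // Round up so that a partial minute still counts as a full minute.
        long minutes = (totalFiles + filesPerMinute - 1) / filesPerMinute;
        return Duration.ofMinutes(minutes);
    }

    /** Constant rate (files per minute) needed to finish within the deadline. */
    public static long requiredFilesPerMinute(long totalFiles, Duration deadline) {
        long minutes = deadline.toMinutes();
        return (totalFiles + minutes - 1) / minutes;
    }

    public static void main(String[] args) {
        long totalFiles = 120_000_000L; // figure quoted in this thread
        Duration atCurrentRate = timeToWrite(totalFiles, 300);
        System.out.println("At 300 files/min: " + atCurrentRate.toDays() + " days");
        System.out.println("Rate needed to finish in one day: "
                + requiredFilesPerMinute(totalFiles, Duration.ofDays(1)) + " files/min");
    }
}
```

Under those assumptions, 300 files per minute comes to roughly 277 days (about 40 weeks) for 120 million files, and finishing within a single day would need about 83,334 files per minute.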

I have tried increasing the parallelism of the sink, but I don't see any improvement.

The sink record is a Tuple3<String,String,String>; the actual content of the file is f2. This content is written to <s3 bucket>/f0/f1/part*-*

I guess the prefix determination in CustomBucketAssigner won't be causing this delay?
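
To make the question concrete, here is a minimal sketch, assuming the assigner simply returns f0 + "/" + f1 (the body of CustomBucketAssigner is not shown in this thread, so that is a guess). In Flink this logic would live in BucketAssigner#getBucketId; the plain-Java class below is a hypothetical stand-in that needs no Flink dependency. The concatenation itself is cheap, but StreamingFileSink keeps a separate in-progress part file per active bucket in each sink subtask, and with an on-checkpoint rolling policy all of them roll at every checkpoint, so the number of distinct prefixes seen in a checkpoint interval is a lower bound on the number of part files written for it.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BucketCountSketch {
    /** Assumed bucket id: the path segment appended to the sink's base path. */
    public static String bucketId(String f0, String f1) {
        return f0 + "/" + f1;
    }

    /**
     * Counts the distinct bucket ids among the given (f0, f1) pairs, for
     * example the records one subtask receives between two checkpoints.
     */
    public static int distinctBuckets(List<String[]> records) {
        Set<String> ids = new HashSet<>();
        for (String[] r : records) {
            ids.add(bucketId(r[0], r[1]));
        }
        return ids.size();
    }

    public static void main(String[] args) {
        // Hypothetical sample values, only to show the call.
        List<String[]> sample = List.of(
                new String[] {"typeA", "2020-05-30"},
                new String[] {"typeA", "2020-05-30"},
                new String[] {"typeB", "2020-05-30"});
        System.out.println("Distinct buckets: " + distinctBuckets(sample));
    }
}
```

If that count is close to the number of records, almost every record ends up as its own small S3 object, which would match the 1 KB to 30 KB files described above.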

Could you please shed some light on writing a custom S3 sink?


On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddragon@gmail.com> wrote:
Hi Venkata. 

300 requests per minute works out to about 200 ms per request (60,000 ms / 300), which should be a normal response time for sending a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelism to be higher than 1? I also recommend limiting the source parallelism, because the source can consume pretty fast from Kafka and create some kind of backpressure.

I don't have much experience with StreamingFileSink, because I ended up using a custom S3Sink, but I did have some issues writing to S3 because the requests weren't parallelised. Check this thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
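
To illustrate what parallelising the requests can look like, here is a minimal sketch. It is not the S3Sink mentioned above; ParallelUploadSketch, uploadAll and the uploadOne callback are hypothetical names, and the callback stands in for a real S3 PUT. It also ignores what a real Flink sink would need on top of this, such as waiting for pending uploads when a checkpoint is taken.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ParallelUploadSketch {
    /**
     * Runs uploadOne for every key on a fixed-size thread pool and blocks
     * until all uploads have finished. Returns the number of completed
     * uploads; fails with IllegalStateException if any upload failed.
     */
    public static int uploadAll(List<String> keys, int parallelism, Consumer<String> uploadOne) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String key : keys) {
                pending.add(pool.submit(() -> uploadOne.accept(key)));
            }
            int done = 0;
            for (Future<?> f : pending) {
                f.get(); // blocks; throws ExecutionException if this upload failed
                done++;
            }
            return done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for uploads", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Upload failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a real PUT request: sleep about 200 ms per "upload".
        Consumer<String> fakePut = key -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            keys.add("part-" + i);
        }
        long start = System.nanoTime();
        int n = uploadAll(keys, 10, fakePut);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(n + " uploads finished in about " + elapsedMs + " ms");
    }
}
```

With requests that each take around 200 ms, one at a time tops out near 300 per minute, while N requests in flight can approach N times that, as long as nothing else becomes the bottleneck.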

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <vkolluru15@gmail.com> wrote:

I have posted the same question on Stack Overflow but didn't get any response, so I am posting it here for help.


I am working on a Flink application on Kubernetes (EKS) which consumes data from Kafka and writes it to S3.

We have around 120 million XML messages, 4 TB in total, in Kafka. Consuming from Kafka is super fast.

These are just string messages from Kafka.

There is high back pressure while writing to S3. We are not even hitting the S3 PUT request limit, which is around 3500 requests/sec. I am seeing only 300 writes per minute to S3, which is very slow.

I am using StreamingFileSink to write to S3, with OnCheckpointRollingPolicy as the rolling policy.

I am using flink-s3-fs-hadoop-*.jar and s3:// as the path scheme (not s3a or s3p).

Other than this, I don't have any config related to S3.

    StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
            .forRowFormat(new Path("s3://BUCKET"),
                    (Tuple3<String, String, String> element, OutputStream stream) -> {
                        // f2 holds the actual file content
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();

Is there anything that we can optimize for S3, either in StreamingFileSink or in flink-conf.yaml?

For example, using a bulk format, or config params such as fs.s3.maxThreads?

For checkpointing too, I am using s3:// instead of s3p or s3a.

env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));