flink-user mailing list archives

From Felix Cheung <felixcheun...@hotmail.com>
Subject Re: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled
Date Fri, 01 Sep 2017 18:13:40 GMT
Yep, I was able to get this to work with a custom Bucketer.

A custom Bucketer can build the bucketing path either from the clock it is given ("processing time")
or from a timestamp carried in the data ("event time").
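To make the processing-time/event-time distinction concrete, here is a minimal stand-alone sketch. It does not use Flink: `BucketPaths` and its methods are hypothetical, a `LongSupplier` stands in for the `Clock` a Flink Bucketer receives, and paths are plain strings rather than Hadoop `Path` objects. Both variants format the chosen timestamp into an hourly UTC directory name, similar in spirit to Flink's `DateTimeBucketer`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Flink: shows the two ways a bucketer can pick a timestamp.
final class BucketPaths {
    // Hourly bucket names, formatted in UTC so the result does not depend on the machine's time zone.
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    private BucketPaths() {}

    // Processing time: read the clock (in Flink, the Clock handed to the Bucketer).
    static String processingTimeBucket(LongSupplier clock, String basePath) {
        return bucketFor(clock.getAsLong(), basePath);
    }

    // Event time: use a timestamp carried inside the record itself.
    static String eventTimeBucket(long eventTimestampMillis, String basePath) {
        return bucketFor(eventTimestampMillis, basePath);
    }

    // Append the formatted hour to the base path.
    private static String bucketFor(long epochMillis, String basePath) {
        return basePath + "/" + HOURLY.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

For example, a record stamped 1_504_289_620_000 ms (2017-09-01 18:13:40 UTC) lands in `/data/2017-09-01--18` with event-time bucketing. Event time keeps late records in the bucket of the hour they describe, while processing time files them by arrival.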

________________________________
From: Raja.Aravapalli <Raja.Aravapalli@target.com>
Sent: Friday, September 1, 2017 10:21:00 AM
To: Aljoscha Krettek; Piotr Nowojski
Cc: user@flink.apache.org
Subject: Re: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled


Thanks, Aljoscha, for the input.

I will look into extending the "BasePathBucketer" class.


Regards,
Raja.

From: Aljoscha Krettek <aljoscha@apache.org>
Date: Friday, September 1, 2017 at 10:27 AM
To: Piotr Nowojski <piotr@data-artisans.com>
Cc: Raja Aravapalli <Raja.Aravapalli@target.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: [EXTERNAL] Re: Bucketing/Rolling Sink: New timestamp appended to the part file name every time a new part file is rolled

Hi Raja,

I think you can in fact do this by implementing a custom Bucketer. You can have a look at
BasePathBucketer and extend it to include the timestamp in the path that it returns. You
should probably clamp the timestamp (for example, round it down to the hour) so that you don't
get a new path for every millisecond.

Best,
Aljoscha
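The clamping advice can be sketched as follows, assuming a hypothetical `ClampedBucketPath` helper and plain string paths so that it runs without Flink. In a real job the same logic would sit inside a class extending `BasePathBucketer`, in its `getBucketPath(Clock clock, Path basePath, T element)` override (the Flink 1.3-era `Bucketer` signature, as far as I know; verify it against your Flink version).

```java
// Hypothetical helper, not part of Flink: the path logic a custom Bucketer could use.
final class ClampedBucketPath {
    private ClampedBucketPath() {}

    // Round a timestamp down to the start of its interval so that every record in the
    // same interval maps to the same bucket. floorMod keeps this correct for negative inputs.
    static long clamp(long epochMillis, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        return epochMillis - Math.floorMod(epochMillis, intervalMillis);
    }

    // Build the bucket path from the clamped timestamp; a bucketer extending BasePathBucketer
    // would return something like this instead of the bare base path.
    static String bucketPath(String basePath, long epochMillis, long intervalMillis) {
        return basePath + "/" + clamp(epochMillis, intervalMillis);
    }
}
```

With a one-hour interval (3_600_000 ms), 1_504_289_620_000 (18:13:40 UTC) clamps to 1_504_288_800_000 (18:00:00 UTC), so all records from that hour share one bucket directory, and BucketingSink still rolls part files inside it by size.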

On 1. Sep 2017, at 08:18, Piotr Nowojski <piotr@data-artisans.com> wrote:

Hi,

BucketingSink doesn't support the feature that you are requesting: you cannot specify a
dynamically generated prefix/suffix.

Piotrek
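Why the prefix never changes can be shown without Flink. `setPartPrefix` takes a plain `String`, so a timestamp concatenated into it is read once, when the sink is configured. The hypothetical `FrozenPrefixDemo` below uses a simplified `prefix-counter` naming scheme (not BucketingSink's exact file names) and a `LongSupplier` as a fake clock to contrast that with reading the clock on every roll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical demo, not part of Flink: fixed prefix vs. a timestamp read per rolled file.
final class FrozenPrefixDemo {
    private FrozenPrefixDemo() {}

    // Mimics configuring a sink: the prefix string is built once, up front.
    static List<String> namesWithFixedPrefix(LongSupplier clock, int rolls) {
        String prefix = "PART-FILE-" + clock.getAsLong(); // clock read exactly once
        List<String> names = new ArrayList<>();
        for (int counter = 0; counter < rolls; counter++) {
            names.add(prefix + "-" + counter);
        }
        return names;
    }

    // What the question asks for: read the clock again for every rolled file.
    static List<String> namesWithPerRollTimestamp(LongSupplier clock, int rolls) {
        List<String> names = new ArrayList<>();
        for (int counter = 0; counter < rolls; counter++) {
            names.add("PART-FILE-" + clock.getAsLong() + "-" + counter);
        }
        return names;
    }
}
```

Only the second variant changes the timestamp per file. Since BucketingSink offers no per-roll hook for the prefix, the workaround discussed in this thread is to put the timestamp into the bucket path with a custom Bucketer instead.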

On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:


Hi,

I have a Flink application that streams data into HDFS using the BucketingSink. I want to
know if it is possible to rename the part files that are being created in the base HDFS
directory.

Right now I am using the code below to include a timestamp in the part-file name, but the
timestamp does not change when a new part file is rolled!


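import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

BucketingSink<String> hdfsSink = new BucketingSink<>(hdfsOutputPath);

hdfsSink.setBucketer(new BasePathBucketer<String>());
// Roll to a new part file after 'hdfsOutputBatchSizeInMB' MB; long arithmetic avoids int overflow
hdfsSink.setBatchSize(1024L * 1024L * hdfsOutputBatchSizeInMB);
// System.currentTimeMillis() is evaluated once, here, when the sink is configured
hdfsSink.setPartPrefix("PART-FILE-" + Long.toString(System.currentTimeMillis()));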


Can someone please suggest what code changes I can try so that each newly rolled part file
gets a new timestamp?


Thanks a lot.

Regards,
Raja.


