flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: Access to datastream from BucketSink
Date Wed, 16 Aug 2017 15:06:09 GMT
Hi Ant,

I think you are implementing the wrong Bucketer. 
This seems to be the one for the RollingSink which is deprecated. 
Is this correct?

You should implement the BucketingSink one, which is in the package:

org.apache.flink.streaming.connectors.fs.bucketing

That one requires the implementation of one method with the signature:

Path getBucketPath(Clock clock, Path basePath, T element);

which, from what I understand of your requirements, gives you access
to the element that you need.
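A minimal sketch of such a Bucketer, assuming Flink 1.3's `flink-connector-filesystem` and Hadoop's `Path` are on the classpath, and assuming each record is a String of the hypothetical form `eventType,payload`. The class name `ContentBucketer` and the helper `bucketFor` are illustrative only, not part of Flink:

```java
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

/**
 * Hypothetical example: routes each record into a sub-directory of basePath
 * named after the record's first comma-separated field.
 * ASSUMPTION: records look like "eventType,payload"; adapt bucketFor() to your format.
 */
public class ContentBucketer implements Bucketer<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Unlike the RollingSink-style Bucketer, this method receives the record itself.
        return new Path(basePath, bucketFor(element));
    }

    /** Hypothetical helper: derives a directory name from the record's content. */
    public static String bucketFor(String element) {
        if (element == null || element.isEmpty()) {
            return "unknown";
        }
        int comma = element.indexOf(',');
        String key = (comma >= 0 ? element.substring(0, comma) : element).trim();
        // Replace characters that are unsafe in a single path segment.
        String safe = key.replaceAll("[^A-Za-z0-9_-]", "_");
        return safe.isEmpty() ? "unknown" : safe;
    }
}
```

The `clock` argument is unused here; a time-based bucketer such as `DateTimeBucketer` is the kind of implementation that would read it.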

Cheers,
Kostas
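Applied to the pipeline from the original question, the sink is built once with only the fixed base path, and each record's directory comes from the bucketer rather than from string concatenation at `addSink` time. The sketch below assumes Flink 1.3 APIs, uses `env.fromElements(...)` as a stand-in for the `StreamUtil.getDataStream(env, params)` helper from the thread, and defines the bucketer as a lambda (possible because `bucketing.Bucketer` declares a single method and extends `Serializable`). `BucketingJob` and `firstFieldBucketer` are hypothetical names:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.Path;

public class BucketingJob {

    /** Hypothetical bucketer: directory name = first comma-separated field of the record. */
    public static Bucketer<String> firstFieldBucketer() {
        return (clock, basePath, element) -> {
            int comma = element.indexOf(',');
            String key = comma >= 0 ? element.substring(0, comma) : element;
            return new Path(basePath, key.isEmpty() ? "unknown" : key);
        };
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; the thread's StreamUtil.getDataStream(env, params) would go here.
        DataStream<String> dataStream = env.fromElements("click,1", "view,2", "click,3");

        // The sink only knows the fixed base path; per-record directories come from the bucketer.
        BucketingSink<String> sink = new BucketingSink<String>("s3a://flink/");
        sink.setBucketer(firstFieldBucketer());
        dataStream.addSink(sink);

        env.execute("bucketing-by-content");
    }
}
```

Writing to an `s3a://` path additionally requires the Hadoop S3A filesystem to be configured for the job; that setup is outside this sketch.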

> On Aug 16, 2017, at 3:31 PM, ant burton <apburton84@gmail.com> wrote:
> 
> 
> Thanks Kostas,
> 
> I’m narrowing in on a solution:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html says "You can also specify a custom bucketer by using setBucketer() on a BucketingSink. If desired, the bucketer can use a property of the element or tuple to determine the bucket directory."
> 
> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
> sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
> 
> Therefore I’ve created a skeleton class:
> 
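> import java.io.IOException;
> import java.io.ObjectInputStream;
> 
> import org.apache.flink.streaming.connectors.fs.Bucketer;
> import org.apache.hadoop.fs.Path;
> 
> // Note: neither method below receives the stream element, only paths.
> public class S3Bucketer implements Bucketer {
> 	private static final long serialVersionUID = 1L;
> 
> 	public S3Bucketer() {
> 	}
> 
> 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
> 		in.defaultReadObject();
> 	}
> 
> 	@Override
> 	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
> 		return true;
> 	}
> 
> 	@Override
> 	public Path getNextBucketPath(Path basePath) {
> 		return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
> 	}
> }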
> 
> My question now is: how do I access the data stream from within the S3Bucketer so that I can generate a filename based on the data within the data stream?
> 
> Thanks,
> 
>> On 16 Aug 2017, at 12:55, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>> 
>> In the second link for the BucketingSink, you can set your 
>> own Bucketer using the setBucketer method. You do not have to 
>> implement your own sink from scratch.
>> 
>> Kostas
>> 
>>> On Aug 16, 2017, at 1:39 PM, ant burton <apburton84@gmail.com> wrote:
>>> 
>>> or rather https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>> 
>>> 
>>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>>> 
>>>> Hi Ant,
>>>> 
>>>> I think you can do it by implementing your own Bucketer.
>>>> 
>>>> Cheers,
>>>> Kostas
>>>> 
>>>> 
>>>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburton84@gmail.com> wrote:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> Given 
>>>>> 
>>>>>       // Set StreamExecutionEnvironment
>>>>>       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> 
>>>>>       // Use event time as the stream time characteristic
>>>>>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> 
>>>>>       // Add source (input stream)
>>>>>       DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>>>> 
>>>>> How can I construct the s3_filename from the content of an event? It seems that whenever I attempt this, I have access either to an event or to .addSink, but not both.
>>>>> 
>>>>> 	dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 

