flink-user mailing list archives

From ant burton <apburto...@gmail.com>
Subject Re: Access to datastream from BucketSink
Date Wed, 16 Aug 2017 11:30:21 GMT

Am I on the right path with the following?

class S3SinkFunc implements SinkFunction<String> {
    public void invoke(String element) {
        System.out.println(element);
        // don't have access to dataStream to call .addSink() :-(
    }
}
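
Or, going by the Bucketer suggestion, is something like the following closer? This is only a rough sketch of the per-event path logic, not working Flink code. The class name EventBucketPaths, the method bucketFor, and the assumption that the first comma-separated field of the event names the bucket are all made up for illustration.

```java
// Hypothetical helper: derives a bucket directory from the content of one event.
// Assumption (not from Flink): the event is a comma-separated String whose
// first field identifies the bucket, e.g. "click,42" goes to <base>/click.
final class EventBucketPaths {
    private EventBucketPaths() {}

    static String bucketFor(String basePath, String element) {
        // Take everything before the first comma, or the whole event if there is none.
        int comma = element.indexOf(',');
        String key = (comma >= 0 ? element.substring(0, comma) : element).trim();

        // Fall back to a fixed directory when the event carries no usable key.
        if (key.isEmpty()) {
            key = "unknown";
        }

        // Replace characters that could alter the path structure (such as '/').
        key = key.replaceAll("[^A-Za-z0-9._-]", "_");

        // Join base path and key with exactly one separator.
        String base = basePath.endsWith("/") ? basePath : basePath + "/";
        return base + key;
    }
}
```

As far as I can tell, this logic would live in a class implementing Bucketer<String>, inside getBucketPath(Clock clock, Path basePath, String element), and that class would be passed to the sink with setBucketer(...). The sink itself would then be created once with only the base path ("s3a://flink/"), so the event content is never needed at the point where .addSink() is called.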

Thanks,

> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> 
> Hi Ant,
> 
> I think you can do it by implementing your own Bucketer.
> 
> Cheers,
> Kostas
> 
>> On Aug 16, 2017, at 1:09 PM, ant burton <apburton84@gmail.com> wrote:
>> 
>> Hello,
>> 
>> Given 
>> 
>>       // Set StreamExecutionEnvironment
>>       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>       // Use event time as the stream time characteristic
>>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>>       // Add source (input stream)
>>       DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>> 
>> How can I construct the s3_filename from the content of an event? It seems that whenever I attempt this I either have access to an event or access to .addSink, but not both.
>> 
>> 	dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>> 
>> 
>> Thanks,
>> 
>> 
>> 
>> 
> 

