flink-user mailing list archives

From: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject: Re: Writing Tuple2 to a sink
Date: Fri, 24 Feb 2017 06:53:08 GMT

Hi Mohit,

I don’t completely understand your question, but I’m assuming that you know the type of
records your custom sink will be receiving and just don’t know how to extract values from
those records.

Assume that the type of the incoming records will be `Tuple2<String, Integer>`. When
writing your custom sink, declare that type as the type parameter of `SinkFunction`:

public class YourCustomSink implements SinkFunction<Tuple2<String, Integer>> {
    @Override
    public void invoke(Tuple2<String, Integer> next) throws Exception {
        // use next.f0 / next.f1 to retrieve values from the tuple
    }
}


You can of course also define generic types to replace `String` and `Integer`, like so:

public class YourCustomSink<F, S> implements SinkFunction<Tuple2<F, S>> {
    @Override
    public void invoke(Tuple2<F, S> next) throws Exception {
        F field1 = next.f0; // first tuple field
        S field2 = next.f1; // second tuple field
    }
}


Just replace the generic types with concrete types when instantiating your custom sink, according
to your topology.
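
As a rough illustration of that last step, here is a minimal sketch of a generic sink being instantiated with concrete types. To keep it runnable without Flink on the classpath, it declares its own small stand-ins for `Tuple2` and `SinkFunction`; in a real job you would import the Flink classes instead. `CollectingSink` and `GenericSinkDemo` are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Tuple2 (public fields f0 and f1), declared here only
// so that the sketch compiles on its own. This is NOT the Flink class.
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;

    Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Stand-in for Flink's SinkFunction interface (assumption: a single
// invoke(value) method, as used in the snippets above).
interface SinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

// Hypothetical generic sink: it records "f0=f1" strings in a list instead of
// writing to an external system.
class CollectingSink<F, S> implements SinkFunction<Tuple2<F, S>> {
    final List<String> written = new ArrayList<>();

    @Override
    public void invoke(Tuple2<F, S> next) throws Exception {
        F field1 = next.f0;
        S field2 = next.f1;
        written.add(field1 + "=" + field2);
    }
}

class GenericSinkDemo {
    public static void main(String[] args) throws Exception {
        // The generic parameters F and S are fixed to String and Integer here,
        // matching a stream of Tuple2<String, Integer> records.
        CollectingSink<String, Integer> sink = new CollectingSink<>();
        sink.invoke(new Tuple2<>("a", 1));
        sink.invoke(new Tuple2<>("b", 2));
        System.out.println(sink.written);
    }
}
```

In an actual topology the sink would not be invoked by hand; it would be attached to a stream of the matching type, for example with `stream.addSink(new YourCustomSink<String, Integer>())`.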

Let me know if this answers your question!


On February 24, 2017 at 10:42:33 AM, 刘彪 (mmyy1110@gmail.com) wrote:

Currently, OutputFormat is used for DataSet and SinkFunction is used for DataStream. Maybe I
misunderstand your problem; it would help if you could give more details.

2017-02-24 5:21 GMT+08:00 Mohit Anchlia <mohitanchlia@gmail.com>:
This works for Kafka, but for the other types of sink am I supposed to use some type of OutputFormat?

On Tue, Feb 21, 2017 at 7:13 PM, 刘彪 <mmyy1110@gmail.com> wrote:
I think FlinkKafkaProducerBase.java has a good way of dealing with this situation. There
is a KeyedSerializationSchema that the user has to implement. The KeyedSerializationSchema is
used to serialize the data, so the SinkFunction only needs to understand the type after serialization.
In your case, I think you can add a SerializationSchema interface to your SinkFunction and have the
user implement it, maybe as a class named Tuple2SerializationSchema.
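
The idea above might be sketched roughly as follows. This is only an illustration under stated assumptions: `Tuple2`, `SinkFunction` and `SerializationSchema` are small stand-ins declared locally so the example runs without Flink, and `SchemaBackedSink` and `SchemaSinkDemo` are hypothetical names. The comma-separated encoding is likewise just an arbitrary choice for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Tuple2 (public fields f0 and f1); not the Flink class.
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;

    Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Stand-in for Flink's SinkFunction interface.
interface SinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

// Stand-in for a serialization schema: turns one element into bytes.
interface SerializationSchema<T> {
    byte[] serialize(T element);
}

// User-provided schema that knows the concrete tuple type.
class Tuple2SerializationSchema implements SerializationSchema<Tuple2<String, Integer>> {
    @Override
    public byte[] serialize(Tuple2<String, Integer> element) {
        // Arbitrary example encoding: "f0,f1" as UTF-8 bytes.
        return (element.f0 + "," + element.f1).getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical sink that never inspects the record itself; it only handles
// the bytes produced by the schema it was constructed with.
class SchemaBackedSink<IN> implements SinkFunction<IN> {
    private final SerializationSchema<IN> schema;
    final List<byte[]> written = new ArrayList<>();

    SchemaBackedSink(SerializationSchema<IN> schema) {
        this.schema = schema;
    }

    @Override
    public void invoke(IN value) throws Exception {
        // A real sink would send these bytes to an external system.
        written.add(schema.serialize(value));
    }
}

class SchemaSinkDemo {
    public static void main(String[] args) throws Exception {
        SchemaBackedSink<Tuple2<String, Integer>> sink =
                new SchemaBackedSink<>(new Tuple2SerializationSchema());
        sink.invoke(new Tuple2<>("hello", 42));
        System.out.println(new String(sink.written.get(0), StandardCharsets.UTF_8));
    }
}
```

With this split, the sink stays agnostic of the record type, and only the schema implementation needs to know that the records are tuples.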

2017-02-22 7:17 GMT+08:00 Mohit Anchlia <mohitanchlia@gmail.com>:
What's the best way to retrieve both the values in Tuple2 inside a custom sink given that
the type is not known inside the sink function?
