flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Serialization schema
Date Fri, 24 Feb 2017 07:09:14 GMT
Since I don’t have your complete code, I’m guessing this is the problem:
Is your `Tuple2Serializer` an inner class? If yes, you should be able to solve the problem
by declaring `Tuple2Serializer` to be `static`.

This is more of a Java problem -
It isn’t serializable if it isn’t static, because it will contain an implicit reference
to the enclosing outer class, and therefore serializing it will result in serializing the
outer class instance as well.

On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanchlia@gmail.com) wrote:

This is at high level what I am doing:


String s = tuple.getPos(0) + "," + tuple.getPos(1);
return s.getBytes()


String s = new String(message);
String [] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));

return tuple;

On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` contains
fields that are not serializable, so `Tuple2Serializer` itself is not serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we can pinpoint
the problem?

A snippet of the class fields and constructor will do, so you don’t have to provide the
whole `serialize` / `deserialize` implementation if you don’t want to.


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanchlia@gmail.com) wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1110@gmail.com> wrote:
Hi Mohit
As you did not give the whole codes of Tuple2Serializerr. I guess the reason is some fields
of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanchlia@gmail.com>:
I wrote a key serialization class to write to kafka however I am getting this error. Not sure
why as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements

DeserializationSchema<Tuple2<Integer, Integer>>,

SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:

FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new FlinkKafkaProducer010<Tuple2<Integer,

"", // broker list

"my-topic", // target topic

new Tuple2Serializerr()); // serialization schema

View raw message