flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@gmail.com>
Subject RE: Kafka and Flink integration
Date Tue, 20 Jun 2017 15:49:41 GMT
Hi Nuno,

In general, if possible, it is recommended that you map your generic
classes to Tuples / POJOs [1].
Flink creates specialized serializers for Tuples / POJOs, whereas for
generic classes (i.e. types which cannot be treated as POJOs) it simply
falls back to Kryo.
The actual performance gain may depend somewhat on what the original
generic class looks like.
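
As a rough illustration of such a mapping, here is a minimal sketch in plain
Java. All names in it (PojoMappingSketch, LegacyEvent, SensorEvent, toPojo)
are hypothetical and not taken from this thread. It assumes the usual Flink
POJO rules: a public class, a public no-argument constructor, and fields that
are either public and non-final or reachable through getters and setters.

```java
// Hypothetical names throughout; this is an illustration, not code from the thread.
class PojoMappingSketch {

    // Would be treated as a generic type: no public no-argument constructor,
    // and private final fields without setters.
    public static final class LegacyEvent {
        private final String id;
        private final long timestamp;

        public LegacyEvent(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        public String id() { return id; }
        public long timestamp() { return timestamp; }
    }

    // POJO-style equivalent: public class, public no-argument constructor,
    // public non-final fields.
    public static class SensorEvent {
        public String id;
        public long timestamp;

        public SensorEvent() {}

        public SensorEvent(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    // The mapping step: copy the data into the POJO-style type.
    public static SensorEvent toPojo(LegacyEvent e) {
        return new SensorEvent(e.id(), e.timestamp());
    }

    public static void main(String[] args) {
        SensorEvent pojo = toPojo(new LegacyEvent("sensor-1", 42L));
        System.out.println(pojo.id + " @ " + pojo.timestamp);
    }
}
```

In a real job the POJO would more likely be a public top-level class, and the
mapping could happen as early as possible, for example right where the Kafka
record is deserialized, so that later operators only see the POJO type.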

Another thing worth looking at is enabling object reuse for
de-/serialization. However, the user code must be written with this in
mind: Flink may then hand the same object instance to user functions
repeatedly, so holding on to references can lead to unexpected errors.
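
To make the pitfall concrete, here is a small plain-Java simulation. It does
not use Flink at all; ReusingSource is a hypothetical stand-in for a runtime
that recycles one instance per element, which is roughly the situation user
code has to expect once env.getConfig().enableObjectReuse() is switched on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; none of these classes are Flink classes.
class ObjectReuseSketch {

    public static class Record {
        public long value;
    }

    // Stand-in for a runtime that reuses a single instance for every element.
    public static class ReusingSource {
        private final Record reused = new Record();

        public Record next(long v) {
            reused.value = v;   // overwrite the same object each time
            return reused;
        }
    }

    // Unsafe under reuse: stores references to the one shared instance.
    public static List<Record> bufferByReference(long[] input) {
        ReusingSource source = new ReusingSource();
        List<Record> buffer = new ArrayList<>();
        for (long v : input) {
            buffer.add(source.next(v));
        }
        return buffer;
    }

    // Safe under reuse: copies each element before storing it.
    public static List<Record> bufferByCopy(long[] input) {
        ReusingSource source = new ReusingSource();
        List<Record> buffer = new ArrayList<>();
        for (long v : input) {
            Record copy = new Record();
            copy.value = source.next(v).value;
            buffer.add(copy);
        }
        return buffer;
    }

    public static void main(String[] args) {
        long[] input = {1L, 2L, 3L};
        // Every buffered entry is the same object, so the first one shows the last value.
        System.out.println(bufferByReference(input).get(0).value); // prints 3
        // Copies keep their own values.
        System.out.println(bufferByCopy(input).get(0).value);      // prints 1
    }
}
```

The first variant is the kind of user code that silently breaks with object
reuse; the second trades an extra allocation for correctness.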



On 20 June 2017 at 11:24:03 PM, Nuno Rafael Goncalves (
nuno.goncalves@wedotechnologies.com) wrote:

I believe there is some performance impact when de/serializing, which is
“normal”. What I’m trying to understand is whether there are any tips to
improve this process, for instance tuples vs. general class types. Do you
know if it’s worth mapping a custom object into a tuple just for
de/serialization?

According to JFR analysis, Kryo methods are hit a lot.

-----Original Message-----
From: Nico Kruber [mailto:nico@data-artisans.com]
Sent: 20 June 2017 16:04
To: user@flink.apache.org
Cc: Nuno Rafael Goncalves <Nuno.Goncalves@wedotechnologies.com>
Subject: Re: Kafka and Flink integration

No, this is only necessary if you want to register a custom serializer
itself [1]. Also, in case you are wondering about registerKryoType() - this
is only needed as a performance optimisation.

What exactly is your problem? What are you trying to solve?

(I can't read JFR files here, and from what I read at Oracle's site, this
requires a commercial license, too...)


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/


On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:

> Do I need to use registerTypeWithKryoSerializer() in my execution
> environment?
>
> My serialization into Kafka is done with the following snippet:


> try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
>      Output output = new Output(byteArrayOutStream)) {
>   Kryo kryo = new Kryo();
>   kryo.writeClassAndObject(output, event);
>   output.flush();
>   return byteArrayOutStream.toByteArray();
> } catch (IOException e) {
>   return null;
> }


> "event" is my custom object.


> Then I deserialize it in Flink's Kafka consumer:

> try (ByteArrayInputStream byteArrayInStream = new ByteArrayInputStream(bytes);
>      Input input = new Input(byteArrayInStream, bytes.length)) {
>   Kryo kryo = new Kryo();
>   return kryo.readClassAndObject(input);
> } catch (IOException e) {
>   return null;
> }


> Thanks




