flink-user mailing list archives

From "Newport, Billy" <Billy.Newp...@gs.com>
Subject Serialization performance
Date Thu, 02 Mar 2017 19:01:08 GMT
We've been working on performance for a while now. We're using Flink 1.2 right now. We are
writing batch jobs which process Avro and Parquet input files and produce Parquet files.

Flink serialization costs seem to be the most significant part of our wall clock time. We
have written a custom Kryo serializer for GenericRecord which is similar to the Spark one
in that it reuses Avro DatumReaders and DatumWriters, but writes a hash fingerprint of the
schema instead of the schema itself.
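
Stripped down, the serializer is roughly the following sketch. The class and registry names
here are just illustrative (the real version also caches the DatumReaders/DatumWriters per
schema instead of recreating them for every record):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative static registry: 64-bit schema fingerprint -> Schema.
class SchemaRegistry {
    static final Map<Long, Schema> SCHEMAS = new ConcurrentHashMap<>();
    static void register(Schema s) {
        SCHEMAS.put(SchemaNormalization.parsingFingerprint64(s), s);
    }
    static Schema get(long fingerprint) {
        return SCHEMAS.get(fingerprint);
    }
}

// Kryo serializer that writes the schema fingerprint plus the Avro binary body,
// never the full schema JSON.
public class FingerprintGenericRecordSerializer extends Serializer<GenericRecord> {

    @Override
    public void write(Kryo kryo, Output output, GenericRecord record) {
        Schema schema = record.getSchema();
        output.writeLong(SchemaNormalization.parsingFingerprint64(schema));
        try {
            // Direct (unbuffered) encoder so nothing is written past Kryo's stream position.
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public GenericRecord read(Kryo kryo, Input input, Class<GenericRecord> type) {
        // The schema must already be registered for this fingerprint (see below).
        Schema schema = SchemaRegistry.get(input.readLong());
        try {
            // Direct (unbuffered) decoder so we do not read past this record's bytes.
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
            return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Something along the lines of env.getConfig().addDefaultKryoSerializer(...) hooks it into the
Flink job.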

We have subclassed most of the Rich* classes in Flink and now also pass to the constructors
a Builder class which holds the Avro schema. When Flink deserializes these on the workers,
the builders are deserialized too and preregister the Avro schema under its fingerprint in a
static map which the reading/writing code uses later.
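
A simplified sketch of that builder, reusing the illustrative SchemaRegistry from the previous
snippet (we ship the schema as JSON since Schema isn't reliably Serializable across Avro
versions):

import org.apache.avro.Schema;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

// Carried as a field of our Rich* subclasses; registering happens both on the client
// and again whenever Flink deserializes the enclosing function on a task manager.
public class AvroSchemaBuilder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String schemaJson;

    public AvroSchemaBuilder(Schema schema) {
        this.schemaJson = schema.toString();
        register();
    }

    public Schema schema() {
        return new Schema.Parser().parse(schemaJson);
    }

    private void register() {
        SchemaRegistry.register(schema());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        register();  // preregister the schema for its fingerprint before any records arrive
    }
}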

This is working pretty well for us; it's roughly 10-20x faster than just using GenericRecords
normally. The question is this: is this the right way to do it? If it is, we can contribute it,
and the next question is how to modify Beam so that it uses this stuff under the covers. As far
as I can tell, we can't use Beam at all right now because of the performance issues with
GenericRecord.

The other performance optimization is basically removing filters, which again seemed to double
wall clock time. We wrote an embedded demux output format which receives a Tuple2<Enum, GenericRecord>
and writes to a different Parquet file depending on the enum. This was 2x faster than a naïve
approach of 4 filters going to 4 Parquet output formats.
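
The demux format has roughly this shape (sketch only; the real one wraps our Parquet output
formats, each constructed up front with its own output path):

import java.io.IOException;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Demultiplexing output format: one delegate output format per enum value, each record
// is routed by its tag, so the data set is materialized once instead of scanned by N filters.
public class DemuxOutputFormat<E extends Enum<E>>
        implements OutputFormat<Tuple2<E, GenericRecord>> {

    private final Map<E, OutputFormat<GenericRecord>> delegates;  // e.g. one Parquet format per enum

    public DemuxOutputFormat(Map<E, OutputFormat<GenericRecord>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public void configure(Configuration parameters) {
        for (OutputFormat<GenericRecord> f : delegates.values()) {
            f.configure(parameters);
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        for (OutputFormat<GenericRecord> f : delegates.values()) {
            f.open(taskNumber, numTasks);
        }
    }

    @Override
    public void writeRecord(Tuple2<E, GenericRecord> record) throws IOException {
        delegates.get(record.f0).writeRecord(record.f1);  // route by the enum tag
    }

    @Override
    public void close() throws IOException {
        for (OutputFormat<GenericRecord> f : delegates.values()) {
            f.close();
        }
    }
}

The job then does a single map to Tuple2<Enum, GenericRecord> and one output(...) call instead
of four filter/output pairs.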

Do these optimizations seem unnecessary to anyone? Is there some trick we're missing?

