flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: Serialization performance
Date Fri, 10 Mar 2017 11:15:06 GMT
I've filed a JIRA for improving the support for Avro GenericRecords:
https://issues.apache.org/jira/browse/FLINK-6022

On Tue, Mar 7, 2017 at 7:13 PM, Stephan Ewen <sewen@apache.org> wrote:

> I'll try and add more details in a bit.
>
> If you have some suggestions on how to make the serialization stack more
> extensible, please let us know!
>
> Some hooks exist, like TypeInfoFactories:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html#defining-type-information-using-a-factory
>
> But I think that hook does not work for Avro...
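
For reference, the TypeInfoFactory hook from the linked docs looks roughly like the sketch below. MyType and MyTypeFactory are invented names for illustration; a real factory would return a custom TypeInformation wired to a custom serializer, where this sketch just falls back to the generic (Kryo) type info:

    import java.lang.reflect.Type;
    import java.util.Map;

    import org.apache.flink.api.common.typeinfo.TypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;

    // Annotating the type makes Flink consult the factory during type extraction.
    @TypeInfo(MyTypeFactory.class)
    public class MyType {
        public String field;
    }

    public class MyTypeFactory extends TypeInfoFactory<MyType> {
        @Override
        public TypeInformation<MyType> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // Placeholder: a real implementation would return a custom
            // TypeInformation that creates the desired serializer.
            return new GenericTypeInfo<>(MyType.class);
        }
    }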
>
>
> On Tue, Mar 7, 2017 at 1:25 PM, Newport, Billy <Billy.Newport@gs.com>
> wrote:
>
>> I need more details. As far as I can tell, Flink does not appear to be
>> designed to let you add serializers in a ‘nice’ way; it’s kind of
>> hardcoded for Kryo right now.
>>
>>
>>
>>
>>
>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>> *Sent:* Tuesday, March 07, 2017 6:21 AM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Serialization performance
>>
>>
>>
>> Hi Billy!
>>
>>
>>
>> Out of curiosity: Were you able to hack some direct Avro support as I
>> described in the brief writeup, or do you need some more details?
>>
>>
>>
>> Stephan
>>
>>
>>
>> On Fri, Mar 3, 2017 at 2:38 PM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>> Hi Billy,
>>
>> on the Beam side, you have probably looked into writing your own Coder
>> (the equivalent of a TypeSerializer in Flink). If so, did that not work
>> out for you? And if it didn't, why?
>>
>>
>>
>> Best,
>>
>> Aljoscha
>>
>>
>>
>>
>>
>> On Thu, Mar 2, 2017, at 22:02, Stephan Ewen wrote:
>>
>> Hi!
>>
>>
>>
>> I can write some more details later; here is the short answer:
>>
>>
>>
>>   - Your serializer would go into something like the AvroSerializer
>>
>>
>>
>>   - When you instantiate the AvroSerializer in
>> GenericTypeInfo.createSerializer(ExecutionConfig), you pre-register the
>> type of the generic type, plus all types in
>> "ExecutionConfig.getRegisteredKryoTypes()"
>>
>>     (we abuse the Kryo type registration here as an Avro type
>> registration initially; that would have to be polished later)
>>
>>
>>
>>   - The registered types are classes, but since they are Avro types, you
>> should be able to get their schema (via ReflectData, for example)
>>
>>
>>
>> That way, Flink would internally forward all the registrations for you
>> (similar to how it forwards Kryo registrations) and you don't have to
>> manually do that.
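
A minimal sketch of the dispatch Stephan describes, written as a standalone helper mirroring what GenericTypeInfo.createSerializer(ExecutionConfig) could do; this is an assumption about the wiring, not actual Flink code, and the schema pre-registration is only hinted at in the comments:

    import org.apache.avro.generic.IndexedRecord;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

    public class AvroAwareSerializers {
        public static <T> TypeSerializer<T> serializerFor(Class<T> typeClass, ExecutionConfig config) {
            if (IndexedRecord.class.isAssignableFrom(typeClass)) {
                // Avro type (generic or specific record): use the Avro-based
                // serializer. The classes in config.getRegisteredKryoTypes()
                // could be pre-registered as Avro schemas at this point,
                // e.g. via ReflectData.get().getSchema(clazz).
                return new AvroSerializer<T>(typeClass);
            }
            // Everything else keeps going through Kryo, as today.
            return new KryoSerializer<T>(typeClass, config);
        }
    }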
>>
>>
>>
>> Stephan
>>
>>
>>
>>
>>
>> On Thu, Mar 2, 2017 at 9:31 PM, Newport, Billy <Billy.Newport@gs.com>
>> wrote:
>>
>> This is what we’re using as our serializer:
>>
>>
>>
>> Somewhere:
>>
>>
>>
>>     env.addDefaultKryoSerializer(Record.class, GRKryoSerializer.class);
>>
>>
>>
>> then
>>
>>
>>
>> import java.io.IOException;
>> import java.util.Map;
>> import java.util.concurrent.ConcurrentHashMap;
>> import java.util.logging.Logger;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.SchemaNormalization;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.avro.generic.GenericData.Record;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.BinaryDecoder;
>> import org.apache.avro.io.BinaryEncoder;
>> import org.apache.avro.io.DatumReader;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.io.DecoderFactory;
>> import org.apache.avro.io.EncoderFactory;
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>>
>> public class GRKryoSerializer extends Serializer<GenericData.Record>
>> {
>>     // Everything needed to read/write one schema, keyed by its fingerprint.
>>     static class AvroStuff
>>     {
>>         Schema schema;
>>         String comment;
>>         long key;
>>         DatumReader<GenericRecord> reader;
>>         DatumWriter<GenericRecord> writer;
>>     }
>>
>>     static Map<Long, AvroStuff> schemaMap = new ConcurrentHashMap<>();
>>     static Map<Schema, Long> schemaToFingerprintMap = new ConcurrentHashMap<>();
>>     static Logger log = Logger.getLogger(GRKryoSerializer.class.getName());
>>
>>     static public void preregisterSchema(String comment, Schema schema)
>>     {
>>         if (!schemaToFingerprintMap.containsKey(schema)) {
>>             long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>>             AvroStuff stuff = new AvroStuff();
>>             stuff.schema = schema;
>>             stuff.comment = comment;
>>             stuff.key = fingerprint;
>>             stuff.reader = new GenericDatumReader<>(schema);
>>             stuff.writer = new GenericDatumWriter<>(schema);
>>             log.info(String.format("Preregistering schema for %s with fingerprint %d", comment, fingerprint));
>>             schemaMap.put(fingerprint, stuff);
>>             schemaToFingerprintMap.put(schema, fingerprint);
>>         }
>>     }
>>
>>     public GRKryoSerializer() {
>>     }
>>
>>     static public void clearSchemaCache()
>>     {
>>         schemaToFingerprintMap.clear();
>>         schemaMap.clear();
>>     }
>>
>>     static public AvroStuff getStuffFor(GenericRecord o)
>>     {
>>         return getStuffFor(o.getSchema());
>>     }
>>
>>     static public AvroStuff getStuffFor(Schema schema)
>>     {
>>         Long fingerprint = schemaToFingerprintMap.get(schema);
>>         if (fingerprint == null)
>>         {
>>             fingerprint = SchemaNormalization.parsingFingerprint64(schema);
>>             log.info(String.format("No fingerprint. Generated %d for schema %s", fingerprint, schema.toString(true)));
>>             schemaToFingerprintMap.put(schema, fingerprint);
>>
>>             throw new RuntimeException("Unknown schema " + schema.toString(true));
>>         }
>>         return schemaMap.get(fingerprint);
>>     }
>>
>>     @Override
>>     public void write(Kryo kryo, Output output, GenericData.Record object)
>>     {
>>         AvroStuff stuff = getStuffFor(object);
>>
>>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(output, null);
>>         try {
>>             // write the schema key not the schema
>>             encoder.writeLong(stuff.key);
>>             // write the binary version of the fields only
>>             stuff.writer.write(object, encoder);
>>             encoder.flush();
>>         } catch (IOException e)
>>         {
>>             throw new RuntimeException(e);
>>         }
>>     }
>>
>>     @Override
>>     public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type)
>>     {
>>         BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
>>         long fingerPrint;
>>         try {
>>             // read the key
>>             fingerPrint = decoder.readLong();
>>             // find it
>>             AvroStuff stuff = schemaMap.get(fingerPrint);
>>             // inflate using correct preregistered inflator
>>             return (Record) stuff.reader.read(null, decoder);
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>> }
>>
>>
>>
>> We add an instance of one of these to all our Flink Rich operations:
>>
>>
>>
>>
>>
>> import java.io.IOException;
>> import java.io.ObjectInputStream;
>> import java.io.ObjectOutputStream;
>> import java.io.Serializable;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.generic.GenericRecordBuilder;
>>
>> public class GRBuilder implements Serializable {
>>     private static final long serialVersionUID = -3441080975441473751L;
>>
>>     String schemaString;
>>     String comment;
>>
>>     transient GenericRecordBuilder builder = null;
>>     transient Schema schema = null;
>>
>>     public String getComment() {
>>         return comment;
>>     }
>>
>>     public void setSchema(Schema schema) {
>>         this.schema = schema;
>>     }
>>
>>     public void registerSchema() {
>>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>>     }
>>
>>     private void readObject(ObjectInputStream input)
>>             throws IOException, ClassNotFoundException
>>     {
>>         realReadObject(input);
>>     }
>>
>>     private void writeObject(ObjectOutputStream output)
>>             throws IOException, ClassNotFoundException
>>     {
>>         realWriteObject(output);
>>     }
>>
>>     // Ensure on inflation, the schema is registered against
>>     // the hashcode locally so we can inflate that type
>>     protected void realReadObject(ObjectInputStream input)
>>             throws IOException, ClassNotFoundException
>>     {
>>         schemaString = input.readUTF();
>>         comment = input.readUTF();
>>         builder = null;
>>         schema = null;
>>         GRKryoSerializer.preregisterSchema(comment, getSchema());
>>     }
>>
>>     protected void realWriteObject(ObjectOutputStream output)
>>             throws IOException, ClassNotFoundException
>>     {
>>         output.writeUTF(schemaString);
>>         output.writeUTF(comment);
>>     }
>>
>>     public GRBuilder()
>>     {
>>     }
>>
>>     public GRBuilder(String comment, Schema s) {
>>         schemaString = s.toString();
>>         builder = null;
>>         this.comment = comment;
>>
>>         GRKryoSerializer.preregisterSchema(comment, s);
>>     }
>>
>>     public synchronized GenericRecordBuilder getBuilder()
>>     {
>>         if (builder == null)
>>         {
>>             builder = new GenericRecordBuilder(getSchema());
>>         }
>>         return builder;
>>     }
>>
>>     public synchronized Schema getSchema()
>>     {
>>         if (schema == null)
>>         {
>>             Schema.Parser p = new Schema.Parser();
>>             schema = p.parse(schemaString);
>>         }
>>         return schema;
>>     }
>> }
>>
>>
>>
>> Our Mappers and such use the GRBuilder on the FlatMap rich class, for
>> example, to get a Builder to create the output records for collection. We
>> need to have a GRBuilder for each possible GenericRecord schema as a
>> variable on a Map object.
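
A minimal sketch of that pattern, assuming a RichFlatMapFunction subclass; MyMapper and the field name "someField" are invented for illustration:

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class MyMapper extends RichFlatMapFunction<GenericData.Record, GenericData.Record> {
        // One GRBuilder per output schema; it travels with the function via
        // Java serialization, and its readObject preregisters the schema on
        // each task manager before any record is read or written.
        private final GRBuilder outputBuilder;

        public MyMapper(GRBuilder outputBuilder) {
            this.outputBuilder = outputBuilder;
        }

        @Override
        public void flatMap(GenericData.Record in, Collector<GenericData.Record> out) {
            GenericRecordBuilder b = outputBuilder.getBuilder();
            b.set("someField", in.get("someField"));
            out.collect(b.build());
        }
    }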
>>
>>
>>
>> If we were to refactor this using the GenericTypeInfo or AvroSerializer,
>> how would you suggest doing it?
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>> *Sent:* Thursday, March 02, 2017 3:07 PM
>> *To:* user@flink.apache.org; Aljoscha Krettek
>> *Subject:* Re: Serialization performance
>>
>>
>>
>> Hi!
>>
>>
>>
>> Thanks for this writeup, very cool stuff!
>>
>>
>>
>> For part (1) - serialization: I think that can be made a bit nicer. Avro
>> is a bit of an odd citizen in Flink, because Flink serialization is
>> actually schema-aware, but does not integrate with Avro. That's why Avro
>> types go through Kryo.
>>
>>
>>
>> We should try and make Avro a first class citizen.
>>
>>   - The first step is to have a proper AvroSerializer. We have
>> implemented one already, see
>> "org.apache.flink.api.java.typeutils.runtime.AvroSerializer". It works
>> with the ReflectDatumReader/Writer, but would be a good base line for all
>> types of avro-based serializers in Flink.
>>
>>
>>
>>   - Then we need to figure out how the Avro Serializer is instantiated.
>> We could just let the "GenericTypeInfo" create an Avro serializer for Avro
>> types, and Kryo for all else.
>>
>>   - The change would eventually have to be behind a config flag in the
>> ExecutionConfig (see "GenericTypeInfo.createSerializer()") to make sure
>> we do not break the default serialization format within a major release
>> version.
>>
>>
>>
>>
>>
>> A side note: If you actually use that through Beam, I am not sure what
>> will happen, because as far as I know, Beam uses its own serialization
>> system entirely and Flink sees only byte coders from Beam. Aljoscha can
>> probably add more detail here.
>>
>>
>>
>>
>>
>> For part (2) - the filters: If I understand correctly, you "split" the
>> data into different result sets that go to different sinks? The DataStream
>> API has a "split/select" type of construct which would help there; the
>> DataSet API does not have something like that. If you are looking for peak
>> performance, the demux output format seems like a good workaround on the
>> DataSet side.
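
For completeness, a short sketch of the DataStream split/select construct in the Flink 1.2 API; the stream variable and the "type" field used for routing are assumptions for illustration:

    import java.util.Collections;

    import org.apache.avro.generic.GenericData;
    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SplitStream;

    // Route each record to a named side output based on one of its fields.
    SplitStream<GenericData.Record> split = stream.split(new OutputSelector<GenericData.Record>() {
        @Override
        public Iterable<String> select(GenericData.Record value) {
            return Collections.singletonList(value.get("type").toString());
        }
    });
    DataStream<GenericData.Record> typeA = split.select("A");
    DataStream<GenericData.Record> typeB = split.select("B");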
>>
>>
>>
>>
>>
>> Greetings,
>>
>> Stephan
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Mar 2, 2017 at 8:01 PM, Newport, Billy <Billy.Newport@gs.com>
>> wrote:
>>
>> We’ve been working on performance for the last while. We’re using Flink
>> 1.2 right now. We are writing batch jobs which process Avro and Parquet
>> input files and produce Parquet files.
>>
>>
>>
>> Flink serialization costs seem to be the most significant aspect of our
>> wall clock time. We have written a custom Kryo serializer for GenericRecord
>> which is similar to the Spark one in that it reuses Avro Datum readers and
>> writers but writes a hash fingerprint for the schema instead of the schema
>> itself.
>>
>>
>>
>> We have subclassed most of the Rich* classes in Flink and now also pass
>> to the constructors a Builder class which has the Avro schema. When Flink
>> inflates these, the builders are inflated and preregister the Avro schema
>> for the hash code in a static map, which the inflation/writing code uses
>> later.
>>
>>
>>
>> This is working pretty well for us; it’s like 10-20x faster than just
>> using GenericRecords normally. The question is this: Is this the right
>> way to do it? If it is, then we can contribute it and figure out how to
>> modify Beam so that it uses this stuff under the covers. We can’t use
>> Beam at all right now, as far as I can tell, because of the performance
>> issues with GenericRecord.
>>
>>
>>
>> The other performance optimization is basically removing filters, which
>> again seem to double wall clock time. We wrote an embedded demux
>> outputformat which receives a Tuple<Enum,GenericRecord> and writes to a
>> different parquet file depending on the Enum. This was 2x faster than a
>> naïve approach with 4 filters going to 4 parquet outputformats.
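
A rough sketch of such a demultiplexing output format, under the assumption that one pre-built output format per enum value is passed in; Category and the way the delegates are constructed (e.g. one Parquet output format per target file) are invented for illustration:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.avro.generic.GenericData;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.api.common.io.RichOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    public enum Category { A, B, C, D } // placeholder for the real routing enum

    public class DemuxOutputFormat extends RichOutputFormat<Tuple2<Category, GenericData.Record>> {
        // One delegate per enum value, each writing to its own file.
        private final Map<Category, OutputFormat<GenericData.Record>> delegates;

        public DemuxOutputFormat(Map<Category, OutputFormat<GenericData.Record>> delegates) {
            this.delegates = delegates;
        }

        @Override
        public void configure(Configuration parameters) {
            for (OutputFormat<GenericData.Record> f : delegates.values()) {
                f.configure(parameters);
            }
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            for (OutputFormat<GenericData.Record> f : delegates.values()) {
                f.open(taskNumber, numTasks);
            }
        }

        @Override
        public void writeRecord(Tuple2<Category, GenericData.Record> record) throws IOException {
            // Route by the enum in the tuple's first position; only the
            // record itself is written to the matching file.
            delegates.get(record.f0).writeRecord(record.f1);
        }

        @Override
        public void close() throws IOException {
            for (OutputFormat<GenericData.Record> f : delegates.values()) {
                f.close();
            }
        }
    }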
>>
>>
>>
>> Do these optimizations seem unnecessary to some? Is there some trick
>> we’re missing?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>
>
