flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
Date Sat, 04 Nov 2017 07:41:03 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238832#comment-16238832
] 

ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924463
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException
{
     		return this.reader.read(reuse, this.decoder);
     	}
     
    +	// ------------------------------------------------------------------------
    +	//  Copying
    +	// ------------------------------------------------------------------------
    +
     	@Override
    -	public void copy(DataInputView source, DataOutputView target) throws IOException {
    +	public T copy(T from) {
     		checkAvroInitialized();
    +		return avroData.deepCopy(schema, from);
    +	}
     
    -		if (this.deepCopyInstance == null) {
    -			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
    -		}
    -
    -		this.decoder.setIn(source);
    -		this.encoder.setOut(target);
    +	@Override
    +	public T copy(T from, T reuse) {
    +		return copy(from);
    +	}
     
    -		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
    -		this.writer.write(tmp, this.encoder);
    +	@Override
    +	public void copy(DataInputView source, DataOutputView target) throws IOException {
    +		T value = deserialize(source);
    +		serialize(value, target);
     	}
     
    -	private void checkAvroInitialized() {
    -		if (this.reader == null) {
    -			this.reader = new ReflectDatumReader<T>(type);
    -			this.writer = new ReflectDatumWriter<T>(type);
    -			this.encoder = new DataOutputEncoder();
    -			this.decoder = new DataInputDecoder();
    +	// ------------------------------------------------------------------------
    +	//  Compatibility and Upgrades
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +		if (configSnapshot == null) {
    +			checkAvroInitialized();
    +			configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
     		}
    +		return configSnapshot;
     	}
     
    -	private void checkKryoInitialized() {
    -		if (this.kryo == null) {
    -			this.kryo = new Kryo();
    -
    -			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
    -			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
    -			kryo.setInstantiatorStrategy(instantiatorStrategy);
    +	@Override
    +	@SuppressWarnings("deprecation")
    +	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot
configSnapshot) {
    +		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
    +			// proper schema snapshot, can do the sophisticated schema-based compatibility check
    +			final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
    +			final Schema lastSchema = new Schema.Parser().parse(schemaString);
     
    -			kryo.setAsmEnabled(true);
    +			final SchemaPairCompatibility compatibility =
    +					SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
     
    -			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
    +			return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
    +					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
    +		}
    +		else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    +			// old snapshot case, just compare the type
    +			// we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
    +			// only for object-to-object copies.
    +			final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
    +			return type.equals(old.getTypeClass()) ?
    +					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
    +		}
    +		else {
    +			return CompatibilityResult.requiresMigration();
     		}
     	}
     
    -	// --------------------------------------------------------------------------------------------
    +	// ------------------------------------------------------------------------
    +	//  Utilities
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public TypeSerializer<T> duplicate() {
    +		return new AvroSerializer<>(type);
    +	}
     
     	@Override
     	public int hashCode() {
    -		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
    +		return 42 + type.hashCode();
     	}
     
     	@Override
     	public boolean equals(Object obj) {
    -		if (obj instanceof AvroSerializer) {
    -			@SuppressWarnings("unchecked")
    -			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
    -
    -			return avroSerializer.canEqual(this) &&
    -				type == avroSerializer.type &&
    -				typeToInstantiate == avroSerializer.typeToInstantiate;
    -		} else {
    +		if (obj == this) {
    +			return true;
    +		}
    +		else if (obj != null && obj.getClass() == AvroSerializer.class) {
    +			final AvroSerializer that = (AvroSerializer) obj;
    +			return this.type == that.type;
    +		}
    +		else {
     			return false;
     		}
     	}
     
     	@Override
     	public boolean canEqual(Object obj) {
    -		return obj instanceof AvroSerializer;
    +		return obj.getClass() == this.getClass();
     	}
     
    -	// --------------------------------------------------------------------------------------------
    -	// Serializer configuration snapshotting & compatibility
    -	// --------------------------------------------------------------------------------------------
    -
     	@Override
    -	public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
    -		return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
    +	public String toString() {
    +		return getClass().getName() + " (" + getType().getName() + ')';
     	}
     
    -	@SuppressWarnings("unchecked")
    -	@Override
    -	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot
configSnapshot) {
    -		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    -			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>)
configSnapshot;
    +	// ------------------------------------------------------------------------
    +	//  Initialization
    +	// ------------------------------------------------------------------------
    +
    +	private void checkAvroInitialized() {
    +		if (writer == null) {
    +			initializeAvro();
    +		}
    +	}
    +
    +	private void initializeAvro() {
    +		final ClassLoader cl = Thread.currentThread().getContextClassLoader();
    +
    +		if (SpecificRecord.class.isAssignableFrom(type)) {
    +			this.avroData = new SpecificData(cl);
    +			this.schema = this.avroData.getSchema(type);
    +			this.reader = new SpecificDatumReader<>(schema, schema, avroData);
    +			this.writer = new SpecificDatumWriter<>(schema, avroData);
    +		}
    +		else {
    +			final ReflectData reflectData = new ReflectData(cl);
    +			this.avroData = reflectData;
    +			this.schema = this.avroData.getSchema(type);
    +			this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
    +			this.writer = new ReflectDatumWriter<>(schema, reflectData);
    +		}
    +
    +		this.encoder = new DataOutputEncoder();
    +		this.decoder = new DataInputDecoder();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Serializer Snapshots
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility.
    +	 */
    +	public static final class AvroSchemaSerializerConfigSnapshot extends TypeSerializerConfigSnapshot
{
    +
    +		private String schemaString;
    +
    +		/**
    +		 * Default constructor for instantiation via reflection.
    +		 */
    +		@SuppressWarnings("unused")
    +		public AvroSchemaSerializerConfigSnapshot() {}
    +
    +		public AvroSchemaSerializerConfigSnapshot(String schemaString) {
    +			this.schemaString = checkNotNull(schemaString);
    +		}
    +
    +		public String getSchemaString() {
    +			return schemaString;
    +		}
    +
    +		// --- Serialization ---
    +
    +		@Override
    +		public void read(DataInputView in) throws IOException {
    +			super.read(in);
    +			this.schemaString = in.readUTF();
    +		}
    +
    +		@Override
    +		public void write(DataOutputView out) throws IOException {
    +			super.write(out);
    +			out.writeUTF(schemaString);
    +		}
     
    -			if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate()))
{
    -				// resolve Kryo registrations; currently, since the Kryo registrations in Avro
    -				// are fixed, there shouldn't be a problem with the resolution here.
    +		// --- Version ---
     
    -				LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
    -				oldRegistrations.putAll(kryoRegistrations);
    +		@Override
    +		public int getVersion() {
    +			return 1;
    +		}
     
    -				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet())
{
    -					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
    -						return CompatibilityResult.requiresMigration();
    -					}
    -				}
    +		// --- Utils ---
     
    -				this.kryoRegistrations = oldRegistrations;
    -				return CompatibilityResult.compatible();
    +		@Override
    +		public boolean equals(Object obj) {
    +			if (obj == this) {
    +				return true;
    +			}
    +			else if (obj != null && obj.getClass() == AvroSchemaSerializerConfigSnapshot.class)
{
    +				final AvroSchemaSerializerConfigSnapshot that = (AvroSchemaSerializerConfigSnapshot)
obj;
    +				return this.schemaString.equals(that.schemaString);
    --- End diff --
    
    Is the schema string guaranteed to be stable, or can two different Avro versions generate
    schema strings that Avro considers compatible but that differ slightly as strings?


> Improve support for Avro GenericRecord
> --------------------------------------
>
>                 Key: FLINK-6022
>                 URL: https://issues.apache.org/jira/browse/FLINK-6022
>             Project: Flink
>          Issue Type: Improvement
>          Components: Type Serialization System
>            Reporter: Robert Metzger
>            Priority: Major
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by shipping the schema to each serializer through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message