flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
Date Sat, 04 Nov 2017 07:40:57 GMT
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924447
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
     		return this.reader.read(reuse, this.decoder);
     	}
     
    +	// ------------------------------------------------------------------------
    +	//  Copying
    +	// ------------------------------------------------------------------------
    +
     	@Override
    -	public void copy(DataInputView source, DataOutputView target) throws IOException {
    +	public T copy(T from) {
     		checkAvroInitialized();
    +		return avroData.deepCopy(schema, from);
    +	}
     
    -		if (this.deepCopyInstance == null) {
    -			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
    -		}
    -
    -		this.decoder.setIn(source);
    -		this.encoder.setOut(target);
    +	@Override
    +	public T copy(T from, T reuse) {
    +		return copy(from);
    +	}
     
    -		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
    -		this.writer.write(tmp, this.encoder);
    +	@Override
    +	public void copy(DataInputView source, DataOutputView target) throws IOException {
    +		T value = deserialize(source);
    +		serialize(value, target);
     	}
     
    -	private void checkAvroInitialized() {
    -		if (this.reader == null) {
    -			this.reader = new ReflectDatumReader<T>(type);
    -			this.writer = new ReflectDatumWriter<T>(type);
    -			this.encoder = new DataOutputEncoder();
    -			this.decoder = new DataInputDecoder();
    +	// ------------------------------------------------------------------------
    +	//  Compatibility and Upgrades
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +		if (configSnapshot == null) {
    +			checkAvroInitialized();
    +			configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
     		}
    +		return configSnapshot;
     	}
     
    -	private void checkKryoInitialized() {
    -		if (this.kryo == null) {
    -			this.kryo = new Kryo();
    -
    -			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
    -			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
    -			kryo.setInstantiatorStrategy(instantiatorStrategy);
    +	@Override
    +	@SuppressWarnings("deprecation")
    +	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
    +			// proper schema snapshot, can do the sophisticated schema-based compatibility check
    +			final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
    +			final Schema lastSchema = new Schema.Parser().parse(schemaString);
     
    -			kryo.setAsmEnabled(true);
    +			final SchemaPairCompatibility compatibility =
    +					SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
     
    -			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
    +			return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
    +					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
    +		}
    +		else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    +			// old snapshot case, just compare the type
    +			// we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
    +			// only for object-to-object copies.
    +			final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
    +			return type.equals(old.getTypeClass()) ?
    +					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
    +		}
    +		else {
    +			return CompatibilityResult.requiresMigration();
     		}
     	}
     
    -	// --------------------------------------------------------------------------------------------
    +	// ------------------------------------------------------------------------
    +	//  Utilities
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public TypeSerializer<T> duplicate() {
    +		return new AvroSerializer<>(type);
    +	}
     
     	@Override
     	public int hashCode() {
    -		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
    +		return 42 + type.hashCode();
     	}
     
     	@Override
     	public boolean equals(Object obj) {
    -		if (obj instanceof AvroSerializer) {
    -			@SuppressWarnings("unchecked")
    -			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
    -
    -			return avroSerializer.canEqual(this) &&
    -				type == avroSerializer.type &&
    -				typeToInstantiate == avroSerializer.typeToInstantiate;
    -		} else {
    +		if (obj == this) {
    +			return true;
    +		}
    +		else if (obj != null && obj.getClass() == AvroSerializer.class) {
    +			final AvroSerializer that = (AvroSerializer) obj;
    +			return this.type == that.type;
    +		}
    +		else {
     			return false;
     		}
     	}
     
     	@Override
     	public boolean canEqual(Object obj) {
    -		return obj instanceof AvroSerializer;
    +		return obj.getClass() == this.getClass();
     	}
     
    -	// --------------------------------------------------------------------------------------------
    -	// Serializer configuration snapshotting & compatibility
    -	// --------------------------------------------------------------------------------------------
    -
     	@Override
    -	public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
    -		return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
    +	public String toString() {
    +		return getClass().getName() + " (" + getType().getName() + ')';
     	}
     
    -	@SuppressWarnings("unchecked")
    -	@Override
    -	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    -		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
    -			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
    +	// ------------------------------------------------------------------------
    +	//  Initialization
    +	// ------------------------------------------------------------------------
    +
    +	private void checkAvroInitialized() {
    +		if (writer == null) {
    +			initializeAvro();
    +		}
    +	}
    +
    +	private void initializeAvro() {
    +		final ClassLoader cl = Thread.currentThread().getContextClassLoader();
    --- End diff --
    
    Could this be `type.getClassLoader()`? Assuming that the context classloader is always the user-code classloader is easy to break by accident.


---

Mime
View raw message