flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/21] flink git commit: [FLINK-7420] [avro] Abstract all Avro interaction behind AvroUtils
Date Fri, 03 Nov 2017 17:11:24 GMT
Repository: flink
Updated Branches:
  refs/heads/master 16b088218 -> 87bf57816


[FLINK-7420] [avro] Abstract all Avro interaction behind AvroUtils

Before, we would try and dynamicall load Avro-related classes in several
places. Now, we only reflectively instantiate the right AvroUtils and
all other operations are methods on this.

The default AvroUtils throw exceptions with a helpful message for most
operations.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65e87045
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65e87045
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65e87045

Branch: refs/heads/master
Commit: 65e87045c48ce3200ea6690d945ed87b061808af
Parents: 29249b2
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Oct 30 15:02:18 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Nov 3 16:40:34 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/AvroUtils.java     | 120 +++++++++++++++++++
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  23 +---
 .../flink/api/java/typeutils/TypeExtractor.java |  33 +----
 .../typeutils/runtime/kryo/KryoSerializer.java  |   3 +-
 .../typeutils/runtime/kryo/Serializers.java     |  63 ++--------
 .../avro/utils/AvroKryoSerializerUtils.java     |  54 +++++++--
 6 files changed, 182 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
new file mode 100644
index 0000000..e19f5fb
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+
+import java.util.LinkedHashMap;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
+
+/**
+ * Utility methods for dealing with Avro types. This has a default implementation for the
case that
+ * Avro is not present on the classpath and an actual implementation in flink-avro that is
+ * dynamically loaded when present.
+ */
+public abstract class AvroUtils {
+
+	protected static final String AVRO_SPECIFIC_RECORD_BASE = "org.apache.avro.specific.SpecificRecordBase";
+
+	protected static final String AVRO_GENERIC_RECORD = "org.apache.avro.generic.GenericData$Record";
+
+	private static final String AVRO_KRYO_UTILS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
+
+	private static final String AVRO_GENERIC_DATA_ARRAY = "org.apache.avro.generic.GenericData$Array";
+
+	/**
+	 * Loads the utility class from <code>flink-avro</code> and adds Avro-specific
serializers. This
+	 * method will throw an exception if we see an Avro type but flink-avro is not in the classpath.
+	 */
+	public abstract void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type);
+
+	/**
+	 * Registers a special Serializer for GenericData.Array.
+	 */
+	public abstract void addAvroGenericDataArrayRegistration(
+			LinkedHashMap<String,
+			KryoRegistration> kryoRegistrations);
+
+	/**
+	 * Creates an {@code AvroSerializer} if flink-avro is present, otherwise throws an exception.
+	 */
+	public abstract <T> TypeSerializer<T> createAvroSerializer(Class<T> type);
+
+	/**
+	 * Creates an {@code AvroTypeInfo} if flink-avro is present, otherwise throws an exception.
+	 */
+	public abstract <T> TypeInformation<T> createAvroTypeInfo(Class<T> type);
+
+	/**
+	 * Returns either the default {@link AvroUtils} which throw an exception in cases where
Avro
+	 * would be needed or loads the specific utils for Avro from flink-avro.
+	 */
+	public static AvroUtils getAvroUtils() {
+
+		// try and load the special AvroUtils from the flink-avro package
+		Class<?> clazz;
+		try {
+			clazz = Class.forName(AVRO_KRYO_UTILS, false, AvroUtils.class.getClassLoader());
+		} catch (ClassNotFoundException e) {
+			// cannot find the utils, return the default implementation
+			return new DefaultAvroUtils();
+		}
+
+		try {
+			return (AvroUtils) clazz.getConstructor().newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate " + AVRO_KRYO_UTILS + ".");
+		}
+	}
+
+	private static class DefaultAvroUtils extends AvroUtils {
+		@Override
+		public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
+			if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(
+				type,
+				AVRO_GENERIC_RECORD)) {
+				throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " +
+					"You may be missing the 'flink-avro' dependency.");
+			}
+		}
+
+		@Override
+		public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration>
kryoRegistrations) {
+			kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
+				new KryoRegistration(Serializers.DummyAvroRegisteredClass.class, (Class) Serializers.DummyAvroKryoSerializerClass.class));
+		}
+
+		@Override
+		public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) {
+			throw new RuntimeException("Could not load the AvroSerializer class. " +
+				"You may be missing the 'flink-avro' dependency.");
+		}
+
+		@Override
+		public <T> TypeInformation<T> createAvroTypeInfo(Class<T> type) {
+			throw new RuntimeException("Could not load the AvroTypeInfo class. " +
+				"You may be missing the 'flink-avro' dependency.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index b24f425..2e893bb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -31,9 +31,7 @@ import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -303,27 +301,12 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	@PublicEvolving
 	@SuppressWarnings("unchecked")
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-		if(config.isForceKryoEnabled()) {
+		if (config.isForceKryoEnabled()) {
 			return new KryoSerializer<>(getTypeClass(), config);
 		}
 
-		if(config.isForceAvroEnabled()) {
-			Class<?> clazz;
-			try {
-				clazz = Class.forName("org.apache.flink.formats.avro.typeutils.AvroSerializer");
-			} catch (ClassNotFoundException e) {
-				throw new RuntimeException("Could not load the AvroSerializer class. " +
-					"You may be missing the 'flink-avro' dependency.");
-			}
-
-			try {
-				Constructor<?> constructor = clazz.getConstructor(Class.class);
-				return (TypeSerializer<T>) constructor.newInstance(getTypeClass());
-			} catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
-				throw new RuntimeException("Incompatible versions of the Avro classes found.");
-			} catch (InvocationTargetException e) {
-				throw new RuntimeException("Cannot create AvroSerializer.", e.getTargetException());
-			}
+		if (config.isForceAvroEnabled()) {
+			return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
 		}
 
 		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 1a9cecb..8ea2e1a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -117,8 +117,6 @@ public class TypeExtractor {
 
 	private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = "org.apache.avro.specific.SpecificRecordBase";
 
-	private static final String AVRO_TYPEINFO_CLASS = "org.apache.flink.formats.avro.typeutils.AvroTypeInfo";
-
 	private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
 
 	public static final int[] NO_INDEX = new int[] {};
@@ -1794,7 +1792,7 @@ public class TypeExtractor {
 
 		// special case for POJOs generated by Avro.
 		if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
-			return createAvroTypeInfo(clazz);
+			return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
 		}
 
 		if (Modifier.isInterface(clazz.getModifiers())) {
@@ -2175,33 +2173,4 @@ public class TypeExtractor {
 			// ignore
 		}
 	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities to handle Avro's 'SpecificRecord' type via reflection
-	// ------------------------------------------------------------------------
-
-	private static <T> TypeInformation<T> createAvroTypeInfo(Class<T> clazz)
{
-		Class<?> typeInfoClass;
-		try {
-			typeInfoClass = Class.forName(AVRO_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("Could not load the TypeInformation for the class '"
-					+ AVRO_TYPEINFO_CLASS + "'. You may be missing the 'flink-avro' dependency.");
-		}
-
-		try {
-			Constructor<?> constr = typeInfoClass.getConstructor(Class.class);
-
-			@SuppressWarnings("unchecked")
-			TypeInformation<T> typeInfo = (TypeInformation<T>) constr.newInstance(clazz);
-			return typeInfo;
-		}
-		catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
-			throw new RuntimeException("Incompatible versions of the Avro classes found.");
-		}
-		catch (InvocationTargetException e) {
-			throw new RuntimeException("Cannot create AvroTypeInfo.", e.getTargetException());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 269cf35..560e5b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.AvroUtils;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
@@ -477,7 +478,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		}
 
 		// add Avro support if flink-avro is available; a dummy otherwise
-		Serializers.addAvroGenericDataArrayRegistration(kryoRegistrations);
+		AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration(kryoRegistrations);
 
 		return kryoRegistrations;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index de7b2fc..9659dc6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.AvroUtils;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
@@ -36,19 +36,14 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
-
-
 /**
  * Class containing utilities for the serializers of the Flink Runtime.
  *
@@ -61,14 +56,6 @@ import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSupercl
 @Internal
 public class Serializers {
 
-	private static final String AVRO_SPECIFIC_RECORD_BASE = "org.apache.avro.specific.SpecificRecordBase";
-
-	private static final String AVRO_GENERIC_RECORD = "org.apache.avro.generic.GenericData$Record";
-
-	private static final String AVRO_KRYO_UTILS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
-
-	private static final String AVRO_GENERIC_DATA_ARRAY = "org.apache.avro.generic.GenericData$Array";
-
 	public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig
config, Set<Class<?>> alreadySeen) {
 		if (typeInfo instanceof GenericTypeInfo) {
 			GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo;
@@ -104,9 +91,7 @@ public class Serializers {
 		else {
 			config.registerKryoType(type);
 			// add serializers for Avro type if necessary
-			if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD))
{
-				addAvroSerializers(config, type);
-			}
+			AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, type);
 
 			Field[] fields = type.getDeclaredFields();
 			for (Field field : fields) {
@@ -161,43 +146,19 @@ public class Serializers {
 	}
 
 	/**
-	 * Loads the utility class from <code>flink-avro</code> and adds Avro-specific
serializers.
+	 * This is used in case we don't have Avro on the classpath. Flink versions before 1.4
+	 * always registered special Serializers for Kryo but starting with Flink 1.4 we don't have
+	 * Avro on the classpath by default anymore. We still have to retain the same registered
+	 * Serializers for backwards compatibility of savepoints.
 	 */
-	private static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
-		Class<?> clazz;
-		try {
-			clazz = Class.forName(AVRO_KRYO_UTILS, false, Serializers.class.getClassLoader());
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " +
-				"You may be missing the 'flink-avro' dependency.");
-		}
-		try {
-			clazz.getDeclaredMethod("addAvroSerializers", ExecutionConfig.class, Class.class).invoke(null,
reg, type);
-		} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e)
{
-			throw new RuntimeException("Could not access method in 'flink-avro' dependency.", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public static void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration>
kryoRegistrations) {
-		try {
-			Class<?> clazz = Class.forName(AVRO_GENERIC_DATA_ARRAY, false, Serializers.class.getClassLoader());
-
-			kryoRegistrations.put(
-				AVRO_GENERIC_DATA_ARRAY,
-				new KryoRegistration(
-						clazz,
-						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-		}
-		catch (ClassNotFoundException e) {
-			kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
-				new KryoRegistration(DummyAvroRegisteredClass.class, (Class) DummyAvroKryoSerializerClass.class));
-		}
-	}
-
 	public static class DummyAvroRegisteredClass {}
 
+	/**
+	 * This is used in case we don't have Avro on the classpath. Flink versions before 1.4
+	 * always registered special Serializers for Kryo but starting with Flink 1.4 we don't have
+	 * Avro on the classpath by default anymore. We still have to retain the same registered
+	 * Serializers for backwards compatibility of savepoints.
+	 */
 	public static class DummyAvroKryoSerializerClass<T> extends Serializer<T> {
 		@Override
 		public void write(Kryo kryo, Output output, Object o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/65e87045/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index 7305f23..c28f6cf 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -19,7 +19,13 @@
 package org.apache.flink.formats.avro.utils;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.AvroUtils;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
@@ -29,22 +35,50 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
 
 /**
  * Utilities for integrating Avro serializers in Kryo.
  */
-public class AvroKryoSerializerUtils {
+public class AvroKryoSerializerUtils extends AvroUtils {
+
+	@Override
+	public void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type) {
+		if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD))
{
+			// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
+			// because Kryo is not able to serialize them properly, we use this serializer for them
+			reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+
+			// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
+			// Kryo is able to serialize everything in there, except for the Schema.
+			// This serializer is very slow, but using the GenericData.Records of Kryo is in general
a bad idea.
+			// we add the serializer as a default serializer because Avro is using a private sub-type
at runtime.
+			reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
+		}
+	}
+
+	@Override
+	public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration>
kryoRegistrations) {
+		kryoRegistrations.put(
+			GenericData.Array.class.getName(),
+			new KryoRegistration(
+				GenericData.Array.class,
+				new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+	}
 
-	public static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
-		// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
-		// because Kryo is not able to serialize them properly, we use this serializer for them
-		reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+	@Override
+	public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) {
+		return new AvroSerializer<>(type);
+	}
 
-		// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
-		// Kryo is able to serialize everything in there, except for the Schema.
-		// This serializer is very slow, but using the GenericData.Records of Kryo is in general
a bad idea.
-		// we add the serializer as a default serializer because Avro is using a private sub-type
at runtime.
-		reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
+	@Override
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public <T> TypeInformation<T> createAvroTypeInfo(Class<T> type) {
+		// we have to be raw here because we cannot have "<T extends SpecificRecordBase>"
in
+		// the interface of AvroUtils
+		return new AvroTypeInfo(type);
 	}
 
 	/**


Mime
View raw message