flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/5] flink git commit: [FLINK-3306] [core] Fix auto-type registry util
Date Mon, 01 Feb 2016 14:57:22 GMT
Repository: flink
Updated Branches:
  refs/heads/master ef58cf302 -> 5e2fb3cb5


[FLINK-3306] [core] Fix auto-type registry util


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4bc47af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4bc47af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4bc47af

Branch: refs/heads/master
Commit: c4bc47afa6147dd25d8b579e764b30c9c13ee8ea
Parents: d902d16
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 29 17:08:32 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 1 14:45:55 2016 +0100

----------------------------------------------------------------------
 .../api/io/avro/AvroRecordInputFormatTest.java  | 115 ++++++++---------
 .../flink/api/java/ExecutionEnvironment.java    |  51 +++-----
 .../flink/api/java/typeutils/TypeExtractor.java |   5 +
 .../typeutils/runtime/kryo/Serializers.java     | 128 +++++++++++--------
 .../kryo/KryoGenericTypeSerializerTest.java     |  31 ++---
 .../typeutils/runtime/kryo/SerializersTest.java |  61 +++++++--
 6 files changed, 217 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index c04435c..42cbebe 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -22,10 +22,7 @@ import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
@@ -219,41 +216,43 @@ public class AvroRecordInputFormatTest {
 	 */
 	@Test
 	public void testDeserializeToGenericType() throws IOException {
-		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema);
-
-		FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader);
-		// initialize Record by reading it from disk (thats easier than creating it by hand)
-		GenericData.Record rec = new GenericData.Record(userSchema);
-		dataFileReader.next(rec);
-		// check if record has been read correctly
-		assertNotNull(rec);
-		assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-		assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
-
-		// now serialize it with our framework:
-
-		TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class);
-		ExecutionConfig ec = new ExecutionConfig();
-		Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-		Serializers.recursivelyRegisterType(( (GenericTypeInfo) te).getTypeClass(), ec);
-
-		TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
-		Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
-		Assert.assertTrue(
-			ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
-			ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
-		ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
-		tser.serialize(rec, target);
-
-		GenericData.Record newRec = tser.deserialize(target.getInputView());
-
-		// check if it is still the same
-		assertNotNull(newRec);
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
-		assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() );
-		assertEquals(null, newRec.get("type_long_test"));
-
+		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
+
+		try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+			// initialize Record by reading it from disk (thats easier than creating it by hand)
+			GenericData.Record rec = new GenericData.Record(userSchema);
+			dataFileReader.next(rec);
+			
+			// check if record has been read correctly
+			assertNotNull(rec);
+			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+			assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
+
+			// now serialize it with our framework:
+			TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
+
+			ExecutionConfig ec = new ExecutionConfig();
+			Assert.assertEquals(GenericTypeInfo.class, te.getClass());
+			
+			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
+
+			TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
+			Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
+			Assert.assertTrue(
+					ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
+							ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
+			ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
+			tser.serialize(rec, target);
+
+			GenericData.Record newRec = tser.deserialize(target.getInputView());
+
+			// check if it is still the same
+			assertNotNull(newRec);
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
+			assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
+			assertEquals(null, newRec.get("type_long_test"));
+		}
 	}
 
 	/**
@@ -264,28 +263,30 @@ public class AvroRecordInputFormatTest {
 
 		DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
 
-		FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader);
-		User rec = dataFileReader.next();
+		try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+			User rec = dataFileReader.next();
+
+			// check if record has been read correctly
+			assertNotNull(rec);
+			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
 
-		// check if record has been read correctly
-		assertNotNull(rec);
-		assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+			// now serialize it with our framework:
+			ExecutionConfig ec = new ExecutionConfig();
+			TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);
 
-		// now serialize it with our framework:
-		ExecutionConfig ec = new ExecutionConfig();
-		TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class);
-		Assert.assertEquals(AvroTypeInfo.class, te.getClass());
-		TypeSerializer<User> tser = te.createSerializer(ec);
-		ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
-		tser.serialize(rec, target);
+			Assert.assertEquals(AvroTypeInfo.class, te.getClass());
+			TypeSerializer<User> tser = te.createSerializer(ec);
+			ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
+			tser.serialize(rec, target);
 
-		User newRec = tser.deserialize(target.getInputView());
+			User newRec = tser.deserialize(target.getInputView());
 
-		// check if it is still the same
-		assertNotNull(newRec);
-		assertEquals("name not equal", TEST_NAME, newRec.getName().toString() );
-		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() );
+			// check if it is still the same
+			assertNotNull(newRec);
+			assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
+		}
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0dd754a..10cb5e3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -20,12 +20,7 @@ package org.apache.flink.api.java;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -38,7 +33,6 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
@@ -52,11 +46,7 @@ import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.typeutils.*;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -942,30 +932,23 @@ public abstract class ExecutionEnvironment {
 			plan.setDefaultParallelism(getParallelism());
 		}
 		plan.setExecutionConfig(getConfig());
+		
 		// Check plan for GenericTypeInfo's and register the types at the serializers.
-		plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
-			@Override
-			public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
-				OperatorInformation<?> opInfo = visitable.getOperatorInfo();
-				TypeInformation<?> typeInfo = opInfo.getOutputType();
-				if(typeInfo instanceof GenericTypeInfo) {
-					GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo;
-					if(!config.isAutoTypeRegistrationDisabled()) {
-						Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config);
-					}
-				}
-				if(typeInfo instanceof CompositeType) {
-					List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>();
-					Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
-					for(GenericTypeInfo<?> gt : genericTypesInComposite) {
-						Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
-					}
+		if (!config.isAutoTypeRegistrationDisabled()) {
+			plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
+				
+				private final HashSet<Class<?>> deduplicator = new HashSet<>();
+				
+				@Override
+				public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
+					OperatorInformation<?> opInfo = visitable.getOperatorInfo();
+					Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator);
+					return true;
 				}
-				return true;
-			}
-			@Override
-			public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
-		});
+				@Override
+				public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
+			});
+		}
 
 		try {
 			registerCachedFilesWithPlan(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index ddb4a48..b8f2075 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -332,6 +332,11 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 	//  Create type information
 	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	public static <T> TypeInformation<T> createTypeInfo(Class<T> type) {
+		return (TypeInformation<T>) createTypeInfo((Type) type);
+	}
 	
 	public static TypeInformation<?> createTypeInfo(Type t) {
 		TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t);

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 0ea8691..6903d35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -23,24 +23,28 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+
 import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
 import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecordBase;
+
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
+import java.lang.reflect.*;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 
@@ -54,49 +58,78 @@ import java.util.Set;
  * Also, there is a Java Annotation for adding a default serializer (@DefaultSerializer) to classes.
  */
 public class Serializers {
-	/**
-	 * NOTE: This method is not a public Flink API.
-	 *
-	 * This method walks the entire hierarchy of the given type and registers all types it encounters
-	 * to Kryo.
-	 * It also watches for types which need special serializers.
-	 */
-	private static Set<Class<?>> alreadySeen = new HashSet<>();
 
-	public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config) {
-		alreadySeen.add(type);
+	public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig config, Set<Class<?>> alreadySeen) {
+		if (typeInfo instanceof GenericTypeInfo) {
+			GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo;
+			Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config, alreadySeen);
+		}
+		else if (typeInfo instanceof CompositeType) {
+			List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>();
+			Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
+			for (GenericTypeInfo<?> gt : genericTypesInComposite) {
+				Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen);
+			}
+		}
+		else if (typeInfo instanceof ObjectArrayTypeInfo) {
+			ObjectArrayTypeInfo<?, ?> objectArrayTypeInfo = (ObjectArrayTypeInfo<?, ?>) typeInfo;
+			recursivelyRegisterType(objectArrayTypeInfo.getComponentInfo(), config, alreadySeen);
+		}
+	}
+	
+	public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config, Set<Class<?>> alreadySeen) {
+		// don't register or remember primitives
+		if (type == null || type.isPrimitive() || type == Object.class) {
+			return;
+		}
 		
-		if (type.isPrimitive()) {
+		// prevent infinite recursion for recursive types
+		if (!alreadySeen.add(type)) {
 			return;
 		}
-		config.registerKryoType(type);
-		addSerializerForType(config, type);
-
-		Field[] fields = type.getDeclaredFields();
-		for (Field field : fields) {
-			if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
-				continue;
-			}
-			Type fieldType = field.getGenericType();
-			if (fieldType instanceof ParameterizedType) { // field has generics
-				ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType;
-				for (Type t: parameterizedFieldType.getActualTypeArguments()) {
-					if (TypeExtractor.isClassType(t) ) {
-						Class<?> clazz = TypeExtractor.typeToClass(t);
-						if (!alreadySeen.contains(clazz)) {
-							recursivelyRegisterType(TypeExtractor.typeToClass(t), config);
-						}
-					}
+		
+		if (type.isArray()) {
+			recursivelyRegisterType(type.getComponentType(), config, alreadySeen);
+		}
+		else {
+			config.registerKryoType(type);
+			checkAndAddSerializerForTypeAvro(config, type);
+	
+			Field[] fields = type.getDeclaredFields();
+			for (Field field : fields) {
+				if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
+					continue;
 				}
+				Type fieldType = field.getGenericType();
+				recursivelyRegisterGenericType(fieldType, config, alreadySeen);
 			}
-			Class<?> clazz = field.getType();
-			if (!alreadySeen.contains(clazz)) {
-				recursivelyRegisterType(clazz, config);
+		}
+	}
+	
+	private static void recursivelyRegisterGenericType(Type fieldType, ExecutionConfig config, Set<Class<?>> alreadySeen) {
+		if (fieldType instanceof ParameterizedType) {
+			// field has generics
+			ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType;
+			
+			for (Type t: parameterizedFieldType.getActualTypeArguments()) {
+				if (TypeExtractor.isClassType(t) ) {
+					recursivelyRegisterType(TypeExtractor.typeToClass(t), config, alreadySeen);
+				}
 			}
+
+			recursivelyRegisterGenericType(parameterizedFieldType.getRawType(), config, alreadySeen);
+		}
+		else if (fieldType instanceof GenericArrayType) {
+			GenericArrayType genericArrayType = (GenericArrayType) fieldType;
+			recursivelyRegisterGenericType(genericArrayType.getGenericComponentType(), config, alreadySeen);
+		}
+		else if (fieldType instanceof Class) {
+			Class<?> clazz = (Class<?>) fieldType;
+			recursivelyRegisterType(clazz, config, alreadySeen);
 		}
 	}
 
-	public static void addSerializerForType(ExecutionConfig reg, Class<?> type) {
+	private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) {
 		if (GenericData.Record.class.isAssignableFrom(type)) {
 			registerGenericAvro(reg);
 		}
@@ -105,16 +138,13 @@ public class Serializers {
 			Class<? extends SpecificRecordBase> specRecordClass = (Class<? extends SpecificRecordBase>) type;
 			registerSpecificAvro(reg, specRecordClass);
 		}
-		if (DateTime.class.isAssignableFrom(type) || Interval.class.isAssignableFrom(type)) {
-			registerJodaTime(reg);
-		}
 	}
 
 	/**
 	 * Register these serializers for using Avro's {@link GenericData.Record} and classes
 	 * implementing {@link org.apache.avro.specific.SpecificRecordBase}
 	 */
-	public static void registerGenericAvro(ExecutionConfig reg) {
+	private static void registerGenericAvro(ExecutionConfig reg) {
 		// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
 		// because Kryo is not able to serialize them properly, we use this serializer for them
 		reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);
@@ -126,8 +156,7 @@ public class Serializers {
 		reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
 	}
 
-
-	public static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) {
+	private static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) {
 		registerGenericAvro(reg);
 		// This rule only applies if users explicitly use the GenericTypeInformation for the avro types
 		// usually, we are able to handle Avro POJOs with the POJO serializer.
@@ -136,14 +165,13 @@ public class Serializers {
 	//	ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(avroType);
 	//	reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
 	}
-
-
+	
 	/**
 	 * Currently, the following classes of JodaTime are supported:
-	 * 	- DateTime
-	 * 	- Interval
+	 *      - DateTime
+	 *      - Interval
 	 *
-	 * 	The following chronologies are supported: (see {@link de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer})
+	 *      The following chronologies are supported: (see {@link de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer})
 	 * <ul>
 	 * <li>{@link org.joda.time.chrono.ISOChronology}</li>
 	 * <li>{@link org.joda.time.chrono.CopticChronology}</li>
@@ -159,7 +187,7 @@ public class Serializers {
 		reg.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
 		reg.registerTypeWithKryoSerializer(Interval.class, JodaIntervalSerializer.class);
 	}
-
+	
 	/**
 	 * Register less frequently used serializers
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
index 4b6e432..8ff0b1b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
@@ -18,23 +18,24 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import static org.junit.Assert.*;
-
 import com.esotericsoftware.kryo.Kryo;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
 import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
-import org.joda.time.DateTime;
-import org.junit.Test;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import org.junit.Test;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Random;
 
+import static org.junit.Assert.*;
+
 @SuppressWarnings("unchecked")
 public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
 
@@ -42,7 +43,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 	
 	@Test
 	public void testJavaList(){
-		Collection<Integer> a = new ArrayList<Integer>();
+		Collection<Integer> a = new ArrayList<>();
 
 		fillCollection(a);
 
@@ -51,7 +52,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 
 	@Test
 	public void testJavaSet(){
-		Collection<Integer> b = new HashSet<Integer>();
+		Collection<Integer> b = new HashSet<>();
 
 		fillCollection(b);
 
@@ -62,24 +63,12 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 
 	@Test
 	public void testJavaDequeue(){
-		Collection<Integer> c = new LinkedList<Integer>();
-
+		Collection<Integer> c = new LinkedList<>();
 		fillCollection(c);
-
 		runTests(c);
 	}
 
-	@Test
-	public void testJodaTime(){
-		Collection<DateTime> b = new HashSet<DateTime>();
-		Serializers.registerJodaTime(ec);
-		b.add(new DateTime(1));
-		b.add(new DateTime(2));
-
-		runTests(b);
-	}
-
-	private void fillCollection(Collection<Integer> coll){
+	private void fillCollection(Collection<Integer> coll) {
 		coll.add(42);
 		coll.add(1337);
 		coll.add(49);
@@ -140,7 +129,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 			int numElements = 100;
 			// construct a memory target that is too small for the string
 			TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements);
-			KryoSerializer<Integer> serializer = new KryoSerializer<Integer>(Integer.class, new ExecutionConfig());
+			KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());
 
 			for(int i = 0; i < numElements; i++){
 				serializer.serialize(i, target);

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
index 96d761a..7c6d023 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
@@ -15,15 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.joda.time.Interval;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.fs.Path;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertTrue;
 
 public class SerializersTest {
 
@@ -31,36 +36,68 @@ public class SerializersTest {
 	public static class Node {
 		private Node parent;
 	}
+	
 	public static class FromNested {
 		Node recurseMe;
 	}
-	public static class FromGeneric {
-
-	}
+	
+	public static class FromGeneric1 {}
+	public static class FromGeneric2 {}
+	
 	public static class Nested1 {
 		private FromNested fromNested;
-		private Interval yodaIntervall;
+		private Path yodaIntervall;
 	}
 
 	public static class ClassWithNested {
+		
 		Nested1 nested;
 		int ab;
-		ArrayList<FromGeneric> addGenType;
+		
+		ArrayList<FromGeneric1> addGenType;
+		FromGeneric2[] genericArrayType;
 	}
 
 	@Test
 	public void testTypeRegistration() {
 		ExecutionConfig conf = new ExecutionConfig();
-		Serializers.recursivelyRegisterType(ClassWithNested.class, conf);
-		KryoSerializer kryo = new KryoSerializer(String.class, conf); // we create Kryo from another type.
+		Serializers.recursivelyRegisterType(ClassWithNested.class, conf, new HashSet<Class<?>>());
+		
+		KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
 
 		Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
 		Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
-		Assert.assertTrue(kryo.getKryo().getRegistration(Interval.class).getId() > 0);
-		Assert.assertTrue(kryo.getKryo().getRegistration(Interval.class).getSerializer().getClass() == JodaIntervalSerializer.class);
+		Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
+		
 		// check if the generic type from one field is also registered (its very likely that
 		// generic types are also used as fields somewhere.
-		Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric.class).getId() > 0);
+		Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
+		Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
 		Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
+		
+		
+		// register again and make sure classes are still registered
+		ExecutionConfig conf2 = new ExecutionConfig();
+		Serializers.recursivelyRegisterType(ClassWithNested.class, conf2, new HashSet<Class<?>>());
+		KryoSerializer<String> kryo2 = new KryoSerializer<>(String.class, conf);
+		assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0);
+	}
+
+	@Test
+	public void testTypeRegistrationFromTypeInfo() {
+		ExecutionConfig conf = new ExecutionConfig();
+		Serializers.recursivelyRegisterType(new GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>());
+
+		KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
+
+		assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
+
+		// check if the generic type from one field is also registered (its very likely that
+		// generic types are also used as fields somewhere.
+		assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
 	}
 }


Mime
View raw message