flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-1458] Allow Interfaces and abstract types in TypeExtractor
Date Thu, 05 Feb 2015 11:03:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 71d05d744 -> 91f9bfc78


[FLINK-1458] Allow Interfaces and abstract types in TypeExtractor

Kryo already supports them, so it was just a question of the
TypeExtractor allowing them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f9bfc7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f9bfc7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f9bfc7

Branch: refs/heads/master
Commit: 91f9bfc782cc190738bfd3ad822348728a053b46
Parents: 71d05d7
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Feb 2 16:08:18 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Feb 5 12:00:59 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 36 +++++++++-----
 .../java/type/extractor/TypeExtractorTest.java  |  5 +-
 .../runtime/KryoGenericTypeSerializerTest.scala | 52 ++++++++++++++++++++
 .../ScalaSpecialTypesSerializerTest.scala       |  5 +-
 .../runtime/TraversableSerializerTest.scala     |  7 ++-
 5 files changed, 87 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 99292a6..124055c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -893,6 +893,10 @@ public class TypeExtractor {
 		while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
 			typeHierarchy.add(curT);
 			curT = typeToClass(curT).getGenericSuperclass();
+
+			if (curT == null) {
+				break;
+			}
 		}
 		return curT;
 	}
@@ -1090,11 +1094,6 @@ public class TypeExtractor {
 			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2>
in2Type) {
 		Validate.notNull(clazz);
 		
-		// check for abstract classes or interfaces
-		if (!clazz.isPrimitive() && (Modifier.isInterface(clazz.getModifiers()) || (Modifier.isAbstract(clazz.getModifiers())
&& !clazz.isArray()))) {
-			throw new InvalidTypesException("Interfaces and abstract classes are not valid types:
" + clazz);
-		}
-
 		if (clazz.equals(Object.class)) {
 			return new GenericTypeInfo<OUT>(clazz);
 		}
@@ -1153,6 +1152,11 @@ public class TypeExtractor {
 
 		alreadySeen.add(clazz);
 
+		if (Modifier.isInterface(clazz.getModifiers())) {
+			// Interface has no members and is therefore not handled as POJO
+			return new GenericTypeInfo<OUT>(clazz);
+		}
+
 		if (clazz.equals(Class.class)) {
 			// special case handling for Class, this should not be handled by the POJO logic
 			return new GenericTypeInfo<OUT>(clazz);
@@ -1228,10 +1232,10 @@ public class TypeExtractor {
 				return true;
 			} else {
 				if(!hasGetter) {
-					LOG.warn("Class "+clazz+" does not contain a getter for field "+f.getName() );
+					LOG.debug("Class "+clazz+" does not contain a getter for field "+f.getName() );
 				}
 				if(!hasSetter) {
-					LOG.warn("Class "+clazz+" does not contain a setter for field "+f.getName() );
+					LOG.debug("Class "+clazz+" does not contain a setter for field "+f.getName() );
 				}
 				return false;
 			}
@@ -1251,11 +1255,16 @@ public class TypeExtractor {
 		}
 		
 		List<Field> fields = getAllDeclaredFields(clazz);
+		if(fields.size() == 0) {
+			LOG.info("No fields detected for class " + clazz + ". Cannot be used as a PojoType. Will
be handled as GenericType");
+			return new GenericTypeInfo<OUT>(clazz);
+		}
+
 		List<PojoField> pojoFields = new ArrayList<PojoField>();
 		for (Field field : fields) {
 			Type fieldType = field.getGenericType();
 			if(!isValidPojoField(field, clazz, typeHierarchy)) {
-				LOG.warn("Class "+clazz+" is not a valid POJO type");
+				LOG.info("Class " + clazz + " is not a valid POJO type");
 				return null;
 			}
 			try {
@@ -1281,7 +1290,7 @@ public class TypeExtractor {
 		List<Method> methods = getAllDeclaredMethods(clazz);
 		for (Method method : methods) {
 			if (method.getName().equals("readObject") || method.getName().equals("writeObject")) {
-				LOG.warn("Class "+clazz+" contains custom serialization methods we do not call.");
+				LOG.info("Class "+clazz+" contains custom serialization methods we do not call.");
 				return null;
 			}
 		}
@@ -1291,8 +1300,13 @@ public class TypeExtractor {
 		try {
 			clazz.getDeclaredConstructor();
 		} catch (NoSuchMethodException e) {
-			LOG.warn("Class " + clazz + " must have a default constructor to be used as a POJO.");
-			return null;
+			if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
+				LOG.info("Class " + clazz + " is abstract or an interface, having a concrete " +
+						"type can increase performance.");
+			} else {
+				LOG.info("Class " + clazz + " must have a default constructor to be used as a POJO.");
+				return null;
+			}
 		}
 		
 		// everything is checked, we return the pojo

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index f234ed5..748f81c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -1090,7 +1091,7 @@ public class TypeExtractorTest {
 		};
 		
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO,
null, true);
-		Assert.assertTrue(ti instanceof MissingTypeInfo);
+		Assert.assertTrue(ti instanceof GenericTypeInfo);
 
 		RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClass>()
{
 			private static final long serialVersionUID = 1L;
@@ -1102,7 +1103,7 @@ public class TypeExtractorTest {
 		};
 
 		TypeInformation<?> ti2 = TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO,
null, true);
-		Assert.assertTrue(ti2 instanceof MissingTypeInfo);
+		Assert.assertTrue(ti2 instanceof GenericTypeInfo);
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index d61edff..c396f9f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -32,6 +32,58 @@ import com.esotericsoftware.kryo.io.Input
 class KryoGenericTypeSerializerTest {
 
   @Test
+  def testTraitSerialization(): Unit = {
+    trait SimpleTrait {
+      def contains(x: String): Boolean
+    }
+    class SimpleClass1 extends SimpleTrait {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass1 => true
+        case _ => false
+      }
+    }
+    class SimpleClass2 extends SimpleTrait {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass2 => true
+        case _ => false
+      }
+    }
+
+    val testData = Array(new SimpleClass1, new SimpleClass1, new SimpleClass2)
+    runTests(testData)
+  }
+
+  @Test
+  def testAbstractSerialization(): Unit = {
+    abstract class SimpleAbstractBase {
+      def contains(x: String): Boolean
+    }
+    class SimpleClass1 extends SimpleAbstractBase {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass1 => true
+        case _ => false
+      }
+    }
+    class SimpleClass2 extends SimpleAbstractBase {
+      def contains(x: String) = true
+
+      override def equals(other: Any): Boolean = other match {
+        case other: SimpleClass2 => true
+        case _ => false
+      }
+    }
+
+    val testData = Array(new SimpleClass1, new SimpleClass1, new SimpleClass2)
+    runTests(testData)
+  }
+
+  @Test
   def testThrowableSerialization: Unit = {
     val a = List(new RuntimeException("Hello"), new RuntimeException("there"))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index b8f79fd..fc51c0c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import org.junit.Assert._
 
 import org.apache.flink.api.common.typeutils.{TypeSerializer, SerializerTestInstance}
@@ -116,7 +117,9 @@ class ScalaSpecialTypesSerializerTestInstance[T](
     try {
       val serializer: TypeSerializer[T] = getSerializer
       val instance: T = serializer.createInstance
-      assertNotNull("The created instance must not be null.", instance)
+      if (!serializer.isInstanceOf[KryoSerializer[_]]) {
+        assertNotNull("The created instance must not be null.", instance)
+      }
       val tpe: Class[T] = getTypeClass
       assertNotNull("The test is corrupt: type class is null.", tpe)
       // We cannot check this because Collection Instances are not always of the type

http://git-wip-us.apache.org/repos/asf/flink/blob/91f9bfc7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
index 587bbf3..84ff4a6 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -55,7 +55,7 @@ class TraversableSerializerTest {
     runTests(testData)
   }
 
-  @Test(expected = classOf[InvalidTypesException])
+  @Test
   def testSortedMap(): Unit = {
     // SortedSet is not supported right now.
     val testData = Array(SortedMap("Hello" -> 1, "World" -> 2), SortedMap("Foo" ->
42))
@@ -68,7 +68,7 @@ class TraversableSerializerTest {
     runTests(testData)
   }
 
-  @Test(expected = classOf[InvalidTypesException])
+  @Test
   def testSortedSet(): Unit = {
     // SortedSet is not supported right now.
     val testData = Array(SortedSet(1,2,3), SortedSet(2,3))
@@ -118,10 +118,9 @@ class TraversableSerializerTest {
   }
 
   @Test
-  @Ignore
   def testWithMixedPrimitives(): Unit = {
     // Does not work yet because the GenericTypeInfo used for the elements will
-    // have a typeClass of Object, and therefore not deserializer the elements correctly.
+    // have a typeClass of Object, and therefore not deserialize the elements correctly.
     // It does work when used in a Job, though. Because the Objects get cast to
     // the correct type in the user function.
     val testData = Array(Seq(1,1L,1d,true,"Hello"), Seq(2,2L,2d,false,"Ciao"))


Mime
View raw message