flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-3602] Fix TypeExtractor and add support for recursive types
Date Tue, 22 Mar 2016 13:44:55 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 954cdc113 -> cd5773ed3


[FLINK-3602] Fix TypeExtractor and add support for recursive types


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0880c594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0880c594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0880c594

Branch: refs/heads/release-1.0
Commit: 0880c59433b2ec578265bf51b1e81a5783935cd5
Parents: 954cdc1
Author: twalthr <twalthr@apache.org>
Authored: Mon Mar 14 13:36:51 2016 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Mar 22 13:58:01 2016 +0100

----------------------------------------------------------------------
 .../io/AvroInputFormatTypeExtractionTest.java   |  2 ++
 .../flink/api/java/typeutils/AvroTypeInfo.java  |  6 +++--
 .../flink/api/java/typeutils/TypeExtractor.java | 24 +++++++++++---------
 3 files changed, 19 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0880c594/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
index 23fbab3..e245026 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -58,6 +58,8 @@ public class AvroInputFormatTypeExtractionTest {
 
 		public String theString;
 
+		public MyAvroType recursive;
+
 		private double aDouble;
 
 		public double getaDouble() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0880c594/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
index 0132eff..1356e53 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -50,13 +50,15 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends
PojoTypeInfo<T>
 
 	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T>
typeClass) {
 		PojoTypeExtractor pte = new PojoTypeExtractor();
-		TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null,
null);
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
+		typeHierarchy.add(typeClass);
+		TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
 
 		if(!(ti instanceof PojoTypeInfo)) {
 			throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
 		}
 		PojoTypeInfo pti =  (PojoTypeInfo) ti;
-		List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields());
+		List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
 
 		for(int i = 0; i < pti.getArity(); i++) {
 			PojoField f = pti.getPojoFieldAt(i);

http://git-wip-us.apache.org/repos/asf/flink/blob/0880c594/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index dd4b132..fdebffd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1322,11 +1322,22 @@ public class TypeExtractor {
 	private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT>
clazz, ArrayList<Type> typeHierarchy,
 			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2>
in2Type) {
 		Preconditions.checkNotNull(clazz);
-		
+
+		// Object is handled as generic type info
 		if (clazz.equals(Object.class)) {
+			return new GenericTypeInfo<>(clazz);
+		}
+
+		// Class is handled as generic type info
+		if (clazz.equals(Class.class)) {
 			return new GenericTypeInfo<OUT>(clazz);
 		}
-		
+
+		// recursive types are handled as generic type info
+		if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
+			return new GenericTypeInfo<>(clazz);
+		}
+
 		// check for arrays
 		if (clazz.isArray()) {
 
@@ -1394,20 +1405,11 @@ public class TypeExtractor {
 			return new AvroTypeInfo(clazz);
 		}
 
-		if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
-			return new GenericTypeInfo<OUT>(clazz);
-		}
-
 		if (Modifier.isInterface(clazz.getModifiers())) {
 			// Interface has no members and is therefore not handled as POJO
 			return new GenericTypeInfo<OUT>(clazz);
 		}
 
-		if (clazz.equals(Class.class)) {
-			// special case handling for Class, this should not be handled by the POJO logic
-			return new GenericTypeInfo<OUT>(clazz);
-		}
-
 		try {
 			TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy),
parameterizedType, in1Type, in2Type);
 			if (pojoType != null) {


Mime
View raw message