flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [17/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:17 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
new file mode 100644
index 0000000..bc11848
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultiset;
+
+/**
+ *  Pojo Type tests
+ *
+ *  A Pojo is a bean-style class with getters, setters and an empty constructor,
+ *   OR a class whose fields are all public (every non-public field needs a public getter and setter).
+ *   Everything else is a generic type (that can't be used for field selection).
+ */
+public class PojoTypeExtractionTest {
+
+	public static class HasDuplicateField extends WC {
+		@SuppressWarnings("unused")
+		private int count; // duplicate
+	}
+
+	@Test(expected=RuntimeException.class)
+	public void testDuplicateFieldException() {
+		TypeExtractor.createTypeInfo(HasDuplicateField.class);
+	}
+
+	// test with correct pojo types
+	public static class WC { // is a pojo
+		public ComplexNestedClass complex; // is a pojo
+		private int count; // is a BasicType
+
+		public WC() {
+		}
+		public int getCount() {
+			return count;
+		}
+		public void setCount(int c) {
+			this.count = c;
+		}
+	}
+	// Nested POJO fixture used as the 'complex' field of WC. The tests expect it to
+	// expose 7 POJO fields, which flatten to 9 fields once the tuple is expanded.
+	public static class ComplexNestedClass { // pojo
+		public static int ignoreStaticField; // static: not reported as a POJO field
+		public transient int ignoreTransientField; // transient: not reported as a POJO field
+		public Date date; // BasicType (the tests expect BasicTypeInfo.DATE_TYPE_INFO)
+		public Integer someNumberWithÜnicödeNäme; // BasicType
+		public float someFloat; // BasicType
+		public Tuple3<Long, Long, String> word; // Tuple type with three basic types
+		public Object nothing; // generic type
+		public MyWritable hadoopCitizen;  // writableType
+		public List<String> collection; // generic type
+	}
+
+	// all public test
+	// Adds three public fields to the seven inherited from ComplexNestedClass;
+	// the tests expect an arity of 10 and 12 flat fields for this type.
+	public static class AllPublic extends ComplexNestedClass {
+		public ArrayList<String> somethingFancy; // generic type
+		public HashMultiset<Integer> fancyIds; // generic type
+		public String[]	fancyArray;			 // basic array type (the tests expect BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+	}
+
+	public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> {
+		public String field3;
+	}
+	public static class PojoWithGenerics<T1, T2> {
+		public int key;
+		public T1 field1;
+		public T2 field2;
+	}
+
+	public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {}
+	public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {}
+
+	// extends from Tuple and adds a field
+	public static class FromTuple extends Tuple3<String, String, Long> {
+		private static final long serialVersionUID = 1L;
+		public int special;
+	}
+
+	public static class IncorrectPojo {
+		private int isPrivate;
+		public int getIsPrivate() {
+			return isPrivate;
+		}
+		// setter is missing (intentional)
+	}
+
+	// correct pojo
+	public static class BeanStylePojo {
+		public String abc;
+		private int field;
+		public int getField() {
+			return this.field;
+		}
+		public void setField(int f) {
+			this.field = f;
+		}
+	}
+	public static class WrongCtorPojo {
+		public int a;
+		public WrongCtorPojo(int a) {
+			this.a = a;
+		}
+	}
+
+	public static class PojoWithGenericFields {
+		private Collection<String> users;
+		private boolean favorited;
+
+		public boolean isFavorited() {
+			return favorited;
+		}
+
+		public void setFavorited(boolean favorited) {
+			this.favorited = favorited;
+		}
+
+		public Collection<String> getUsers() {
+			return users;
+		}
+
+		public void setUsers(Collection<String> users) {
+			this.users = users;
+		}
+	}
+	@Test
+	public void testPojoWithGenericFields() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenericFields.class);
+
+		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+	}
+
+
+	// in this test, the location of the getters and setters is mixed across the type hierarchy.
+	public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> {
+		public void setPackageProtected(String in) {
+			this.packageProtected = in;
+		}
+	}
+	public static class GenericPojoGetterSetterCheck<T> {
+		T packageProtected;
+		public T getPackageProtected() {
+			return packageProtected;
+		}
+	}
+
+	@Test
+	public void testIncorrectPojos() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class);
+		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
+
+		typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class);
+		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
+	}
+
+	@Test
+	public void testCorrectPojos() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class);
+		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+
+		typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
+		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+	}
+
+	@Test
+	public void testPojoWC() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class);
+		checkWCPojoAsserts(typeForClass);
+
+		WC t = new WC();
+		t.complex = new ComplexNestedClass();
+		TypeInformation<?> typeForObject = TypeExtractor.getForObject(t);
+		checkWCPojoAsserts(typeForObject);
+	}
+
+	/**
+	 * Asserts that the given type information describes {@link WC} as a POJO with two
+	 * top-level fields ({@code complex} and {@code count}) that flatten into ten fields.
+	 *
+	 * <p>The checks cover the flat position of every field expression, the wildcard
+	 * selections ({@code *}, {@code complex.*}, {@code complex.word.*} and the scala-style
+	 * {@code complex.word._}) and the type information of each field of the nested
+	 * {@link ComplexNestedClass}.
+	 *
+	 * @param typeInfo the type information to verify, extracted for {@code WC}
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
+		Assert.assertFalse(typeInfo.isBasicType());
+		Assert.assertFalse(typeInfo.isTupleType());
+		Assert.assertEquals(10, typeInfo.getTotalFields());
+		Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
+		PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
+
+		// each field expression must resolve to exactly one flat field at the expected position;
+		// fields[i] is expected at positions[i]
+		List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>();
+		String[] fields = {"count",
+				"complex.date",
+				"complex.hadoopCitizen",
+				"complex.collection",
+				"complex.nothing",
+				"complex.someFloat",
+				"complex.someNumberWithÜnicödeNäme",
+				"complex.word.f0",
+				"complex.word.f1",
+				"complex.word.f2"};
+		int[] positions = {9,
+				1,
+				2,
+				0,
+				3,
+				4,
+				5,
+				6,
+				7,
+				8};
+		Assert.assertEquals(fields.length, positions.length);
+		for(int i = 0; i < fields.length; i++) {
+			pojoType.getFlatFields(fields[i], 0, ffd);
+			Assert.assertEquals("Too many keys returned", 1, ffd.size());
+			Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition());
+			ffd.clear();
+		}
+
+		pojoType.getFlatFields("complex.word.*", 0, ffd);
+		Assert.assertEquals(3, ffd.size());
+		// the three tuple fields must occupy the flat positions 6, 7 and 8
+		for(FlatFieldDescriptor ffdE : ffd) {
+			final int pos = ffdE.getPosition();
+			Assert.assertTrue(pos <= 8 );
+			Assert.assertTrue(6 <= pos );
+			if(pos == 6) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 7) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 8) {
+				Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
+			}
+		}
+		ffd.clear();
+
+		// scala style full tuple selection for pojos
+		pojoType.getFlatFields("complex.word._", 0, ffd);
+		Assert.assertEquals(3, ffd.size());
+		ffd.clear();
+
+		pojoType.getFlatFields("complex.*", 0, ffd);
+		Assert.assertEquals(9, ffd.size());
+		// all fields of 'complex' must occupy the flat positions 0 to 8
+		for(FlatFieldDescriptor ffdE : ffd) {
+			final int pos = ffdE.getPosition();
+			Assert.assertTrue(pos <= 8 );
+			Assert.assertTrue(0 <= pos );
+
+			if(pos == 0) {
+				Assert.assertEquals(List.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 1) {
+				Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 2) {
+				Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 3) {
+				Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 4) {
+				Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 5) {
+				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 6) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 7) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 8) {
+				Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
+			}
+		}
+		ffd.clear();
+
+		pojoType.getFlatFields("*", 0, ffd);
+		Assert.assertEquals(10, ffd.size());
+		// the full selection spans the flat positions 0 to 9; 'count' is the last one
+		for(FlatFieldDescriptor ffdE : ffd) {
+			Assert.assertTrue(ffdE.getPosition() <= 9 );
+			Assert.assertTrue(0 <= ffdE.getPosition() );
+			if(ffdE.getPosition() == 9) {
+				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+			}
+		}
+		ffd.clear();
+
+		TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex
+		Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
+
+		Assert.assertEquals(7, typeComplexNested.getArity());
+		Assert.assertEquals(9, typeComplexNested.getTotalFields());
+		PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested;
+
+		// every expected field of the nested pojo has to show up exactly once, with the expected type
+		boolean dateSeen = false, intSeen = false, floatSeen = false,
+				tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false;
+		for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
+			PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("date")) {
+				if(dateSeen) {
+					Assert.fail("already seen");
+				}
+				dateSeen = true;
+				Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(Date.class, field.getTypeInformation().getTypeClass());
+			} else if(name.equals("someNumberWithÜnicödeNäme")) {
+				if(intSeen) {
+					Assert.fail("already seen");
+				}
+				intSeen = true;
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(Integer.class, field.getTypeInformation().getTypeClass());
+			} else if(name.equals("someFloat")) {
+				if(floatSeen) {
+					Assert.fail("already seen");
+				}
+				floatSeen = true;
+				Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(Float.class, field.getTypeInformation().getTypeClass());
+			} else if(name.equals("word")) {
+				if(tupleSeen) {
+					Assert.fail("already seen");
+				}
+				tupleSeen = true;
+				Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>);
+				Assert.assertEquals(Tuple3.class, field.getTypeInformation().getTypeClass());
+				// do some more advanced checks on the tuple
+				TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.getTypeInformation();
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0));
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1));
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2));
+			} else if(name.equals("nothing")) {
+				if(objectSeen) {
+					Assert.fail("already seen");
+				}
+				objectSeen = true;
+				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+				Assert.assertEquals(Object.class, field.getTypeInformation().getTypeClass());
+			} else if(name.equals("hadoopCitizen")) {
+				if(writableSeen) {
+					Assert.fail("already seen");
+				}
+				writableSeen = true;
+				Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation());
+				Assert.assertEquals(MyWritable.class, field.getTypeInformation().getTypeClass());
+			} else if(name.equals("collection")) {
+				if(collectionSeen) {
+					Assert.fail("already seen");
+				}
+				collectionSeen = true;
+				Assert.assertEquals(new GenericTypeInfo(List.class), field.getTypeInformation());
+
+			} else {
+				Assert.fail("field "+field+" is not expected");
+			}
+		}
+		Assert.assertTrue("Field was not present", dateSeen);
+		Assert.assertTrue("Field was not present", intSeen);
+		Assert.assertTrue("Field was not present", floatSeen);
+		Assert.assertTrue("Field was not present", tupleSeen);
+		Assert.assertTrue("Field was not present", objectSeen);
+		Assert.assertTrue("Field was not present", writableSeen);
+		Assert.assertTrue("Field was not present", collectionSeen);
+
+		TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count
+		Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
+
+		// JUnit expects (expected, actual); keeping that order makes failure messages read correctly
+		Assert.assertEquals(WC.class, typeInfo.getTypeClass());
+		Assert.assertEquals(2, typeInfo.getArity());
+	}
+
+	@Test
+	public void testPojoAllPublic() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class);
+		checkAllPublicAsserts(typeForClass);
+
+		TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() );
+		checkAllPublicAsserts(typeForObject);
+	}
+
+	private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
+		Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
+		Assert.assertEquals(10, typeInformation.getArity());
+		Assert.assertEquals(12, typeInformation.getTotalFields());
+		// check if the three additional fields are identified correctly
+		boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false;
+		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
+		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("somethingFancy")) {
+				if(arrayListSeen) {
+					Assert.fail("already seen");
+				}
+				arrayListSeen = true;
+				Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo);
+				Assert.assertEquals(ArrayList.class, field.getTypeInformation().getTypeClass());
+			} else if(name.equals("fancyIds")) {
+				if(multisetSeen) {
+					Assert.fail("already seen");
+				}
+				multisetSeen = true;
+				Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo);
+				Assert.assertEquals(HashMultiset.class, field.getTypeInformation().getTypeClass());
+			} else if(name.equals("fancyArray")) {
+				if(strArraySeen) {
+					Assert.fail("already seen");
+				}
+				strArraySeen = true;
+				Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(String[].class, field.getTypeInformation().getTypeClass());
+			} else if(Arrays.asList("date", "someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
+				// ignore these, they are inherited from the ComplexNestedClass
+			}
+			else {
+				Assert.fail("field "+field+" is not expected");
+			}
+		}
+		Assert.assertTrue("Field was not present", arrayListSeen);
+		Assert.assertTrue("Field was not present", multisetSeen);
+		Assert.assertTrue("Field was not present", strArraySeen);
+	}
+
+	@Test
+	public void testPojoExtendingTuple() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class);
+		checkFromTuplePojo(typeForClass);
+
+		FromTuple ft = new FromTuple();
+		ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
+		TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft);
+		checkFromTuplePojo(typeForObject);
+	}
+
+	private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
+		Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
+		Assert.assertEquals(4, typeInformation.getTotalFields());
+		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
+		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("special")) {
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+			} else if(name.equals("f0") || name.equals("f1")) {
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+			} else if(name.equals("f2")) {
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
+			} else {
+				Assert.fail("unexpected field");
+			}
+		}
+	}
+
+	@Test
+	public void testPojoWithGenerics() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
+		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
+		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("field1")) {
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("field2")) {
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("field3")) {
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("key")) {
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+			} else {
+				Assert.fail("Unexpected field "+field);
+			}
+		}
+	}
+
+	/**
+	 * Test if the TypeExtractor is accepting untyped generics,
+	 * making them GenericTypes
+	 */
+	@Test
+	public void testPojoWithGenericsSomeFieldsGeneric() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class);
+		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
+		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("field1")) {
+				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+			} else if (name.equals("field2")) {
+				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+			} else if (name.equals("key")) {
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+			} else {
+				Assert.fail("Unexpected field "+field);
+			}
+		}
+	}
+
+
+	/**
+	 * Checks that generic parameters bound across two inheritance levels
+	 * ({@code ComplexHierarchyTop -> ComplexHierarchy -> PojoWithGenerics}) are resolved
+	 * for the fields declared in {@code PojoWithGenerics}.
+	 */
+	@Test
+	public void testPojoWithComplexHierarchy() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
+		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
+		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
+			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("field1")) {
+				Assert.assertTrue(field.getTypeInformation() instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
+			} else if (name.equals("field2")) {
+				Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>);
+				// assertEquals reports both values on failure, unlike assertTrue(a.equals(b))
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ((TupleTypeInfo<?>) field.getTypeInformation()).getTypeAt(0));
+			} else if (name.equals("key")) {
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+			} else {
+				Assert.fail("Unexpected field "+field);
+			}
+		}
+	}
+	
+	public static class MyMapper<T> implements MapFunction<PojoWithGenerics<Long, T>, PojoWithGenerics<T,T>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public PojoWithGenerics<T, T> map(PojoWithGenerics<Long, T> value)
+				throws Exception {
+			return null;
+		}
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference1() {
+		MapFunction<?, ?> function = new MyMapper<String>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=Long,field2=String>"));
+		
+		Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
+		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+		for(int i = 0; i < pti.getArity(); i++) {
+			PojoField field = pti.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("field1")) {
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("field2")) {
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("key")) {
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+			} else {
+				Assert.fail("Unexpected field "+field);
+			}
+		}
+	}
+
+	public static class PojoTuple<A, B, C> extends Tuple3<B, C, Long> {
+		private static final long serialVersionUID = 1L;
+
+		public A extraField;
+	}
+
+	public static class MyMapper2<D, E> implements MapFunction<Tuple2<E, D>, PojoTuple<E, D, D>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public PojoTuple<E, D, D> map(Tuple2<E, D> value) throws Exception {
+			return null;
+		}
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference2() {
+		MapFunction<?, ?> function = new MyMapper2<Boolean, Character>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("Tuple2<Character,Boolean>"));
+		Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
+		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
+		for(int i = 0; i < pti.getArity(); i++) {
+			PojoField field = pti.getPojoFieldAt(i);
+			String name = field.getField().getName();
+			if(name.equals("extraField")) {
+				Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("f0")) {
+				Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("f1")) {
+				Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation());
+			} else if (name.equals("f2")) {
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
+			} else {
+				Assert.fail("Unexpected field "+field);
+			}
+		}
+	}
+
+	public static class MyMapper3<D, E> implements MapFunction<PojoTuple<E, D, D>, Tuple2<E, D>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<E, D> map(PojoTuple<E, D, D> value) throws Exception {
+			return null;
+		}
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference3() {
+		MapFunction<?, ?> function = new MyMapper3<Boolean, Character>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoTuple<extraField=char,f0=boolean,f1=boolean,f2=long>"));
+		
+		Assert.assertTrue(ti instanceof TupleTypeInfo<?>);
+		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+		Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(0));
+		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(1));
+	}
+
+	public static class PojoWithParameterizedFields1<Z> {
+		public Tuple2<Z, Z> field;
+	}
+
+	public static class MyMapper4<A> implements MapFunction<PojoWithParameterizedFields1<A>, A> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public A map(PojoWithParameterizedFields1<A> value) throws Exception {
+			return null;
+		}
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference4() {
+		MapFunction<?, ?> function = new MyMapper4<Byte>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields1<field=Tuple2<byte,byte>>"));
+		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
+	}
+
+	public static class PojoWithParameterizedFields2<Z> {
+		public PojoWithGenerics<Z, Z> field;
+	}
+
+	public static class MyMapper5<A> implements MapFunction<PojoWithParameterizedFields2<A>, A> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public A map(PojoWithParameterizedFields2<A> value) throws Exception {
+			return null;
+		}
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference5() {
+		MapFunction<?, ?> function = new MyMapper5<Byte>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields2<"
+						+ "field=org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=byte,field2=byte>"
+						+ ">"));
+		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
+	}
+	
+	public static class PojoWithParameterizedFields3<Z> {
+		public Z[] field;
+	}
+
+	public static class MyMapper6<A> implements MapFunction<PojoWithParameterizedFields3<A>, A> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public A map(PojoWithParameterizedFields3<A> value) throws Exception {
+			return null;
+		}
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference6() {
+		MapFunction<?, ?> function = new MyMapper6<Integer>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields3<"
+						+ "field=int[]"
+						+ ">"));
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	public static class MyMapper7<A> implements MapFunction<PojoWithParameterizedFields4<A>, A> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public A map(PojoWithParameterizedFields4<A> value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class PojoWithParameterizedFields4<Z> {
+		public Tuple1<Z>[] field;
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testGenericPojoTypeInference7() {
+		MapFunction<?, ?> function = new MyMapper7<Integer>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
+				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields4<"
+						+ "field=Tuple1<int>[]"
+						+ ">"));
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
+	}
+
+	public static class RecursivePojo1 {
+		public RecursivePojo1 field;
+	}
+
+	public static class RecursivePojo2 {
+		public Tuple1<RecursivePojo2> field;
+	}
+
+	public static class RecursivePojo3 {
+		public NestedPojo field;
+	}
+
+	public static class NestedPojo {
+		public RecursivePojo3 field;
+	}
+
+	/**
+	 * A field whose type is the enclosing pojo itself must be described by a
+	 * {@code GenericTypeInfo}, while the outer class is still a pojo.
+	 */
+	@Test
+	public void testRecursivePojo1() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		// wildcard cast instead of a raw type, as in the other tests of this class
+		PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) ti;
+		Assert.assertEquals(GenericTypeInfo.class, pojoType.getPojoFieldAt(0).getTypeInformation().getClass());
+	}
+
+	/**
+	 * A pojo that refers to itself through a {@code Tuple1} field: the field stays a tuple
+	 * type, and the tuple's only element must be described by a {@code GenericTypeInfo}.
+	 */
+	@Test
+	public void testRecursivePojo2() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		// wildcard casts instead of raw types, as in the other tests of this class
+		PojoField pf = ((PojoTypeInfo<?>) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.getTypeInformation() instanceof TupleTypeInfo);
+		TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) pf.getTypeInformation();
+		Assert.assertEquals(GenericTypeInfo.class, tupleType.getTypeAt(0).getClass());
+	}
+
+	/**
+	 * Two pojos that refer to each other: the nested pojo is still a pojo, but its
+	 * back-reference to the outer class must be described by a {@code GenericTypeInfo}.
+	 */
+	@Test
+	public void testRecursivePojo3() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		// wildcard casts instead of raw types, as in the other tests of this class
+		PojoField pf = ((PojoTypeInfo<?>) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.getTypeInformation() instanceof PojoTypeInfo);
+		PojoTypeInfo<?> nestedPojoType = (PojoTypeInfo<?>) pf.getTypeInformation();
+		Assert.assertEquals(GenericTypeInfo.class, nestedPojoType.getPojoFieldAt(0).getTypeInformation().getClass());
+	}
+
+	public static class FooBarPojo {
+		public int foo, bar;
+		public FooBarPojo() {}
+	}
+
+	// Mapper fixture whose return type uses the same pojo class twice.
+	public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> {
+		// declared for consistency with the other mapper fixtures in this test
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
+			// never invoked by the test; only the declared signature is inspected
+			return null;
+		}
+	}
+
+	/**
+	 * The same pojo class appearing twice in a function's return type must be
+	 * extracted as a pojo type at both tuple positions.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testDualUseOfPojo() {
+		MapFunction<?, ?> function = new DuplicateMapper();
+		// the raw cast is needed because 'function' is only known as MapFunction<?, ?>
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class));
+		Assert.assertTrue(ti instanceof TupleTypeInfo);
+		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+		Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
+		Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
new file mode 100644
index 0000000..dbe5115
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests for {@link PojoTypeInfo} as produced by {@link TypeExtractor#getForClass(Class)}:
+ * equality and hash code, a Java serialization round trip, and recognition of POJOs whose
+ * accessors use boxed types or whose field name contains an underscore.
+ *
+ * <p>Unexpected exceptions are left to propagate out of the test methods so that JUnit
+ * reports them with their full stack trace.
+ */
+public class PojoTypeInfoTest {
+
+	/** Two extractions for the same class must be equal and share a hash code. */
+	@Test
+	public void testPojoTypeInfoEquality() {
+		TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
+		TypeInformation<TestPojo> info2 = TypeExtractor.getForClass(TestPojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+		assertTrue(info2 instanceof PojoTypeInfo);
+
+		// assertEquals prints both operands on failure, unlike assertTrue(a.equals(b)).
+		assertEquals(info1, info2);
+		assertEquals(info1.hashCode(), info2.hashCode());
+	}
+
+	/**
+	 * Type information of two different classes must not be equal, even though
+	 * {@link TestPojo} and {@link AlternatePojo} declare the same members.
+	 */
+	@Test
+	public void testPojoTypeInfoInequality() {
+		TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
+		TypeInformation<AlternatePojo> info2 = TypeExtractor.getForClass(AlternatePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+		assertTrue(info2 instanceof PojoTypeInfo);
+
+		assertNotEquals(info1, info2);
+	}
+
+	/**
+	 * Serializes a {@link PojoTypeInfo} with {@link InstantiationUtil} and checks that the
+	 * deserialized copy equals the original.
+	 */
+	@Test
+	public void testSerializabilityOfPojoTypeInfo() throws IOException, ClassNotFoundException {
+		PojoTypeInfo<TestPojo> pojoTypeInfo = (PojoTypeInfo<TestPojo>)TypeExtractor.getForClass(TestPojo.class);
+
+		byte[] serializedPojoTypeInfo = InstantiationUtil.serializeObject(pojoTypeInfo);
+		PojoTypeInfo<TestPojo> deserializedPojoTypeInfo = (PojoTypeInfo<TestPojo>)InstantiationUtil.deserializeObject(
+			serializedPojoTypeInfo,
+			getClass().getClassLoader());
+
+		assertEquals(pojoTypeInfo, deserializedPojoTypeInfo);
+	}
+
+	/** A class with a primitive field but boxed getter/setter must be extracted as a POJO. */
+	@Test
+	public void testPrimitivePojo() {
+		TypeInformation<PrimitivePojo> info1 = TypeExtractor.getForClass(PrimitivePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+	}
+
+	/**
+	 * A class whose field is named {@code some_int} while its accessors are named
+	 * {@code getSomeInt}/{@code setSomeInt} must be extracted as a POJO.
+	 */
+	@Test
+	public void testUnderscorePojo() {
+		TypeInformation<UnderscorePojo> info1 = TypeExtractor.getForClass(UnderscorePojo.class);
+
+		assertTrue(info1 instanceof PojoTypeInfo);
+	}
+
+	/** POJO mixing public fields with a private field exposed through a getter and setter. */
+	public static final class TestPojo {
+
+		public int someInt;
+
+		private String aString;
+
+		public Double[] doubleArray;
+
+
+		public void setaString(String aString) {
+			this.aString = aString;
+		}
+
+		public String getaString() {
+			return aString;
+		}
+	}
+
+	/** Declares the same members as {@link TestPojo}; used for the inequality test. */
+	public static final class AlternatePojo {
+
+		public int someInt;
+
+		private String aString;
+
+		public Double[] doubleArray;
+
+
+		public void setaString(String aString) {
+			this.aString = aString;
+		}
+
+		public String getaString() {
+			return aString;
+		}
+	}
+
+	/** Private primitive {@code int} field whose accessors use the boxed {@link Integer}. */
+	public static final class PrimitivePojo {
+
+		private int someInt;
+
+		public void setSomeInt(Integer someInt) {
+			this.someInt = someInt;
+		}
+
+		public Integer getSomeInt() {
+			return this.someInt;
+		}
+	}
+
+	/** Private field named with an underscore, accessed through camel-case getter/setter. */
+	public static final class UnderscorePojo {
+
+		private int some_int;
+
+		public void setSomeInt(int some_int) {
+			this.some_int = some_int;
+		}
+
+		public Integer getSomeInt() {
+			return this.some_int;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
new file mode 100644
index 0000000..51e481d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInformationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Test;
+
+/**
+ * Checks that {@link TypeExtractor} yields a {@link CompositeType} for flat, nested and
+ * mutually recursive classes made of public fields.
+ */
+public class PojoTypeInformationTest {
+
+	/** Flat class with public String, boxed, primitive and array fields. */
+	public static class SimplePojo {
+		public String str;
+		public Boolean Bl;
+		public boolean bl;
+		public Byte Bt;
+		public byte bt;
+		public Short Shrt;
+		public short shrt;
+		public Integer Intgr;
+		public int intgr;
+		public Long Lng;
+		public long lng;
+		public Float Flt;
+		public float flt;
+		public Double Dbl;
+		public double dbl;
+		public Character Ch;
+		public char ch;
+		public int[] primIntArray;
+		public Integer[] intWrapperArray;
+	}
+
+	/** The flat class must be extracted as a composite type. */
+	@Test
+	public void testSimplePojoTypeExtraction() {
+		final TypeInformation<SimplePojo> extracted = TypeExtractor.getForClass(SimplePojo.class);
+		final boolean composite = extracted instanceof CompositeType;
+		assertTrue("Extracted type is not a composite/pojo type but should be.", composite);
+	}
+
+	/** Inner part of the nested fixture. */
+	public static class NestedPojoInner {
+		public String field;
+	}
+
+	/** Outer part of the nested fixture; holds a {@link NestedPojoInner}. */
+	public static class NestedPojoOuter {
+		public Integer intField;
+		public NestedPojoInner inner;
+	}
+
+	/** A class containing another such class must be extracted as a composite type. */
+	@Test
+	public void testNestedPojoTypeExtraction() {
+		final TypeInformation<NestedPojoOuter> extracted = TypeExtractor.getForClass(NestedPojoOuter.class);
+		final boolean composite = extracted instanceof CompositeType;
+		assertTrue("Extracted type is not a Pojo type but should be.", composite);
+	}
+
+	/** First half of a two-class reference cycle; refers to {@link Recursive2Pojo}. */
+	public static class Recursive1Pojo {
+		public Integer intField;
+		public Recursive2Pojo rec;
+	}
+
+	/** Second half of the reference cycle; refers back to {@link Recursive1Pojo}. */
+	public static class Recursive2Pojo {
+		public String strField;
+		public Recursive1Pojo rec;
+	}
+
+	/**
+	 * Tests whether a recursive pojo is detected using the set of visited types in the type
+	 * extractor. The recursive field will be handled using the generic serializer.
+	 */
+	@Test
+	public void testRecursivePojoTypeExtraction() {
+		final TypeInformation<Recursive1Pojo> extracted = TypeExtractor.getForClass(Recursive1Pojo.class);
+		final boolean composite = extracted instanceof CompositeType;
+		assertTrue("Extracted type is not a Pojo type but should be.", composite);
+	}
+
+	/** Same check as above, but the extraction starts from an instance instead of the class. */
+	@Test
+	public void testRecursivePojoObjectTypeExtraction() {
+		final Recursive1Pojo instance = new Recursive1Pojo();
+		final TypeInformation<Recursive1Pojo> extracted = TypeExtractor.getForObject(instance);
+		final boolean composite = extracted instanceof CompositeType;
+		assertTrue("Extracted type is not a Pojo type but should be.", composite);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
new file mode 100644
index 0000000..b6cff34
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the {@code equals}/{@code hashCode} behaviour of {@link TupleTypeInfo}.
+ */
+public class TupleTypeInfoTest extends TestLogger {
+
+	/**
+	 * Compares a {@link TupleTypeInfo} with an anonymous {@link TupleTypeInfoBase} subclass
+	 * built over the same tuple class and field type. Whatever the outcome of the
+	 * comparison is, it must be the same in both directions.
+	 */
+	@Test
+	public void testTupleTypeInfoSymmetricEqualityRelation() {
+		TupleTypeInfo<Tuple1<Integer>> regular = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO);
+
+		// Stub subclass: only the constructor arguments matter for this test.
+		TupleTypeInfoBase<Tuple1> adHoc = new TupleTypeInfoBase<Tuple1>(
+			(Class<Tuple1>)Tuple1.class,
+			(TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO) {
+
+			private static final long serialVersionUID = -7985593598027660836L;
+
+			@Override
+			public String[] getFieldNames() {
+				return new String[0];
+			}
+
+			@Override
+			public int getFieldIndex(String fieldName) {
+				return 0;
+			}
+
+			@Override
+			public TypeSerializer<Tuple1> createSerializer(ExecutionConfig config) {
+				return null;
+			}
+
+			@Override
+			protected TypeComparatorBuilder<Tuple1> createTypeComparatorBuilder() {
+				return null;
+			}
+		};
+
+		Assert.assertTrue(
+			"Equality relation should be symmetric",
+			regular.equals(adHoc) == adHoc.equals(regular));
+	}
+
+	/** Two instances built from the same field types are equal and share a hash code. */
+	@Test
+	public void testTupleTypeInfoEquality() {
+		TupleTypeInfo<Tuple2<Integer, String>> first =
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		TupleTypeInfo<Tuple2<Integer, String>> second =
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		Assert.assertEquals(first, second);
+		Assert.assertEquals(first.hashCode(), second.hashCode());
+	}
+
+	/** Instances that differ in the type of their second field are not equal. */
+	@Test
+	public void testTupleTypeInfoInequality() {
+		TupleTypeInfo<Tuple2<Integer, String>> intAndString =
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		TupleTypeInfo<Tuple2<Integer, Boolean>> intAndBoolean =
+			new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+		Assert.assertNotEquals(intAndString, intAndBoolean);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
new file mode 100644
index 0000000..a606896
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorInputFormatsTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+/**
+ * Tests {@link TypeExtractor#getInputFormatTypes(InputFormat)} against input formats that
+ * declare their record type directly, through a generic base class, through several levels
+ * of inheritance, or via {@link ResultTypeQueryable}.
+ *
+ * <p>Unexpected exceptions are left to propagate out of the test methods so that JUnit
+ * reports them with their full stack trace.
+ */
+@SuppressWarnings("serial")
+public class TypeExtractorInputFormatsTest {
+
+	/** A format implementing {@code InputFormat<Float, ...>} directly yields the float type. */
+	@Test
+	public void testExtractInputFormatType() {
+		InputFormat<?, ?> format = new DummyFloatInputFormat();
+		TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, typeInfo);
+	}
+
+	/** Formats that bind the record type of {@link GenericInputFormat} in their extends clause. */
+	@Test
+	public void testExtractDerivedInputFormatType() {
+		// simple type
+		InputFormat<?, ?> simpleFormat = new DerivedInputFormat();
+		TypeInformation<?> simpleInfo = TypeExtractor.getInputFormatTypes(simpleFormat);
+		assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, simpleInfo);
+
+		// composite type
+		InputFormat<?, ?> tupleFormat = new DerivedTupleInputFormat();
+		TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(tupleFormat);
+
+		assertTrue(typeInfo.isTupleType());
+		assertTrue(typeInfo instanceof TupleTypeInfo);
+
+		// A wildcard cast is sufficient here and needs no unchecked-warning suppression.
+		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) typeInfo;
+
+		assertEquals(3, tupleInfo.getArity());
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(0));
+		assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tupleInfo.getTypeAt(1));
+		assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tupleInfo.getTypeAt(2));
+	}
+
+	/**
+	 * The type variable of {@link RelativeInputFormat} is only bound to {@code Integer} in
+	 * the subclass {@link FinalRelativeInputFormat}; the middle tuple field must resolve to int.
+	 */
+	@Test
+	public void testMultiLevelDerivedInputFormatType() {
+		// composite type
+		InputFormat<?, ?> format = new FinalRelativeInputFormat();
+		TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		assertTrue(typeInfo.isTupleType());
+		assertTrue(typeInfo instanceof TupleTypeInfo);
+
+		// A wildcard cast is sufficient here and needs no unchecked-warning suppression.
+		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) typeInfo;
+
+		assertEquals(3, tupleInfo.getArity());
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleInfo.getTypeAt(0));
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleInfo.getTypeAt(1));
+		assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tupleInfo.getTypeAt(2));
+	}
+
+	/**
+	 * {@link QueryableInputFormat} declares {@code Float} records but reports the double
+	 * type from {@code getProducedType()}; the extracted type must be the reported one.
+	 */
+	@Test
+	public void testQueryableFormatType() {
+		InputFormat<?, ?> format = new QueryableInputFormat();
+		TypeInformation<?> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, typeInfo);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Test formats
+	// --------------------------------------------------------------------------------------------
+
+	/** Stub format implementing {@code InputFormat<Float, InputSplit>} directly. */
+	public static final class DummyFloatInputFormat implements InputFormat<Float, InputSplit> {
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return null; }
+
+		@Override
+		public InputSplit[] createInputSplits(int minNumSplits) { return null; }
+
+		@Override
+		public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
+
+		@Override
+		public void open(InputSplit split) {}
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Float nextRecord(Float reuse) throws IOException { return null; }
+
+		@Override
+		public void close() {}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Stub format binding the record type of {@link GenericInputFormat} to {@code Short}. */
+	public static final class DerivedInputFormat extends GenericInputFormat<Short> {
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Short nextRecord(Short reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Stub format binding the record type of {@link GenericInputFormat} to a {@link Tuple3}. */
+	public static final class DerivedTupleInputFormat extends GenericInputFormat<Tuple3<String, Short, Double>> {
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Tuple3<String, Short, Double> nextRecord(Tuple3<String, Short, Double> reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Generic stub format that leaves the middle field of its {@link Tuple3} record type open. */
+	public static class RelativeInputFormat<T> extends GenericInputFormat<Tuple3<String, T, Double>> {
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Tuple3<String, T, Double> nextRecord(Tuple3<String, T, Double> reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Binds the open type variable of {@link RelativeInputFormat} to {@code Integer}. */
+	public static final class FinalRelativeInputFormat extends RelativeInputFormat<Integer> {
+
+		@Override
+		public Tuple3<String, Integer, Double> nextRecord(Tuple3<String, Integer, Double> reuse) { return null; }
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Stub format whose {@link InputFormat} record type is {@code Float} but which reports
+	 * {@code Double} through {@link ResultTypeQueryable#getProducedType()}.
+	 */
+	public static final class QueryableInputFormat implements InputFormat<Float, InputSplit>, ResultTypeQueryable<Double> {
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return null; }
+
+		@Override
+		public InputSplit[] createInputSplits(int minNumSplits) { return null; }
+
+		@Override
+		public DefaultInputSplitAssigner getInputSplitAssigner(InputSplit[] splits) { return null; }
+
+		@Override
+		public void open(InputSplit split) {}
+
+		@Override
+		public boolean reachedEnd() { return false; }
+
+		@Override
+		public Float nextRecord(Float reuse) throws IOException { return null; }
+
+		@Override
+		public void close() {}
+
+		@Override
+		public TypeInformation<Double> getProducedType() {
+			return BasicTypeInfo.DOUBLE_TYPE_INFO;
+		}
+	}
+}


Mime
View raw message