flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [22/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:22 GMT
[FLINK-3303] [core] Move all type utilities to flink-core


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21a71586
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21a71586
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21a71586

Branch: refs/heads/master
Commit: 21a715867d655bb61df9a9f7eef37e42b99e206a
Parents: 7081836
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Jan 31 23:28:32 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 2 16:55:44 2016 +0100

----------------------------------------------------------------------
 flink-core/pom.xml                              |   47 +-
 .../apache/flink/api/common/operators/Keys.java |  459 +++++
 .../flink/api/java/functions/KeySelector.java   |   63 +
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   78 +
 .../api/java/typeutils/EitherTypeInfo.java      |  122 ++
 .../flink/api/java/typeutils/EnumTypeInfo.java  |  122 ++
 .../api/java/typeutils/GenericTypeInfo.java     |  116 ++
 .../java/typeutils/InputTypeConfigurable.java   |   42 +
 .../api/java/typeutils/MissingTypeInfo.java     |  121 ++
 .../api/java/typeutils/ObjectArrayTypeInfo.java |  141 ++
 .../flink/api/java/typeutils/PojoField.java     |  108 +
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  406 ++++
 .../api/java/typeutils/ResultTypeQueryable.java |   37 +
 .../flink/api/java/typeutils/TupleTypeInfo.java |  248 +++
 .../api/java/typeutils/TupleTypeInfoBase.java   |  252 +++
 .../flink/api/java/typeutils/TypeExtractor.java | 1692 +++++++++++++++
 .../api/java/typeutils/TypeInfoParser.java      |  383 ++++
 .../flink/api/java/typeutils/ValueTypeInfo.java |  183 ++
 .../api/java/typeutils/WritableTypeInfo.java    |  139 ++
 .../java/typeutils/runtime/AvroSerializer.java  |  201 ++
 .../runtime/CopyableValueComparator.java        |  167 ++
 .../runtime/CopyableValueSerializer.java        |  129 ++
 .../typeutils/runtime/DataInputDecoder.java     |  229 +++
 .../typeutils/runtime/DataInputViewStream.java  |   71 +
 .../typeutils/runtime/DataOutputEncoder.java    |  190 ++
 .../typeutils/runtime/DataOutputViewStream.java |   41 +
 .../typeutils/runtime/EitherSerializer.java     |  193 ++
 .../runtime/GenericTypeComparator.java          |  177 ++
 .../api/java/typeutils/runtime/KryoUtils.java   |   87 +
 .../java/typeutils/runtime/NoFetchingInput.java |  141 ++
 .../java/typeutils/runtime/PojoComparator.java  |  354 ++++
 .../java/typeutils/runtime/PojoSerializer.java  |  592 ++++++
 .../runtime/RuntimeComparatorFactory.java       |   75 +
 .../runtime/RuntimePairComparatorFactory.java   |   44 +
 .../runtime/RuntimeSerializerFactory.java       |  124 ++
 .../typeutils/runtime/Tuple0Serializer.java     |  121 ++
 .../java/typeutils/runtime/TupleComparator.java |  157 ++
 .../typeutils/runtime/TupleComparatorBase.java  |  279 +++
 .../java/typeutils/runtime/TupleSerializer.java |  158 ++
 .../typeutils/runtime/TupleSerializerBase.java  |  102 +
 .../java/typeutils/runtime/ValueComparator.java |  183 ++
 .../java/typeutils/runtime/ValueSerializer.java |  152 ++
 .../typeutils/runtime/WritableComparator.java   |  189 ++
 .../typeutils/runtime/WritableSerializer.java   |  153 ++
 .../typeutils/runtime/kryo/KryoSerializer.java  |  382 ++++
 .../typeutils/runtime/kryo/Serializers.java     |  227 ++
 .../common/operators/ExpressionKeysTest.java    |  481 +++++
 .../operators/SelectorFunctionKeysTest.java     |  154 ++
 .../apache/flink/api/java/tuple/Tuple2Test.java |   44 +
 .../api/java/typeutils/CompositeTypeTest.java   |  179 ++
 .../api/java/typeutils/EitherTypeInfoTest.java  |   61 +
 .../api/java/typeutils/EnumTypeInfoTest.java    |   51 +
 .../api/java/typeutils/GenericTypeInfoTest.java |   47 +
 .../api/java/typeutils/MissingTypeInfoTest.java |   47 +
 .../java/typeutils/ObjectArrayTypeInfoTest.java |   58 +
 .../java/typeutils/PojoTypeExtractionTest.java  |  812 ++++++++
 .../api/java/typeutils/PojoTypeInfoTest.java    |  153 ++
 .../java/typeutils/PojoTypeInformationTest.java |   98 +
 .../api/java/typeutils/TupleTypeInfoTest.java   |   96 +
 .../TypeExtractorInputFormatsTest.java          |  231 +++
 .../api/java/typeutils/TypeExtractorTest.java   | 1907 +++++++++++++++++
 .../api/java/typeutils/TypeInfoParserTest.java  |  338 +++
 .../api/java/typeutils/ValueTypeInfoTest.java   |   87 +
 .../java/typeutils/WritableTypeInfoTest.java    |   74 +
 .../AbstractGenericArraySerializerTest.java     |  187 ++
 .../AbstractGenericTypeComparatorTest.java      |  376 ++++
 .../AbstractGenericTypeSerializerTest.java      |  364 ++++
 .../runtime/AvroGenericArraySerializerTest.java |   28 +
 .../runtime/AvroGenericTypeComparatorTest.java  |   28 +
 .../runtime/AvroGenericTypeSerializerTest.java  |   29 +
 .../runtime/AvroSerializerEmptyArrayTest.java   |  189 ++
 .../runtime/CopyableValueComparatorTest.java    |   53 +
 .../typeutils/runtime/EitherSerializerTest.java |  113 +
 .../runtime/GenericPairComparatorTest.java      |   89 +
 .../MultidimensionalArraySerializerTest.java    |  120 ++
 .../typeutils/runtime/PojoComparatorTest.java   |   63 +
 .../typeutils/runtime/PojoContainingTuple.java  |   44 +
 .../runtime/PojoGenericTypeSerializerTest.java  |   33 +
 .../typeutils/runtime/PojoSerializerTest.java   |  243 +++
 .../runtime/PojoSubclassComparatorTest.java     |   76 +
 .../runtime/PojoSubclassSerializerTest.java     |  196 ++
 .../typeutils/runtime/StringArrayWritable.java  |   83 +
 .../SubclassFromInterfaceSerializerTest.java    |  171 ++
 .../runtime/TestDataOutputSerializer.java       |  308 +++
 .../runtime/TupleComparatorILD2Test.java        |   73 +
 .../runtime/TupleComparatorILD3Test.java        |   75 +
 .../runtime/TupleComparatorILDC3Test.java       |   75 +
 .../runtime/TupleComparatorILDX1Test.java       |   71 +
 .../runtime/TupleComparatorILDXC2Test.java      |   73 +
 .../runtime/TupleComparatorISD1Test.java        |   69 +
 .../runtime/TupleComparatorISD2Test.java        |   73 +
 .../runtime/TupleComparatorISD3Test.java        |   75 +
 .../runtime/TupleComparatorTTT1Test.java        |  139 ++
 .../runtime/TupleComparatorTTT2Test.java        |  145 ++
 .../runtime/TupleComparatorTTT3Test.java        |  154 ++
 .../typeutils/runtime/TupleSerializerTest.java  |  238 +++
 .../runtime/TupleSerializerTestInstance.java    |   79 +
 .../typeutils/runtime/ValueComparatorTest.java  |   53 +
 .../runtime/ValueComparatorUUIDTest.java        |   46 +
 .../api/java/typeutils/runtime/ValueID.java     |   72 +
 .../runtime/ValueSerializerUUIDTest.java        |   50 +
 .../runtime/WritableComparatorTest.java         |   53 +
 .../runtime/WritableComparatorUUIDTest.java     |   46 +
 .../api/java/typeutils/runtime/WritableID.java  |   78 +
 .../runtime/WritableSerializerTest.java         |   50 +
 .../runtime/WritableSerializerUUIDTest.java     |   50 +
 .../runtime/kryo/KryoClearedBufferTest.java     |  287 +++
 .../kryo/KryoGenericArraySerializerTest.java    |   30 +
 .../kryo/KryoGenericTypeComparatorTest.java     |   30 +
 .../kryo/KryoGenericTypeSerializerTest.java     |  168 ++
 .../kryo/KryoWithCustomSerializersTest.java     |   75 +
 .../typeutils/runtime/kryo/SerializersTest.java |  103 +
 .../tuple/base/TupleComparatorTestBase.java     |   43 +
 .../tuple/base/TuplePairComparatorTestBase.java |  109 +
 flink-java/pom.xml                              |   32 +-
 .../java/org/apache/flink/api/java/DataSet.java |    2 +-
 .../java/org/apache/flink/api/java/Utils.java   |   19 -
 .../flink/api/java/functions/KeySelector.java   |   63 -
 .../api/java/functions/SemanticPropUtil.java    |    2 +-
 .../flink/api/java/io/SplitDataProperties.java  |    2 +-
 .../api/java/operators/AggregateOperator.java   |    1 +
 .../api/java/operators/CoGroupOperator.java     |   23 +-
 .../api/java/operators/CoGroupRawOperator.java  |    3 +-
 .../flink/api/java/operators/DataSink.java      |    1 +
 .../api/java/operators/DeltaIteration.java      |    1 +
 .../java/operators/DeltaIterationResultSet.java |    1 +
 .../api/java/operators/DistinctOperator.java    |    7 +-
 .../java/operators/GroupCombineOperator.java    |   11 +-
 .../api/java/operators/GroupReduceOperator.java |   11 +-
 .../flink/api/java/operators/Grouping.java      |    1 +
 .../flink/api/java/operators/JoinOperator.java  |   15 +-
 .../flink/api/java/operators/KeyFunctions.java  |  119 ++
 .../apache/flink/api/java/operators/Keys.java   |  550 -----
 .../api/java/operators/PartitionOperator.java   |    9 +-
 .../api/java/operators/ReduceOperator.java      |    9 +-
 .../java/operators/SortPartitionOperator.java   |    1 +
 .../api/java/operators/SortedGrouping.java      |    3 +-
 .../api/java/operators/UdfOperatorUtils.java    |    1 +
 .../api/java/operators/UnsortedGrouping.java    |    1 +
 .../operators/join/JoinOperatorSetsBase.java    |    2 +-
 .../PlanBothUnwrappingCoGroupOperator.java      |    2 +-
 .../PlanLeftUnwrappingCoGroupOperator.java      |    2 +-
 .../PlanRightUnwrappingCoGroupOperator.java     |    2 +-
 .../PlanUnwrappingGroupCombineOperator.java     |    2 +-
 .../PlanUnwrappingReduceGroupOperator.java      |    2 +-
 .../PlanUnwrappingReduceOperator.java           |    2 +-
 ...lanUnwrappingSortedGroupCombineOperator.java |    2 +-
 ...PlanUnwrappingSortedReduceGroupOperator.java |    2 +-
 .../apache/flink/api/java/sca/UdfAnalyzer.java  |    4 +-
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   78 -
 .../apache/flink/api/java/typeutils/Either.java |  185 --
 .../api/java/typeutils/EitherTypeInfo.java      |  121 --
 .../flink/api/java/typeutils/EnumTypeInfo.java  |  122 --
 .../api/java/typeutils/GenericTypeInfo.java     |  116 --
 .../java/typeutils/InputTypeConfigurable.java   |   42 -
 .../api/java/typeutils/MissingTypeInfo.java     |  121 --
 .../api/java/typeutils/ObjectArrayTypeInfo.java |  141 --
 .../flink/api/java/typeutils/PojoField.java     |  108 -
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  405 ----
 .../api/java/typeutils/ResultTypeQueryable.java |   37 -
 .../flink/api/java/typeutils/TupleTypeInfo.java |  248 ---
 .../api/java/typeutils/TupleTypeInfoBase.java   |  251 ---
 .../flink/api/java/typeutils/TypeExtractor.java | 1687 ---------------
 .../api/java/typeutils/TypeInfoParser.java      |  383 ----
 .../flink/api/java/typeutils/ValueTypeInfo.java |  183 --
 .../api/java/typeutils/WritableTypeInfo.java    |  139 --
 .../java/typeutils/runtime/AvroSerializer.java  |  201 --
 .../runtime/CopyableValueComparator.java        |  167 --
 .../runtime/CopyableValueSerializer.java        |  129 --
 .../typeutils/runtime/DataInputDecoder.java     |  229 ---
 .../typeutils/runtime/DataInputViewStream.java  |   71 -
 .../typeutils/runtime/DataOutputEncoder.java    |  190 --
 .../typeutils/runtime/DataOutputViewStream.java |   41 -
 .../typeutils/runtime/EitherSerializer.java     |  193 --
 .../runtime/GenericTypeComparator.java          |  177 --
 .../api/java/typeutils/runtime/KryoUtils.java   |   87 -
 .../java/typeutils/runtime/NoFetchingInput.java |  141 --
 .../java/typeutils/runtime/PojoComparator.java  |  354 ----
 .../java/typeutils/runtime/PojoSerializer.java  |  592 ------
 .../runtime/RuntimeComparatorFactory.java       |   75 -
 .../runtime/RuntimePairComparatorFactory.java   |   44 -
 .../runtime/RuntimeSerializerFactory.java       |  124 --
 .../typeutils/runtime/Tuple0Serializer.java     |  121 --
 .../java/typeutils/runtime/TupleComparator.java |  157 --
 .../typeutils/runtime/TupleComparatorBase.java  |  279 ---
 .../java/typeutils/runtime/TupleSerializer.java |  158 --
 .../typeutils/runtime/TupleSerializerBase.java  |  102 -
 .../java/typeutils/runtime/ValueComparator.java |  183 --
 .../java/typeutils/runtime/ValueSerializer.java |  152 --
 .../typeutils/runtime/WritableComparator.java   |  189 --
 .../typeutils/runtime/WritableSerializer.java   |  153 --
 .../typeutils/runtime/kryo/KryoSerializer.java  |  382 ----
 .../typeutils/runtime/kryo/Serializers.java     |  229 ---
 .../flink/api/java/TypeExtractionTest.java      |  117 ++
 .../api/java/operators/ExpressionKeysTest.java  |  479 -----
 .../flink/api/java/operators/NamesTest.java     |    4 +-
 .../operators/SelectorFunctionKeysTest.java     |  153 --
 .../flink/api/java/sca/UdfAnalyzerTest.java     |    2 +-
 .../apache/flink/api/java/tuple/Tuple2Test.java |   45 -
 .../type/extractor/PojoTypeExtractionTest.java  |  876 --------
 .../type/extractor/PojoTypeInformationTest.java |   98 -
 .../TypeExtractorInputFormatsTest.java          |  234 ---
 .../java/type/extractor/TypeExtractorTest.java  | 1931 ------------------
 .../api/java/typeutils/CompositeTypeTest.java   |  179 --
 .../api/java/typeutils/EitherTypeInfoTest.java  |   60 -
 .../api/java/typeutils/EnumTypeInfoTest.java    |   51 -
 .../api/java/typeutils/GenericTypeInfoTest.java |   47 -
 .../api/java/typeutils/MissingTypeInfoTest.java |   47 -
 .../java/typeutils/ObjectArrayTypeInfoTest.java |   58 -
 .../api/java/typeutils/PojoTypeInfoTest.java    |  153 --
 .../api/java/typeutils/TupleTypeInfoTest.java   |   96 -
 .../api/java/typeutils/TypeInfoParserTest.java  |  338 ---
 .../api/java/typeutils/ValueTypeInfoTest.java   |   87 -
 .../java/typeutils/WritableTypeInfoTest.java    |   74 -
 .../AbstractGenericArraySerializerTest.java     |  187 --
 .../AbstractGenericTypeComparatorTest.java      |  376 ----
 .../AbstractGenericTypeSerializerTest.java      |  364 ----
 .../runtime/AvroGenericArraySerializerTest.java |   28 -
 .../runtime/AvroGenericTypeComparatorTest.java  |   28 -
 .../runtime/AvroGenericTypeSerializerTest.java  |   29 -
 .../runtime/AvroSerializerEmptyArrayTest.java   |  189 --
 .../runtime/CopyableValueComparatorTest.java    |   53 -
 .../typeutils/runtime/EitherSerializerTest.java |  113 -
 .../runtime/GenericPairComparatorTest.java      |   89 -
 .../MultidimensionalArraySerializerTest.java    |  120 --
 .../typeutils/runtime/PojoComparatorTest.java   |   63 -
 .../typeutils/runtime/PojoContainingTuple.java  |   44 -
 .../runtime/PojoGenericTypeSerializerTest.java  |   33 -
 .../typeutils/runtime/PojoSerializerTest.java   |  243 ---
 .../runtime/PojoSubclassComparatorTest.java     |   76 -
 .../runtime/PojoSubclassSerializerTest.java     |  196 --
 .../typeutils/runtime/StringArrayWritable.java  |   83 -
 .../SubclassFromInterfaceSerializerTest.java    |  171 --
 .../runtime/TestDataOutputSerializer.java       |  308 ---
 .../runtime/TupleComparatorILD2Test.java        |   73 -
 .../runtime/TupleComparatorILD3Test.java        |   75 -
 .../runtime/TupleComparatorILDC3Test.java       |   75 -
 .../runtime/TupleComparatorILDX1Test.java       |   71 -
 .../runtime/TupleComparatorILDXC2Test.java      |   73 -
 .../runtime/TupleComparatorISD1Test.java        |   69 -
 .../runtime/TupleComparatorISD2Test.java        |   73 -
 .../runtime/TupleComparatorISD3Test.java        |   75 -
 .../runtime/TupleComparatorTTT1Test.java        |  139 --
 .../runtime/TupleComparatorTTT2Test.java        |  145 --
 .../runtime/TupleComparatorTTT3Test.java        |  154 --
 .../typeutils/runtime/TupleSerializerTest.java  |  238 ---
 .../runtime/TupleSerializerTestInstance.java    |   79 -
 .../typeutils/runtime/ValueComparatorTest.java  |   53 -
 .../runtime/ValueComparatorUUIDTest.java        |   46 -
 .../api/java/typeutils/runtime/ValueID.java     |   72 -
 .../runtime/ValueSerializerUUIDTest.java        |   50 -
 .../runtime/WritableComparatorTest.java         |   53 -
 .../runtime/WritableComparatorUUIDTest.java     |   46 -
 .../api/java/typeutils/runtime/WritableID.java  |   78 -
 .../runtime/WritableSerializerTest.java         |   50 -
 .../runtime/WritableSerializerUUIDTest.java     |   50 -
 .../runtime/kryo/KryoClearedBufferTest.java     |  287 ---
 .../kryo/KryoGenericArraySerializerTest.java    |   30 -
 .../kryo/KryoGenericTypeComparatorTest.java     |   30 -
 .../kryo/KryoGenericTypeSerializerTest.java     |  168 --
 .../kryo/KryoWithCustomSerializersTest.java     |   75 -
 .../typeutils/runtime/kryo/SerializersTest.java |  103 -
 .../tuple/base/TupleComparatorTestBase.java     |   43 -
 .../tuple/base/TuplePairComparatorTestBase.java |  109 -
 .../flink/python/api/PythonPlanBinder.java      |    2 +-
 .../api/java/table/JavaBatchTranslator.scala    |    5 +-
 .../scala/operators/ScalaAggregateOperator.java |    2 +-
 .../apache/flink/api/scala/CoGroupDataSet.scala |    4 +-
 .../org/apache/flink/api/scala/DataSet.scala    |   21 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |    4 +-
 .../api/scala/UnfinishedCoGroupOperation.scala  |    2 +
 .../apache/flink/api/scala/joinDataSet.scala    |    3 +
 .../api/scala/typeutils/CaseClassTypeInfo.scala |    3 +-
 .../api/scala/unfinishedKeyPairOperation.scala  |    4 +-
 .../streaming/api/datastream/DataStream.java    |    2 +-
 .../streaming/util/keys/KeySelectorUtil.java    |    2 +-
 .../streaming/api/AggregationFunctionTest.java  |    2 +-
 .../scala/operators/CoGroupOperatorTest.scala   |    3 +-
 .../scala/operators/GroupCombineITCase.scala    |   10 +-
 .../api/scala/operators/JoinOperatorTest.scala  |    3 +-
 .../scala/types/TypeInformationGenTest.scala    |    4 +-
 281 files changed, 20419 insertions(+), 20528 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index adc9a9b..ba1050c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -40,12 +40,6 @@ under the License.
 			<artifactId>flink-annotations</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>${shading-artifact.name}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 		
 		<dependency>
 			<groupId>commons-collections</groupId>
@@ -56,20 +50,61 @@ under the License.
 		<dependency>
 			<groupId>com.esotericsoftware.kryo</groupId>
 			<artifactId>kryo</artifactId>
+			<!-- managed version -->
 		</dependency>
 
+		<!-- Avro is needed for the interoperability with Avro types for serialization -->
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<!-- managed version -->
+			<exclusions>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-compress</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Hadoop is only needed here for serialization interoperability with the Writable type -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>${shading-artifact.name}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
 
+		<!-- test dependencies -->
+		
 		<dependency>
 			<groupId>commons-io</groupId>
 			<artifactId>commons-io</artifactId>
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<version>2.5</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.joda</groupId>
+			<artifactId>joda-convert</artifactId>
+			<version>1.7</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
new file mode 100644
index 0000000..6d681de
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import com.google.common.base.Preconditions;
+
+public abstract class Keys<T> {
+
+	/**
+	 * Returns the number of key fields described by this key set.
+	 * {@link #isEmpty()} reports {@code true} when this value is zero.
+	 */
+	public abstract int getNumberOfKeyFields();
+
+	/**
+	 * Returns the positions of the key fields. The implementations in this file
+	 * return the position stored in each flat field descriptor of the key.
+	 */
+	public abstract int[] computeLogicalKeyPositions();
+
+	/**
+	 * Returns the type of every key field. {@link #areCompatible(Keys)} compares the
+	 * arrays of two key sets by length and then element by element.
+	 */
+	protected abstract TypeInformation<?>[] getKeyFieldTypes();
+
+	/**
+	 * Validates that a custom partitioner can be used with these keys.
+	 *
+	 * <p>The implementations in this file reject keys that do not consist of exactly
+	 * one key field, and reject a partitioner type that differs from the key type,
+	 * by throwing an {@code InvalidProgramException}.
+	 *
+	 * @param partitioner the custom partitioner to validate
+	 * @param typeInfo the partitioner's key type; the implementations in this file accept
+	 *                 {@code null} and then try to extract the type from the partitioner
+	 */
+	public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo);
+
+	/**
+	 * Tells whether this key set contains no key fields at all.
+	 *
+	 * @return {@code true} if {@link #getNumberOfKeyFields()} reports zero
+	 */
+	public boolean isEmpty() {
+		final int numKeyFields = getNumberOfKeyFields();
+		return numKeyFields == 0;
+	}
+
+	/**
+	 * Checks whether this key set and {@code other} have the same number of key fields
+	 * with pairwise equal key field types.
+	 *
+	 * @param other the key set to compare against
+	 * @return always {@code true}; every incompatibility is reported through the exception
+	 * @throws IncompatibleKeysException if the number of key fields differs, or if the types
+	 *                                   of two key fields at the same index are not equal
+	 */
+	public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException {
+
+		final TypeInformation<?>[] ownTypes = this.getKeyFieldTypes();
+		final TypeInformation<?>[] theirTypes = other.getKeyFieldTypes();
+
+		// differing key counts can never match, no need to look at the types
+		if (ownTypes.length != theirTypes.length) {
+			throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE);
+		}
+
+		int index = 0;
+		for (TypeInformation<?> ownType : ownTypes) {
+			final TypeInformation<?> theirType = theirTypes[index++];
+			if (!ownType.equals(theirType)) {
+				throw new IncompatibleKeysException(ownType, theirType );
+			}
+		}
+		return true;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Specializations for expression-based / extractor-based grouping
+	// --------------------------------------------------------------------------------------------
+	
+	
+	/**
+	 * Keys that are computed for every record by a user-defined {@link KeySelector}.
+	 *
+	 * <p>The key type returned by the selector is broken down into key fields: a
+	 * {@link CompositeType} contributes all of its flat fields, any other type
+	 * contributes a single field at position 0.
+	 *
+	 * @param <T> type of the records the key is extracted from
+	 * @param <K> type of the extracted key
+	 */
+	public static class SelectorFunctionKeys<T, K> extends Keys<T> {
+
+		private final KeySelector<T, K> keyExtractor;
+		private final TypeInformation<T> inputType;
+		private final TypeInformation<K> keyType;
+		private final List<FlatFieldDescriptor> keyFields;
+
+		/**
+		 * Creates selector-function keys.
+		 *
+		 * @param keyExtractor function that extracts the key from a record, must not be null
+		 * @param inputType type of the records; stored as given, not validated here
+		 * @param keyType type of the extracted key, must not be null and must be a valid key type
+		 * @throws NullPointerException if {@code keyExtractor} or {@code keyType} is null
+		 * @throws InvalidProgramException if {@code keyType} is not a valid key type
+		 */
+		public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) {
+
+			if (keyExtractor == null) {
+				throw new NullPointerException("Key extractor must not be null.");
+			}
+			if (keyType == null) {
+				throw new NullPointerException("Key type must not be null.");
+			}
+			if (!keyType.isKeyType()) {
+				throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type");
+			}
+
+			this.keyExtractor = keyExtractor;
+			this.inputType = inputType;
+			this.keyType = keyType;
+
+			if (keyType instanceof CompositeType) {
+				// keyType describes K (not the input type T), so the composite view is CompositeType<K>
+				this.keyFields = ((CompositeType<K>) keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR);
+			}
+			else {
+				// atomic key type: the key itself is the one and only key field
+				this.keyFields = new ArrayList<>(1);
+				this.keyFields.add(new FlatFieldDescriptor(0, keyType));
+			}
+		}
+
+		/** Returns the type of the key produced by the key selector. */
+		public TypeInformation<K> getKeyType() {
+			return keyType;
+		}
+
+		/** Returns the input type that was passed to the constructor. */
+		public TypeInformation<T> getInputType() {
+			return inputType;
+		}
+
+		/** Returns the key selector function. */
+		public KeySelector<T, K> getKeyExtractor() {
+			return keyExtractor;
+		}
+
+		@Override
+		public int getNumberOfKeyFields() {
+			return keyFields.size();
+		}
+
+		@Override
+		public int[] computeLogicalKeyPositions() {
+			int[] logicalKeys = new int[keyFields.size()];
+			for (int i = 0; i < keyFields.size(); i++) {
+				logicalKeys[i] = keyFields.get(i).getPosition();
+			}
+			return logicalKeys;
+		}
+
+		@Override
+		protected TypeInformation<?>[] getKeyFieldTypes() {
+			TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
+			for (int i = 0; i < keyFields.size(); i++) {
+				fieldTypes[i] = keyFields.get(i).getType();
+			}
+			return fieldTypes;
+		}
+		
+		/**
+		 * Checks that the given custom partitioner fits the key produced by the selector.
+		 *
+		 * @param partitioner the custom partitioner
+		 * @param typeInfo the partitioner's key type; if null, extraction from the partitioner is attempted
+		 * @throws InvalidProgramException if the key does not consist of exactly one key field, or
+		 *                                 if a known, non-generic partitioner type differs from the key type
+		 */
+		@Override
+		public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) {
+
+			if (keyFields.size() != 1) {
+				throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field.");
+			}
+			
+			if (typeInfo == null) {
+				// try to extract key type from partitioner
+				try {
+					typeInfo = TypeExtractor.getPartitionerTypes(partitioner);
+				}
+				catch (Throwable ignored) {
+					// best effort check, so we ignore exceptions and skip the type comparison
+				}
+			}
+
+			// only check if type is known and not a generic type
+			if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) {
+				// check equality of key and partitioner type
+				if (!keyType.equals(typeInfo)) {
+					throw new InvalidProgramException("The partitioner is incompatible with the key type. "
+						+ "Partitioner type: " + typeInfo + " , key type: " + keyType);
+				}
+			}
+		}
+
+		@Override
+		public String toString() {
+			return "Key function (Type: " + keyType + ")";
+		}
+	}
+	
+	
+	/**
+	 * Represents (nested) field access through string and integer-based keys
+	 */
+	public static class ExpressionKeys<T> extends Keys<T> {
+		
+		/** Field expression that addresses the full type. */
+		public static final String SELECT_ALL_CHAR = "*";
+		/** Alternative field expression that addresses the full type; presumably the Scala API wildcard (see name). */
+		public static final String SELECT_ALL_CHAR_SCALA = "_";
+		
+		// Flattened fields representing the key fields
+		private List<FlatFieldDescriptor> keyFields;
+
+		/**
+		 * Creates ExpressionKeys that are defined by the full data type, by delegating to the
+		 * String-based constructor with the {@link #SELECT_ALL_CHAR} expression.
+		 *
+		 * @param type the data type that is used as key in its entirety
+		 */
+		public ExpressionKeys(TypeInformation<T> type) {
+			this(SELECT_ALL_CHAR, type);
+		}
+
+		/**
+		 * Create int-based (non-nested) field position keys on a tuple type,
+		 * using a single field position as key.
+		 *
+		 * @param keyPosition position of the tuple field to use as key
+		 * @param type the tuple type the position refers to
+		 */
+		public ExpressionKeys(int keyPosition, TypeInformation<T> type) {
+			this(new int[]{keyPosition}, type, false);
+		}
+
+		/**
+		 * Create int-based (non-nested) field position keys on a tuple type.
+		 * Delegates with {@code allowEmpty = false}, so at least one position must be given.
+		 *
+		 * @param keyPositions positions of the tuple fields to use as keys
+		 * @param type the tuple type the positions refer to
+		 */
+		public ExpressionKeys(int[] keyPositions, TypeInformation<T> type) {
+			this(keyPositions, type, false);
+		}
+
+		/**
+		 * Create int-based (non-nested) field position keys on a tuple type.
+		 *
+		 * <p>Every selected position is expanded into all of its (nested) flat fields, and each
+		 * of those flat fields must be a valid key type.
+		 *
+		 * @param keyPositions positions of the tuple fields to use as keys; may be {@code null} or
+		 *                     empty only if {@code allowEmpty} is set, in which case all tuple
+		 *                     fields are used as key fields
+		 * @param type the tuple type the positions refer to
+		 * @param allowEmpty whether a {@code null} or empty {@code keyPositions} array is accepted
+		 * @throws InvalidProgramException if {@code type} is not a composite tuple type, has arity 0,
+		 *                                 or one of the selected flat fields is not a valid key type
+		 * @throws IllegalArgumentException if no positions are given and {@code allowEmpty} is {@code false}
+		 */
+		public ExpressionKeys(int[] keyPositions, TypeInformation<T> type, boolean allowEmpty) {
+
+			if (!type.isTupleType() || !(type instanceof CompositeType)) {
+				throw new InvalidProgramException("Specifying keys via field positions is only valid " +
+						"for tuple data types. Type: " + type);
+			}
+			if (type.getArity() == 0) {
+				throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
+			}
+			if (!allowEmpty && (keyPositions == null || keyPositions.length == 0)) {
+				throw new IllegalArgumentException("The grouping fields must not be empty.");
+			}
+
+			if (keyPositions == null || keyPositions.length == 0) {
+				// use all tuple fields as key fields
+				keyPositions = createIncrIntArray(type.getArity());
+			} else {
+				// explicitly given positions are checked against the highest valid index
+				rangeCheckFields(keyPositions, type.getArity() - 1);
+			}
+			Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point");
+
+			// extract key field types
+			CompositeType<T> cType = (CompositeType<T>)type;
+			this.keyFields = new ArrayList<>(type.getTotalFields());
+
+			// for each key position, find all (nested) field types
+			String[] fieldNames = cType.getFieldNames();
+			ArrayList<FlatFieldDescriptor> tmpList = new ArrayList<>();
+			for (int keyPos : keyPositions) {
+				tmpList.clear();
+				// get all flat fields
+				cType.getFlatFields(fieldNames[keyPos], 0, tmpList);
+				// check if fields are of key type
+				for(FlatFieldDescriptor ffd : tmpList) {
+					if(!ffd.getType().isKeyType()) {
+						throw new InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as key.");
+					}
+				}
+				this.keyFields.addAll(tmpList);
+			}
+		}
+
+		/**
+		 * Create String-based (nested) field expression keys on a composite type,
+		 * using a single field expression as key.
+		 *
+		 * @param keyExpression the field expression that selects the key
+		 * @param type the type the expression is evaluated against
+		 */
+		public ExpressionKeys(String keyExpression, TypeInformation<T> type) {
+			this(new String[]{keyExpression}, type);
+		}
+
+		/**
+		 * Create String-based (nested) field expression keys on a composite type.
+		 *
+		 * <p>For a composite type, every expression is resolved to its flat fields, all of which
+		 * must be valid key types. For any other type, the type itself must be a valid key type
+		 * and every expression must be one of the select-all expressions.
+		 *
+		 * @param keyExpressions the field expressions that select the keys, must not be null
+		 * @param type the type the expressions are evaluated against
+		 * @throws InvalidProgramException if an expression is null, selects no field, selects a
+		 *                                 field that is not a valid key type, or addresses only
+		 *                                 a part of a non-composite type
+		 */
+		public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
+			Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null.");
+
+			this.keyFields = new ArrayList<>(keyExpressions.length);
+
+			if (!(type instanceof CompositeType)) {
+				// atomic type: only the full type can be addressed
+				if (!type.isKeyType()) {
+					throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
+				}
+
+				for (String rawExpr : keyExpressions) {
+					if (rawExpr == null) {
+						throw new InvalidProgramException("Expression key may not be null.");
+					}
+					// surrounding whitespace is not significant
+					final String expr = rawExpr.trim();
+					if (!SELECT_ALL_CHAR.equals(expr) && !SELECT_ALL_CHAR_SCALA.equals(expr)) {
+						throw new InvalidProgramException(
+							"Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
+					}
+					// the full type is the key
+					keyFields.add(new FlatFieldDescriptor(0, type));
+				}
+				return;
+			}
+
+			// composite type: resolve each expression to its flat field positions
+			final CompositeType<T> compositeType = (CompositeType<T>) type;
+
+			for (String rawExpr : keyExpressions) {
+				if (rawExpr == null) {
+					throw new InvalidProgramException("Expression key may not be null.");
+				}
+				// surrounding whitespace is not significant
+				final String expr = rawExpr.trim();
+
+				final List<FlatFieldDescriptor> resolved = compositeType.getFlatFields(expr);
+				if (resolved.isEmpty()) {
+					throw new InvalidProgramException("Unable to extract key from expression '" + expr + "' on key " + compositeType);
+				}
+
+				// every nested field must itself be usable as a key
+				for (FlatFieldDescriptor descriptor : resolved) {
+					if (!descriptor.getType().isKeyType()) {
+						throw new InvalidProgramException("This type (" + descriptor.getType() + ") cannot be used as key.");
+					}
+				}
+				keyFields.addAll(resolved);
+			}
+		}
+		
+		/**
+		 * Returns the number of flat key fields, or 0 if no key fields have been set.
+		 */
+		@Override
+		public int getNumberOfKeyFields() {
+			return keyFields == null ? 0 : keyFields.size();
+		}
+
+		/**
+		 * Returns the position of every flat key field, in the order of the key fields.
+		 */
+		@Override
+		public int[] computeLogicalKeyPositions() {
+			final int[] positions = new int[keyFields.size()];
+			int next = 0;
+			for (FlatFieldDescriptor field : keyFields) {
+				positions[next++] = field.getPosition();
+			}
+			return positions;
+		}
+
+		/**
+		 * Returns the type of every flat key field, in the order of the key fields.
+		 */
+		@Override
+		protected TypeInformation<?>[] getKeyFieldTypes() {
+			final TypeInformation<?>[] types = new TypeInformation[keyFields.size()];
+			int next = 0;
+			for (FlatFieldDescriptor field : keyFields) {
+				types[next++] = field.getType();
+			}
+			return types;
+		}
+
+		@Override
+		public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) {
+
+			if (keyFields.size() != 1) {
+				throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field.");
+			}
+
+			if (typeInfo == null) {
+				// try to extract key type from partitioner
+				try {
+					typeInfo = TypeExtractor.getPartitionerTypes(partitioner);
+				}
+				catch (Throwable t) {
+					// best effort check, so we ignore exceptions
+				}
+			}
+
+			if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) {
+				// only check type compatibility if type is known and not a generic type
+
+				TypeInformation<?> keyType = keyFields.get(0).getType();
+				if (!keyType.equals(typeInfo)) {
+					throw new InvalidProgramException("The partitioner is incompatible with the key type. "
+										+ "Partitioner type: " + typeInfo + " , key type: " + keyType);
+				}
+			}
+		}
+
+		@Override
+		public String toString() {
+			Joiner join = Joiner.on('.');
+			return "ExpressionKeys: " + join.join(keyFields);
+		}
+
+		public static boolean isSortKey(int fieldPos, TypeInformation<?> type) {
+
+			if (!type.isTupleType() || !(type instanceof CompositeType)) {
+				throw new InvalidProgramException("Specifying keys via field positions is only valid " +
+					"for tuple data types. Type: " + type);
+			}
+			if (type.getArity() == 0) {
+				throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
+			}
+
+			if(fieldPos < 0 || fieldPos >= type.getArity()) {
+				throw new IndexOutOfBoundsException("Tuple position is out of range: " + fieldPos);
+			}
+
+			TypeInformation<?> sortKeyType = ((CompositeType<?>)type).getTypeAt(fieldPos);
+			return sortKeyType.isSortKeyType();
+		}
+
+		public static boolean isSortKey(String fieldExpr, TypeInformation<?> type) {
+
+			TypeInformation<?> sortKeyType;
+
+			fieldExpr = fieldExpr.trim();
+			if (SELECT_ALL_CHAR.equals(fieldExpr) || SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) {
+				sortKeyType = type;
+			}
+			else {
+				if (type instanceof CompositeType) {
+					sortKeyType = ((CompositeType<?>) type).getTypeAt(fieldExpr);
+				}
+				else {
+					throw new InvalidProgramException(
+						"Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic types.");
+				}
+			}
+
+			return sortKeyType.isSortKeyType();
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+
+
+	private static int[] createIncrIntArray(int numKeys) {
+		int[] keyFields = new int[numKeys];
+		for (int i = 0; i < numKeys; i++) {
+			keyFields[i] = i;
+		}
+		return keyFields;
+	}
+
+	private static void rangeCheckFields(int[] fields, int maxAllowedField) {
+
+		for (int f : fields) {
+			if (f < 0 || f > maxAllowedField) {
+				throw new IndexOutOfBoundsException("Tuple position is out of range: " + f);
+			}
+		}
+	}
+
+	public static class IncompatibleKeysException extends Exception {
+		private static final long serialVersionUID = 1L;
+		public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different.";
+		
+		public IncompatibleKeysException(String message) {
+			super(message);
+		}
+
+		public IncompatibleKeysException(TypeInformation<?> typeInformation, TypeInformation<?> typeInformation2) {
+			super(typeInformation+" and "+typeInformation2+" are not compatible");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
new file mode 100644
index 0000000..3d06c59
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * The {@link KeySelector} allows the use of arbitrary objects for operations such as
+ * reduce, reduceGroup, join, coGroup, etc.
+ * 
+ * The extractor takes an object and returns the key for that object.
+ *
+ * @param <IN> Type of objects to extract the key from.
+ * @param <KEY> Type of key.
+ */
+public interface KeySelector<IN, KEY> extends Function, Serializable {
+	
+	/**
+	 * User-defined function that extracts the key from an arbitrary object.
+	 * 
+	 * For example for a class:
+	 * <pre>
+	 * 	public class Word {
+	 * 		String word;
+	 * 		int count;
+	 * 	}
+	 * </pre>
+	 * The key extractor could return the word as
+	 * a key to group all Word objects by the String they contain.
+	 * 
+	 * The code would look like this
+	 * <pre>
+	 * 	public String getKey(Word w) {
+	 * 		return w.word;
+	 * 	}
+	 * </pre>
+	 * 
+	 * @param value The object to get the key from.
+	 * @return The extracted key.
+	 * 
+	 * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
+	 *                   and trigger recovery or cancellation of the program. 
+	 */
+	KEY getKey(IN value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..5f74513
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs).
+ *
+ * Approach: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>}
+ *     with a {@code GenericType<avro.Utf8>}.
+ * All other types used by Avro are standard Java types.
+ * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime.
+ * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here
+ * by generic type infos containing Utf8 classes (which are comparable).
+ *
+ * This class is checked by the AvroPojoTest.
+ * @param <T> The type of the Avro POJO, a subclass of SpecificRecordBase.
+ */
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+	public AvroTypeInfo(Class<T> typeClass) {
+		super(typeClass, generateFieldsFromAvroSchema(typeClass));
+	}
+
+	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
+		PojoTypeExtractor pte = new PojoTypeExtractor();
+		TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null);
+
+		if(!(ti instanceof PojoTypeInfo)) {
+			throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
+		}
+		PojoTypeInfo pti =  (PojoTypeInfo) ti;
+		List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields());
+
+		for(int i = 0; i < pti.getArity(); i++) {
+			PojoField f = pti.getPojoFieldAt(i);
+			TypeInformation newType = f.getTypeInformation();
+			// check if type is a CharSequence
+			if(newType instanceof GenericTypeInfo) {
+				if((newType).getTypeClass().equals(CharSequence.class)) {
+					// replace the type by a org.apache.avro.util.Utf8
+					newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+				}
+			}
+			PojoField newField = new PojoField(f.getField(), newType);
+			newFields.add(newField);
+		}
+		return newFields;
+	}
+
+	private static class PojoTypeExtractor extends TypeExtractor {
+		private PojoTypeExtractor() {
+			super();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
new file mode 100644
index 0000000..74d850b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.types.Either;
+
+/**
+ * A {@link TypeInformation} for the {@link Either} type of the Java API.
+ *
+ * @param <L> the Left value type
+ * @param <R> the Right value type
+ */
+public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeInformation<L> leftType;
+
+	private final TypeInformation<R> rightType;
+
+	public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
+		this.leftType = leftType;
+		this.rightType = rightType;
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Class<Either<L, R>> getTypeClass() {
+		return (Class<Either<L, R>>) (Class<?>) Either.class;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) {
+		return new EitherSerializer<L, R>(leftType.createSerializer(config),
+				rightType.createSerializer(config));
+	}
+
+	@Override
+	public String toString() {
+		return "Either <" + leftType.toString() + ", " + rightType.toString() + ">";
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof EitherTypeInfo) {
+			EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj;
+
+			return other.canEqual(this) &&
+				leftType.equals(other.leftType) &&
+				rightType.equals(other.rightType);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 17 * leftType.hashCode() + rightType.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof EitherTypeInfo;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public TypeInformation<L> getLeftType() {
+		return leftType;
+	}
+
+	public TypeInformation<R> getRightType() {
+		return rightType;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
new file mode 100644
index 0000000..de59c36
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.EnumComparator;
+import org.apache.flink.api.common.typeutils.base.EnumSerializer;
+
+/**
+ * A {@link TypeInformation} for Java enumeration types.
+ *
+ * @param <T> The type represented by this type information.
+ */
+public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implements AtomicType<T> {
+
+	private static final long serialVersionUID = 8936740290137178660L;
+	
+	private final Class<T> typeClass;
+
+	public EnumTypeInfo(Class<T> typeClass) {
+		Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
+
+		if (!Enum.class.isAssignableFrom(typeClass) ) {
+			throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
+		}
+
+		this.typeClass = typeClass;
+	}
+
+	@Override
+	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+		return new EnumComparator<T>(sortOrderAscending);
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+	
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return this.typeClass;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return true;
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
+		return new EnumSerializer<T>(typeClass);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Standard utils
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "EnumTypeInfo<" + typeClass.getName() + ">";
+	}	
+	
+	@Override
+	public int hashCode() {
+		return typeClass.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof EnumTypeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof EnumTypeInfo) {
+			@SuppressWarnings("unchecked")
+			EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj;
+
+			return enumTypeInfo.canEqual(this) &&
+				typeClass == enumTypeInfo.typeClass;
+		} else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
new file mode 100644
index 0000000..7e7aa68
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+
+public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
+
+	private static final long serialVersionUID = -7959114120287706504L;
+	
+	private final Class<T> typeClass;
+
+	public GenericTypeInfo(Class<T> typeClass) {
+		this.typeClass = Preconditions.checkNotNull(typeClass);
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+	
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return typeClass;
+	}
+	
+	@Override
+	public boolean isKeyType() {
+		return Comparable.class.isAssignableFrom(typeClass);
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+		return new KryoSerializer<T>(this.typeClass, config);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+		if (isKeyType()) {
+			@SuppressWarnings("rawtypes")
+			GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, createSerializer(executionConfig), this.typeClass);
+			return (TypeComparator<T>) comparator;
+		}
+
+		throw new UnsupportedOperationException("Types that do not implement java.lang.Comparable cannot be used as keys.");
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return typeClass.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof GenericTypeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof GenericTypeInfo) {
+			@SuppressWarnings("unchecked")
+			GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj;
+
+			return typeClass == genericTypeInfo.typeClass;
+		} else {
+			return false;
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return "GenericType<" + typeClass.getCanonicalName() + ">";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
new file mode 100644
index 0000000..f8b4247
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured
+ * with the data type they will operate on. The method
+ * {@link #setInputType(org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)}
+ * will be called when the output format is used with an output method such as
+ * {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+ */
+public interface InputTypeConfigurable {
+
+	/**
+	 * Method that is called on an {@link org.apache.flink.api.common.io.OutputFormat} when it is passed to
+	 * the DataSet's output method. May be used to configure the output format based on the data type.
+	 *
+	 * @param type The data type of the input.
+	 * @param executionConfig The execution configuration passed along with the data type.
+	 */
+	void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
new file mode 100644
index 0000000..1dd7f01
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A special type information signifying that the type extraction failed. It contains
+ * additional error information.
+ */
+public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
+
+	private static final long serialVersionUID = -4212082837126702723L;
+	
+	private final String functionName;
+	private final InvalidTypesException typeException;
+
+	
+	public MissingTypeInfo(String functionName) {
+		this(functionName, new InvalidTypesException("An unknown error occured."));
+	}
+
+	public MissingTypeInfo(String functionName, InvalidTypesException typeException) {
+		this.functionName = functionName;
+		this.typeException = typeException;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public String getFunctionName() {
+		return functionName;
+	}
+
+	public InvalidTypesException getTypeException() {
+		return typeException;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isBasicType() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public boolean isTupleType() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public int getArity() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public Class<InvalidTypesException> getTypeClass() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public boolean isKeyType() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public TypeSerializer<InvalidTypesException> createSerializer(ExecutionConfig executionConfig) {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof MissingTypeInfo) {
+			MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj;
+
+			return missingTypeInfo.canEqual(this) &&
+				functionName.equals(missingTypeInfo.functionName) &&
+				typeException.equals(missingTypeInfo.typeException);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * functionName.hashCode() + typeException.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof MissingTypeInfo;
+	}
+
+	@Override
+	public int getTotalFields() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
new file mode 100644
index 0000000..150c976
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Array;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+
+public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
+
+	private static final long serialVersionUID = 1L;
+	
+	private final Class<T> arrayType;
+	private final TypeInformation<C> componentInfo;
+
+	private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) {
+		this.arrayType = Preconditions.checkNotNull(arrayType);
+		this.componentInfo = Preconditions.checkNotNull(componentInfo);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Class<T> getTypeClass() {
+		return arrayType;
+	}
+
+	public TypeInformation<C> getComponentInfo() {
+		return componentInfo;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
+		return (TypeSerializer<T>) new GenericArraySerializer<C>(
+			componentInfo.getTypeClass(),
+			componentInfo.createSerializer(executionConfig));
+	}
+
+	@Override
+	public String toString() {
+		return this.getClass().getSimpleName() + "<" + this.componentInfo + ">";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ObjectArrayTypeInfo) {
+			@SuppressWarnings("unchecked")
+			ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = (ObjectArrayTypeInfo<T, C>)obj;
+
+			return objectArrayTypeInfo.canEqual(this) &&
+				arrayType == objectArrayTypeInfo.arrayType &&
+				componentInfo.equals(objectArrayTypeInfo.componentInfo);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ObjectArrayTypeInfo;
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * this.arrayType.hashCode() + this.componentInfo.hashCode();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
+		Preconditions.checkNotNull(arrayClass);
+		Preconditions.checkNotNull(componentInfo);
+		Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
+
+		return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
+	}
+
+	/**
+	 * Creates a new {@link org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a
+	 * {@link TypeInformation} for the component type.
+	 *
+	 * <p>
+	 * This must be used in cases where the complete type of the array is not available as a
+	 * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
+		Preconditions.checkNotNull(componentInfo);
+
+		return new ObjectArrayTypeInfo<T, C>(
+			(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
+			componentInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
new file mode 100644
index 0000000..1b008c0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Represents a field definition for {@link PojoTypeInfo} type of objects.
+ */
+public class PojoField implements Serializable {
+
+	private static final long serialVersionUID = 1975295846436559363L;
+
+	private transient Field field;
+	private final TypeInformation<?> type;
+
+	public PojoField(Field field, TypeInformation<?> type) {
+		this.field = Preconditions.checkNotNull(field);
+		this.type = Preconditions.checkNotNull(type);
+	}
+
+	public Field getField() {
+		return field;
+	}
+
+	public TypeInformation<?> getTypeInformation() {
+		return type;
+	}
+
+	private void writeObject(ObjectOutputStream out)
+			throws IOException, ClassNotFoundException {
+		out.defaultWriteObject();
+		out.writeObject(field.getDeclaringClass());
+		out.writeUTF(field.getName());
+	}
+
+	private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		Class<?> clazz = (Class<?>)in.readObject();
+		String fieldName = in.readUTF();
+		field = null;
+		// try superclasses as well
+		while (clazz != null) {
+			try {
+				field = clazz.getDeclaredField(fieldName);
+				field.setAccessible(true);
+				break;
+			} catch (NoSuchFieldException e) {
+				clazz = clazz.getSuperclass();
+			}
+		}
+		if (field == null) {
+			throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+					+ " (" + fieldName + ")");
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof PojoField) {
+			PojoField other = (PojoField) obj;
+
+			return other.canEqual(this) && type.equals(other.type) &&
+				Objects.equals(field, other.field);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(field, type);
+	}
+
+	public boolean canEqual(Object obj) {
+		return obj instanceof PojoField;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
new file mode 100644
index 0000000..cc0d239
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
+
+import com.google.common.base.Joiner;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+/**
+ * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
+ * since the conditions are slightly different from Java Beans.
+ * A type is considered a Flink POJO type if it fulfills the conditions below.
+ * <ul>
+ *   <li>It is a public class, and standalone (not a non-static inner class)</li>
+ *   <li>It has a public no-argument constructor.</li>
+ *   <li>All fields are either public, or have public getters and setters.</li>
+ * </ul>
+ * 
+ * @param <T> The type represented by this type information.
+ */
+public class PojoTypeInfo<T> extends CompositeType<T> {
+	
+	private static final long serialVersionUID = 1L;
+
+	private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
+	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
+	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
+	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
+	private final PojoField[] fields;
+	
+	private final int totalFields;
+
+	public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
+		super(typeClass);
+
+		Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+				"POJO " + typeClass + " is not public");
+
+		this.fields = fields.toArray(new PojoField[fields.size()]);
+
+		Arrays.sort(this.fields, new Comparator<PojoField>() {
+			@Override
+			public int compare(PojoField o1, PojoField o2) {
+				return o1.getField().getName().compareTo(o2.getField().getName());
+			}
+		});
+
+		int counterFields = 0;
+
+		for(PojoField field : fields) {
+			counterFields += field.getTypeInformation().getTotalFields();
+		}
+
+		totalFields = counterFields;
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return fields.length;
+	}
+	
+	@Override
+	public int getTotalFields() {
+		return totalFields;
+	}
+
+	@Override
+	public boolean isSortKeyType() {
+		// Support for sorting POJOs that implement Comparable is not implemented yet.
+		// Since the order of fields in a POJO type is not well defined, sorting on fields
+		//   gives only some undefined order.
+		return false;
+	}
+	
+
+	@Override
+	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+
+		Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+		if(!matcher.matches()) {
+			throw new InvalidFieldReferenceException("Invalid POJO field reference \""+fieldExpression+"\".");
+		}
+
+		String field = matcher.group(0);
+		if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+			// handle select all
+			int keyPosition = 0;
+			for(PojoField pField : fields) {
+				if(pField.getTypeInformation() instanceof CompositeType) {
+					CompositeType<?> cType = (CompositeType<?>)pField.getTypeInformation();
+					cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
+					keyPosition += cType.getTotalFields()-1;
+				} else {
+					result.add(
+						new NamedFlatFieldDescriptor(
+							pField.getField().getName(),
+							offset + keyPosition,
+							pField.getTypeInformation()));
+				}
+				keyPosition++;
+			}
+			return;
+		} else {
+			field = matcher.group(1);
+		}
+
+		// get field
+		int fieldPos = -1;
+		TypeInformation<?> fieldType = null;
+		for (int i = 0; i < fields.length; i++) {
+			if (fields[i].getField().getName().equals(field)) {
+				fieldPos = i;
+				fieldType = fields[i].getTypeInformation();
+				break;
+			}
+		}
+		if (fieldPos == -1) {
+			throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+".");
+		}
+		String tail = matcher.group(3);
+		if(tail == null) {
+			if(fieldType instanceof CompositeType) {
+				// forward offset
+				for(int i=0; i<fieldPos; i++) {
+					offset += this.getTypeAt(i).getTotalFields();
+				}
+				// add all fields of composite type
+				((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
+			} else {
+				// we found the field to add
+				// compute flat field position by adding skipped fields
+				int flatFieldPos = offset;
+				for(int i=0; i<fieldPos; i++) {
+					flatFieldPos += this.getTypeAt(i).getTotalFields();
+				}
+				result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
+			}
+		} else {
+			if(fieldType instanceof CompositeType<?>) {
+				// forward offset
+				for(int i=0; i<fieldPos; i++) {
+					offset += this.getTypeAt(i).getTotalFields();
+				}
+				((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
+			} else {
+				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+
+		Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+		if(!matcher.matches()) {
+			if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+				throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+			} else {
+				throw new InvalidFieldReferenceException("Invalid format of POJO field expression \""+fieldExpression+"\".");
+			}
+		}
+
+		String field = matcher.group(1);
+		// get field
+		int fieldPos = -1;
+		TypeInformation<?> fieldType = null;
+		for (int i = 0; i < fields.length; i++) {
+			if (fields[i].getField().getName().equals(field)) {
+				fieldPos = i;
+				fieldType = fields[i].getTypeInformation();
+				break;
+			}
+		}
+		if (fieldPos == -1) {
+			throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+".");
+		}
+
+		String tail = matcher.group(3);
+		if(tail == null) {
+			// we found the type
+			return (TypeInformation<X>) fieldType;
+		} else {
+			if(fieldType instanceof CompositeType<?>) {
+				return ((CompositeType<?>) fieldType).getTypeAt(tail);
+			} else {
+				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
+			}
+		}
+	}
+
+	@Override
+	public <X> TypeInformation<X> getTypeAt(int pos) {
+		if (pos < 0 || pos >= this.fields.length) {
+			throw new IndexOutOfBoundsException();
+		}
+		@SuppressWarnings("unchecked")
+		TypeInformation<X> typed = (TypeInformation<X>) fields[pos].getTypeInformation();
+		return typed;
+	}
+
+	@Override
+	protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+		return new PojoTypeComparatorBuilder();
+	}
+
+	// used for testing. Maybe use mockito here
+	public PojoField getPojoFieldAt(int pos) {
+		if (pos < 0 || pos >= this.fields.length) {
+			throw new IndexOutOfBoundsException();
+		}
+		return this.fields[pos];
+	}
+
+	public String[] getFieldNames() {
+		String[] result = new String[fields.length];
+		for (int i = 0; i < fields.length; i++) {
+			result[i] = fields[i].getField().getName();
+		}
+		return result;
+	}
+
+	@Override
+	public int getFieldIndex(String fieldName) {
+		for (int i = 0; i < fields.length; i++) {
+			if (fields[i].getField().getName().equals(fieldName)) {
+				return i;
+			}
+		}
+		return -1;
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+		if(config.isForceKryoEnabled()) {
+			return new KryoSerializer<T>(getTypeClass(), config);
+		}
+		if(config.isForceAvroEnabled()) {
+			return new AvroSerializer<T>(getTypeClass());
+		}
+
+		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ];
+		Field[] reflectiveFields = new Field[fields.length];
+
+		for (int i = 0; i < fields.length; i++) {
+			fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
+			reflectiveFields[i] = fields[i].getField();
+		}
+
+		return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof PojoTypeInfo) {
+			@SuppressWarnings("unchecked")
+			PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj;
+
+			return pojoTypeInfo.canEqual(this) &&
+				super.equals(pojoTypeInfo) &&
+				Arrays.equals(fields, pojoTypeInfo.fields) &&
+				totalFields == pojoTypeInfo.totalFields;
+		} else {
+			return false;
+		}
+	}
+	
+	@Override
+	public int hashCode() {
+		return 31 * (31 * Arrays.hashCode(fields) + totalFields) + super.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof PojoTypeInfo;
+	}
+	
+	@Override
+	public String toString() {
+		List<String> fieldStrings = new ArrayList<String>();
+		for (PojoField field : fields) {
+			fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString());
+		}
+		return "PojoType<" + getTypeClass().getName()
+				+ ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]"
+				+ ">";
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private class PojoTypeComparatorBuilder implements TypeComparatorBuilder<T> {
+
+		private ArrayList<TypeComparator> fieldComparators;
+		private ArrayList<Field> keyFields;
+
+		public PojoTypeComparatorBuilder() {
+			fieldComparators = new ArrayList<TypeComparator>();
+			keyFields = new ArrayList<Field>();
+		}
+
+
+		@Override
+		public void initializeTypeComparatorBuilder(int size) {
+			fieldComparators.ensureCapacity(size);
+			keyFields.ensureCapacity(size);
+		}
+
+		@Override
+		public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
+			fieldComparators.add(comparator);
+			keyFields.add(fields[fieldId].getField());
+		}
+
+		@Override
+		public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
+			Preconditions.checkState(
+				keyFields.size() > 0,
+				"No keys were defined for the PojoTypeComparatorBuilder.");
+
+			Preconditions.checkState(
+				fieldComparators.size() > 0,
+				"No type comparators were defined for the PojoTypeComparatorBuilder.");
+
+			Preconditions.checkState(
+				keyFields.size() == fieldComparators.size(),
+				"Number of key fields and field comparators is not equal.");
+
+			return new PojoComparator<T>(
+				keyFields.toArray(new Field[keyFields.size()]),
+				fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
+				createSerializer(config),
+				getTypeClass());
+		}
+	}
+
+	public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor {
+
+		private String fieldName;
+
+		public NamedFlatFieldDescriptor(String name, int keyPosition, TypeInformation<?> type) {
+			super(keyPosition, type);
+			this.fieldName = name;
+		}
+
+		public String getFieldName() {
+			return fieldName;
+		}
+
+		@Override
+		public String toString() {
+			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
new file mode 100644
index 0000000..5e0cbed
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * This interface can be implemented by functions and input formats to tell the framework
+ * about their produced data type. This approach acts as an alternative to the reflection analysis
+ * that is otherwise performed and is useful in situations where the produced data type may vary
+ * depending on parametrization.
+ */
+public interface ResultTypeQueryable<T> {
+
+	/**
+	 * Gets the data type (as a {@link TypeInformation}) produced by this function or input format.
+	 * 
+	 * @return The data type produced by this function or input format.
+	 */
+	TypeInformation<T> getProducedType();
+}


Mime
View raw message