flink-commits mailing list archives

From: se...@apache.org
Subject: [2/2] flink git commit: [FLINK-1245] [Java API] Introduce TypeHints for Java API operators
Date: Thu, 08 Jan 2015 15:32:50 GMT
[FLINK-1245] [Java API] Introduce TypeHints for Java API operators

Also contains fixes by sewen@apache.org
 - Make MissingTypeInfo optional in TypeExtractor (by default still throws exception)
 - Simplified deferred evaluation of type-dependent code by making evaluations lazy
 - Add call location function names to MissingTypeInfo error messages.
 - Improvements to other error messages.

This closes #270
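
[Editorial note, not part of the commit] For orientation, a minimal usage sketch of the feature introduced here: a generic function whose return type parameter is erased, followed by the new returns(...) hint. The class name, mapper, and sample values below are illustrative assumptions; only the returns(...) API itself comes from this change.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class TypeHintExample {

    // Illustrative generic mapper: T occurs only in the output, so the
    // TypeExtractor cannot determine the return type on its own.
    public static class FromString<T> implements MapFunction<String, T> {
        @Override
        @SuppressWarnings("unchecked")
        public T map(String value) {
            return (T) value; // illustrative only
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements("flink", "type", "hints");

        // With this commit, the failed extraction is deferred (MissingTypeInfo)
        // instead of aborting the map() call; the returns(...) hint placed
        // directly after the transformation supplies the type before it is used.
        DataSet<String> echoed = words
                .map(new FromString<String>())
                .returns("String");

        echoed.print();
        env.execute();
    }
}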


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8dbaeeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8dbaeeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8dbaeeb

Branch: refs/heads/master
Commit: d8dbaeeb4df4524e136423425edc9d87d1f25897
Parents: 06503c8
Author: twalthr <info@twalthr.com>
Authored: Fri Nov 7 16:18:23 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 8 16:29:57 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/client/testjar/WordCount.java  |   7 +-
 .../operators/DualInputSemanticProperties.java  |  16 +-
 .../common/operators/SemanticProperties.java    |  11 +-
 .../SingleInputSemanticProperties.java          |  27 +-
 .../java/org/apache/flink/api/java/DataSet.java | 111 ++++---
 .../api/java/functions/FunctionAnnotation.java  |   3 +-
 .../api/java/operators/CoGroupOperator.java     |   7 +-
 .../flink/api/java/operators/CrossOperator.java |  43 +--
 .../api/java/operators/DistinctOperator.java    |   2 -
 .../api/java/operators/FilterOperator.java      |  14 +-
 .../api/java/operators/FlatMapOperator.java     |   6 +-
 .../api/java/operators/GroupReduceOperator.java |   8 +-
 .../flink/api/java/operators/JoinOperator.java  |  67 ++---
 .../apache/flink/api/java/operators/Keys.java   |   3 +
 .../flink/api/java/operators/MapOperator.java   |   6 +-
 .../java/operators/MapPartitionOperator.java    |   6 +-
 .../api/java/operators/PartitionOperator.java   |   2 +-
 .../api/java/operators/ProjectOperator.java     |   9 +-
 .../api/java/operators/ReduceOperator.java      |   9 +-
 .../java/operators/SingleInputUdfOperator.java  | 136 ++++++++-
 .../api/java/operators/TwoInputUdfOperator.java | 139 ++++++++-
 .../flink/api/java/operators/UnionOperator.java |   8 +-
 .../api/java/typeutils/MissingTypeInfo.java     |  90 ++++++
 .../api/java/typeutils/TupleTypeInfoBase.java   |   2 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 253 +++++++++++-----
 .../api/java/typeutils/TypeInfoParser.java      | 104 ++++---
 .../translation/DistinctTranslationTest.java    | 296 +++++++++++++++++++
 .../translation/DistrinctTranslationTest.java   | 296 -------------------
 .../record/CoGroupWrappingFunctionTest.java     |   2 +-
 .../java/record/ReduceWrappingFunctionTest.java |   2 +-
 .../type/extractor/PojoTypeExtractionTest.java  |   6 +-
 .../java/type/extractor/TypeExtractorTest.java  |  82 +++--
 .../api/java/typeutils/TypeInfoParserTest.java  |  59 +++-
 .../java/type/lambdas/LambdaExtractionTest.java | 178 +++++------
 .../test/javaApiOperators/TypeHintITCase.java   | 164 ++++++++++
 .../api/scala/ScalaAPICompletenessTest.scala    |   4 +
 36 files changed, 1493 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index 1ed86b3..827bc77 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -27,8 +27,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 /**
- * Wordcount for placing at least something into the jar file.
- * 
+ * WordCount for placing at least something into the jar file.
  */
 public class WordCount {
 	
@@ -98,7 +97,7 @@ public class WordCount {
 	// *************************************************************************
 	
 	private static boolean fileOutput = false;
-	private static boolean verbose = false;
+	
 	private static String textPath;
 	private static String outputPath;
 	
@@ -111,7 +110,7 @@ public class WordCount {
 				textPath = args[0];
 				outputPath = args[1];
 			} else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
-				verbose = Boolean.valueOf(args[1]);
+				Boolean.valueOf(args[1]); // parse verbosity flag
 				textPath = args[2];
 				outputPath = args[3];
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
index 26ed788..e9aa358 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
@@ -56,8 +56,7 @@ public class DualInputSemanticProperties extends SemanticProperties {
 
 	
 	public DualInputSemanticProperties() {
-		super();
-		this.init();
+		init();
 	}
 	
 	/**
@@ -251,15 +250,24 @@ public class DualInputSemanticProperties extends SemanticProperties {
 	 */
 	@Override
 	public void clearProperties() {
-		this.init();
 		super.clearProperties();
+		init();
 	}
 	
+	@Override
+	public boolean isEmpty() {
+		return super.isEmpty() &&
+				(forwardedFields1 == null || forwardedFields1.isEmpty()) &&
+				(forwardedFields2 == null || forwardedFields2.isEmpty()) &&
+				(readFields1 == null || readFields1.size() == 0) &&
+				(readFields2 == null || readFields2.size() == 0);
+	}
+	
+	
 	private void init() {
 		this.forwardedFields1 = new HashMap<Integer,FieldSet>();
 		this.forwardedFields2 = new HashMap<Integer,FieldSet>();
 		this.readFields1 = null;
 		this.readFields2 = null;
 	}
-		
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
index 7e1054d..ba801ec 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
@@ -30,11 +30,10 @@ public abstract class SemanticProperties implements Serializable {
 	
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * Set of fields that are written in the destination record(s).
-	 */
+	/** Set of fields that are written in the destination record(s).*/
 	private FieldSet writtenFields;
 	
+	
 	/**
 	 * Adds, to the existing information, field(s) that are written in
 	 * the destination record(s).
@@ -71,10 +70,10 @@ public abstract class SemanticProperties implements Serializable {
 	 * Clears the object.
 	 */
 	public void clearProperties() {
-		this.init();
+		this.writtenFields = null;
 	}
 	
-	private void init() {
-		this.writtenFields = null;
+	public boolean isEmpty() {
+		return this.writtenFields == null || this.writtenFields.size() == 0;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
index 45f020a..77ed1bc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
@@ -27,23 +27,18 @@ import org.apache.flink.api.common.operators.util.FieldSet;
  * Container for the semantic properties associated to a single input operator.
  */
 public class SingleInputSemanticProperties extends SemanticProperties {
+	
 	private static final long serialVersionUID = 1L;
 	
-	/**
-	 * Mapping from fields in the source record(s) to fields in the destination
-	 * record(s).  
-	 */
+	/**Mapping from fields in the source record(s) to fields in the destination record(s). */
 	private Map<Integer,FieldSet> forwardedFields;
 	
-	/**
-	 * Set of fields that are read in the source record(s).
-	 */
+	/** Set of fields that are read in the source record(s).*/
 	private FieldSet readFields;
 
 	
 	public SingleInputSemanticProperties() {
-		super();
-		this.init();
+		init();
 	}
 	
 	/**
@@ -140,8 +135,15 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 	 */
 	@Override
 	public void clearProperties() {
-		this.init();
 		super.clearProperties();
+		init();
+	}
+	
+	@Override
+	public boolean isEmpty() {
+		return super.isEmpty() &&
+				(forwardedFields == null || forwardedFields.isEmpty()) &&
+				(readFields == null || readFields.size() == 0);
 	}
 	
 	private void init() {
@@ -206,5 +208,10 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 		public void setWrittenFields(FieldSet writtenFields) {
 			throw new UnsupportedOperationException();
 		}
+		
+		@Override
+		public boolean isEmpty() {
+			return false;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 6415570..edd49e2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -68,6 +69,7 @@ import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
@@ -91,19 +93,22 @@ public abstract class DataSet<T> {
 	
 	private final ExecutionEnvironment context;
 	
-	private final TypeInformation<T> type;
-
-	protected DataSet(ExecutionEnvironment context, TypeInformation<T> type) {
+	// NOTE: the type must not be accessed directly, but only via getType()
+	private TypeInformation<T> type;
+	
+	private boolean typeUsed = false;
+	
+	
+	protected DataSet(ExecutionEnvironment context, TypeInformation<T> typeInfo) {
 		if (context == null) {
 			throw new NullPointerException("context is null");
 		}
-
-		if (type == null) {
-			throw new NullPointerException("type is null");
+		if (typeInfo == null) {
+			throw new NullPointerException("typeInfo is null");
 		}
-		
+
 		this.context = context;
-		this.type = type;
+		this.type = typeInfo;
 	}
 
 	/**
@@ -117,6 +122,29 @@ public abstract class DataSet<T> {
 		return this.context;
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	//  Type Information handling
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Tries to fill in the type information. Type information can be filled in later when the program uses
+	 * a type hint. This method checks whether the type information has ever been accessed before and does not
+	 * allow modifications if the type was accessed already. This ensures consistency by making sure different
+	 * parts of the operation do not assume different type information.
+	 *   
+	 * @param typeInfo The type information to fill in.
+	 * 
+	 * @throws IllegalStateException Thrown, if the type information has been accessed before.
+	 */
+	protected void fillInType(TypeInformation<T> typeInfo) {
+		if (typeUsed) {
+			throw new IllegalStateException("TypeInformation cannot be filled in for the type after it has been used. "
+					+ "Please make sure that the type info hints are the first call after the transformation function, "
+					+ "before any access to types or semantic properties, etc.");
+		}
+		this.type = typeInfo;
+	}
+	
 	/**
 	 * Returns the {@link TypeInformation} for the type of this DataSet.
 	 * 
@@ -125,6 +153,15 @@ public abstract class DataSet<T> {
 	 * @see TypeInformation
 	 */
 	public TypeInformation<T> getType() {
+		if (type instanceof MissingTypeInfo) {
+			MissingTypeInfo typeInfo = (MissingTypeInfo) type;
+			throw new InvalidTypesException("The return type of function '" + typeInfo.getFunctionName()
+					+ "' could not be determined automatically, due to type erasure. "
+					+ "You can give type information hints by using the returns(...) method on the result of "
+					+ "the transformation call, or by letting your function implement the 'ResultTypeQueryable' "
+					+ "interface.", typeInfo.getTypeException());
+		}
+		typeUsed = true;
 		return this.type;
 	}
 
@@ -141,25 +178,25 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Applies a Map transformation on a {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichMapFunction} for each element of the DataSet.
+	 * Applies a Map transformation on this DataSet.<br/>
+	 * The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet.
 	 * Each MapFunction call returns exactly one element.
 	 * 
 	 * @param mapper The MapFunction that is called for each element of the DataSet.
 	 * @return A MapOperator that represents the transformed DataSet.
 	 * 
+	 * @see org.apache.flink.api.common.functions.MapFunction
 	 * @see org.apache.flink.api.common.functions.RichMapFunction
 	 * @see MapOperator
-	 * @see DataSet
 	 */
 	public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
 		if (mapper == null) {
 			throw new NullPointerException("Map function must not be null.");
 		}
 
-		TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType());
-
-		return new MapOperator<T, R>(this, resultType, clean(mapper), Utils.getCallLocationName());
+		String callLocation = Utils.getCallLocationName();
+		TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);
+		return new MapOperator<T, R>(this, resultType, clean(mapper), callLocation);
 	}
 
 
@@ -180,14 +217,15 @@ public abstract class DataSet<T> {
 	 *
 	 * @see MapPartitionFunction
 	 * @see MapPartitionOperator
-	 * @see DataSet
 	 */
 	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition ){
 		if (mapPartition == null) {
 			throw new NullPointerException("MapPartition function must not be null.");
 		}
-		TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType());
-		return new MapPartitionOperator<T, R>(this, resultType, clean(mapPartition), Utils.getCallLocationName());
+		
+		String callLocation = Utils.getCallLocationName();
+		TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, getType(), callLocation, true);
+		return new MapPartitionOperator<T, R>(this, resultType, clean(mapPartition), callLocation);
 	}
 	
 	/**
@@ -207,8 +245,9 @@ public abstract class DataSet<T> {
 			throw new NullPointerException("FlatMap function must not be null.");
 		}
 
-		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
-		return new FlatMapOperator<T, R>(this, resultType, clean(flatMapper), Utils.getCallLocationName());
+		String callLocation = Utils.getCallLocationName();
+		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
+		return new FlatMapOperator<T, R>(this, resultType, clean(flatMapper), callLocation);
 	}
 	
 	/**
@@ -288,7 +327,7 @@ public abstract class DataSet<T> {
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
 	public AggregateOperator<T> sum (int field) {
-		return this.aggregate (Aggregations.SUM, field);
+		return aggregate(Aggregations.SUM, field);
 	}
 
 	/**
@@ -362,8 +401,10 @@ public abstract class DataSet<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType());
-		return new GroupReduceOperator<T, R>(this, resultType, clean(reducer), Utils.getCallLocationName());
+		
+		String callLocation = Utils.getCallLocationName();
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, getType(), callLocation, true);
+		return new GroupReduceOperator<T, R>(this, resultType, clean(reducer), callLocation);
 	}
 
 	/**
@@ -394,12 +435,12 @@ public abstract class DataSet<T> {
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public ReduceOperator<T> minBy(int... fields)  {
-		if(!type.isTupleType()) {
+		if(!getType().isTupleType()) {
 			throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.");
 		}
 
 		return new ReduceOperator<T>(this, new SelectByMinFunction(
-				(TupleTypeInfo) type, fields), Utils.getCallLocationName());
+				(TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -430,12 +471,12 @@ public abstract class DataSet<T> {
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public ReduceOperator<T> maxBy(int... fields)  {
-		if(!type.isTupleType()) {
+		if(!getType().isTupleType()) {
 			throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.");
 		}
 
 		return new ReduceOperator<T>(this, new SelectByMaxFunction(
-				(TupleTypeInfo) type, fields), Utils.getCallLocationName());
+				(TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
 	}
 
 	/**
@@ -466,7 +507,7 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
-		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
 		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType), Utils.getCallLocationName());
 	}
 	
@@ -538,7 +579,7 @@ public abstract class DataSet<T> {
 	 * @see DataSet
 	 */
 	public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor) {
-		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
 		return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(clean(keyExtractor), getType(), keyType));
 	}
 	
@@ -976,7 +1017,7 @@ public abstract class DataSet<T> {
 	 * @see KeySelector
 	 */
 	public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelector<T, K> keyExtractor) {
-		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
 		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName());
 	}
 	
@@ -1023,8 +1064,8 @@ public abstract class DataSet<T> {
 	 * @see KeySelector
 	 */
 	public <K extends Comparable<K>> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keyExtractor) {
-		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
-		return new PartitionOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType), clean(partitioner), Utils.getCallLocationName());
+		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
+		return new PartitionOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType), clean(partitioner), Utils.getCallLocationName());
 	}
 	
 	/**
@@ -1076,7 +1117,7 @@ public abstract class DataSet<T> {
 		return output(tof);
 	}
 	
-/**
+	/**
 	 * Writes a DataSet as a text file to the specified location.<br/>
 	 * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
 	 *
@@ -1175,7 +1216,7 @@ public abstract class DataSet<T> {
 	
 	@SuppressWarnings("unchecked")
 	private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
-		Validate.isTrue(this.type.isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
+		Validate.isTrue(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(filePath, rowDelimiter, fieldDelimiter);
 		if(wm != null) {
 			of.setWriteMode(wm);
@@ -1258,10 +1299,10 @@ public abstract class DataSet<T> {
 		
 		// configure the type if needed
 		if (outputFormat instanceof InputTypeConfigurable) {
-			((InputTypeConfigurable) outputFormat).setInputType(this.type);
+			((InputTypeConfigurable) outputFormat).setInputType(getType());
 		}
 		
-		DataSink<T> sink = new DataSink<T>(this, outputFormat, this.type);
+		DataSink<T> sink = new DataSink<T>(this, outputFormat, getType());
 		this.context.registerDataSink(sink);
 		return sink;
 	}
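
[Editorial note, not part of the diff above] The interplay of fillInType() and getType() means a hint is only accepted before the operator's type has been accessed. A minimal sketch of that ordering constraint, using an anonymous mapper invented purely for illustration:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeHintOrdering {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> lines = env.fromElements("a", "b");

        MapOperator<String, Tuple2<String, Integer>> op = lines.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) {
                        return new Tuple2<String, Integer>(value, 1);
                    }
                });

        // getType() resolves the operator's type (throwing a descriptive
        // InvalidTypesException if it is still a MissingTypeInfo) and marks it as used.
        op.getType();

        // A hint must be the first call after the transformation; once the type
        // has been used, fillInType() rejects it.
        try {
            op.returns("Tuple2<String, Integer>");
        } catch (IllegalStateException e) {
            System.out.println("Hint rejected as expected: " + e.getMessage());
        }
    }
}

In the intended usage the hint comes directly after map(), before the result is passed to any further transformation or sink.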

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index edb1c74..25875a0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -1,5 +1,4 @@
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index fed402f..b33139b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -122,8 +122,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 		this.keys1 = keys1;
 		this.keys2 = keys2;
-		
-		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+	
+	@Override
+	protected CoGroupFunction<I1, I2, OUT> getFunction() {
+		return function;
 	}
 
 	protected Keys<I1> getKeys1() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 078d91e..e2d225c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -51,6 +51,7 @@ import com.google.common.base.Preconditions;
 public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, CrossOperator<I1, I2, OUT>> {
 
 	private final CrossFunction<I1, I2, OUT> function;
+	
 	private final String defaultName;
 
 	public CrossOperator(DataSet<I1> input1, DataSet<I2> input2,
@@ -62,17 +63,15 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 
 		this.function = function;
 		this.defaultName = defaultName;
-
-		if (!(function instanceof ProjectCrossFunction)) {
-			extractSemanticAnnotationsFromUdf(function.getClass());
-		} else {
-			generateProjectionProperties(((ProjectCrossFunction<?, ?, ?>) function));
-		}
 	}
-
-	public void generateProjectionProperties(ProjectCrossFunction<?, ?, ?> pcf) {
-		DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pcf.getFields(), pcf.getIsFromFirst());
-		setSemanticProperties(props);
+	
+	@Override
+	protected CrossFunction<I1, I2, OUT> getFunction() {
+		return function;
+	}
+	
+	private String getDefaultName() {
+		return defaultName;
 	}
 
 	@Override
@@ -112,7 +111,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		private final DataSet<I2> input2;
 
 		public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, String defaultName) {
-			super(input1, input2, (CrossFunction<I1, I2, Tuple2<I1, I2>>) new DefaultCrossFunction<I1, I2>(),
+			super(input1, input2, new DefaultCrossFunction<I1, I2>(),
 					new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), defaultName);
 
 			if (input1 == null || input2 == null) {
@@ -137,7 +136,8 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			if (function == null) {
 				throw new NullPointerException("Cross function must not be null.");
 			}
-			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
+			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType(),
+					super.getDefaultName(), true);
 			return new CrossOperator<I1, I2, R>(input1, input2, clean(function), returnType, Utils.getCallLocationName());
 		}
 		
@@ -221,6 +221,11 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			this.crossProjection = crossProjection;
 		}
 
+		@Override
+		protected ProjectCrossFunction<I1, I2, OUT> getFunction() {
+			return (ProjectCrossFunction<I1, I2, OUT>) super.getFunction();
+		}
+		
 		/**
 		 * Continues a ProjectCross transformation and adds fields of the first cross input to the projection.<br/>
 		 * If the first cross input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
@@ -277,10 +282,6 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 
 		/**
 		 * Deprecated method only kept for compatibility.
-		 *
-		 * @param types
-		 *
-		 * @return
 		 */
 		@SuppressWarnings({ "hiding", "unchecked" })
 		@Deprecated
@@ -308,6 +309,12 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		public CrossOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) {
 			throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated.");
 		}
+		
+		@Override
+		protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
+			// we do not extract anything, but construct the properties from the projection
+			return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst());
+		}
 	}
 
 	public static final class ProjectCrossFunction<T1, T2, R extends Tuple> implements CrossFunction<T1, T2, R> {
@@ -597,11 +604,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 	// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
 
 		/**
-		 * Chooses a projectTupleX according to the length of {@link CrossProjection#fieldIndexes} 
+		 * Chooses a projectTupleX according to the length of {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#fieldIndexes} 
 		 * 
 		 * @return The projected DataSet.
-		 * 
-		 * @see ProjectCross
 		 */
 		@SuppressWarnings("unchecked")
 		public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectTupleX() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index fa2d4d6..d30dc5e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -57,8 +57,6 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		// if keys is null distinction is done on all tuple fields
 		if (keys == null) {
 			if (input.getType() instanceof CompositeType) {
-
-				CompositeType<?> cType = (CompositeType<?>) input.getType();
 				keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index 1d93b0a..56bea50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -41,7 +41,11 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
 		
 		this.function = function;
 		this.defaultName = defaultName;
-		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+	
+	@Override
+	protected FilterFunction<T> getFunction() {
+		return function;
 	}
 	
 	@Override
@@ -51,17 +55,17 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
 		
 		// create operator
 		PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
-		// set input
 		po.setInput(input);
+		
 		// set dop
-		if(this.getParallelism() > 0) {
+		if (getParallelism() > 0) {
 			// use specified dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			po.setDegreeOfParallelism(getParallelism());
 		} else {
 			// if no dop has been specified, use dop of input operator to enable chaining
 			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
 		}
-				
+		
 		return po;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 73d6095..47446dd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -43,7 +43,11 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 		
 		this.function = function;
 		this.defaultName = defaultName;
-		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+	
+	@Override
+	protected FlatMapFunction<IN, OUT> getFunction() {
+		return function;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index bef91ed..0e2fb5e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -82,8 +82,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		this.defaultName = defaultName;
 
 		checkCombinability();
-
-		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 
 	private void checkCombinability() {
@@ -92,6 +90,12 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			this.combinable = true;
 		}
 	}
+	
+	
+	@Override
+	protected GroupReduceFunction<IN, OUT> getFunction() {
+		return function;
+	}
 
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 88bd273..822759f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.java.operators;
 import java.security.InvalidParameterException;
 import java.util.Arrays;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -55,8 +57,6 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.tuple.*;
 //CHECKSTYLE.ON: AvoidStarImport
 
-import com.google.common.base.Preconditions;
-
 /**
  * A {@link DataSet} that is the result of a Join transformation. 
  * 
@@ -200,12 +200,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			this.function = function;
 			this.joinLocationName = joinLocationName;
-
-			if (!(function instanceof ProjectFlatJoinFunction)) {
-				extractSemanticAnnotationsFromUdf(function.getClass());
-			} else {
-				generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) function));
-			}
 		}
 
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
@@ -221,37 +215,21 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 
 			this.function = generatedFunction;
+		}
+		
+		@Override
+		protected FlatJoinFunction<I1, I2, OUT> getFunction() {
+			return function;
+		}
 
-			if (!(generatedFunction instanceof ProjectFlatJoinFunction)) {
-				extractSemanticAnnotationsFromUdf(function.getClass());
+		@Override
+		protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
+			if (function instanceof DefaultJoin.WrappingFlatJoinFunction) {
+				return super.extractSemanticAnnotationsFromUdf(((WrappingFunction<?>) function).getWrappedFunction().getClass());
 			} else {
-				generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) generatedFunction));
+				return super.extractSemanticAnnotationsFromUdf(function.getClass());
 			}
 		}
-
-		public void generateProjectionProperties(ProjectFlatJoinFunction<?, ?, ?> pjf) {
-			DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), pjf.getIsFromFirst());
-			setSemanticProperties(props);
-		}
-
-		// TODO
-//		public EquiJoin<I1, I2, OUT> leftOuter() {
-//			this.preserve1 = true;
-//			return this;
-//		}
-
-		// TODO
-//		public EquiJoin<I1, I2, OUT> rightOuter() {
-//			this.preserve2 = true;
-//			return this;
-//		}
-		
-		// TODO
-//		public EquiJoin<I1, I2, OUT> fullOuter() {
-//			this.preserve1 = true;
-//			this.preserve2 = true;
-//			return this;
-//		}
 		
 		@Override
 		protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
@@ -647,6 +625,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			this.joinProj = joinProj;
 		}
+		
+		@Override
+		protected ProjectFlatJoinFunction<I1, I2, OUT> getFunction() {
+			return (ProjectFlatJoinFunction<I1, I2, OUT>) super.getFunction();
+		}
 
 		/**
 		 * Continues a ProjectJoin transformation and adds fields of the first join input to the projection.<br/>
@@ -706,8 +689,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * Deprecated method only kept for compatibility.
 		 *
 		 * @param types
-		 *
-		 * @return
 		 */
 		@SuppressWarnings({ "unchecked", "hiding" })
 		@Deprecated
@@ -735,6 +716,13 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		public JoinOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) {
 			throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated.");
 		}
+		
+		@Override
+		protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
+			// we do not extract the annotation, we construct the properties from the projection#
+			return SemanticPropUtil.createProjectionPropertiesDual(getFunction().getFields(), getFunction().getIsFromFirst());
+		}
+
 	}
 	
 //	@SuppressWarnings("unused")
@@ -1282,11 +1270,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
 
 		/**
-		 * Chooses a projectTupleX according to the length of {@link JoinProjection#fieldIndexes} 
+		 * Chooses a projectTupleX according to the length of
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#fieldIndexes} 
 		 * 
 		 * @return The projected DataSet.
 		 * 
-		 * @see ProjectJoin
+		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		@SuppressWarnings("unchecked")
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectTupleX() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index c2a2a8e..5fca38a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -76,6 +76,9 @@ public abstract class Keys<T> {
 			if (keyExtractor == null) {
 				throw new NullPointerException("Key extractor must not be null.");
 			}
+			if (keyType == null) {
+				throw new NullPointerException("Key type must not be null.");
+			}
 
 			this.keyExtractor = keyExtractor;
 			this.keyType = keyType;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index b4433dc..9e96c64 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -45,7 +45,11 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 		
 		this.defaultName = defaultName;
 		this.function = function;
-		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+	
+	@Override
+	protected MapFunction<IN, OUT> getFunction() {
+		return function;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index 067f7af..a6c69c1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -45,7 +45,11 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		
 		this.function = function;
 		this.defaultName = defaultName;
-		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+	
+	@Override
+	protected MapPartitionFunction<IN, OUT> getFunction() {
+		return function;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index 22d4d44..edb5a68 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
  *
  * @param <T> The type of the data being partitioned.
  */
-public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, PartitionOperator<T>> {
+public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOperator<T>> {
 	
 	private final Keys<T> pKeys;
 	private final PartitionMethod pMethod;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index daf129a..0b2aa95 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -101,10 +101,6 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	}
 	/**
 	 * Deprecated method only kept for compatibility.
-	 *
-	 * @param types
-	 *
-	 * @return
 	 */
 	@SuppressWarnings({ "unchecked", "hiding" })
 	@Deprecated
@@ -180,11 +176,12 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
 
 		/**
-		 * Chooses a projectTupleX according to the length of {@link Projection#fieldIndexes} 
+		 * Chooses a projectTupleX according to the length of
+		 * {@link org.apache.flink.api.java.operators.ProjectOperator.Projection#fieldIndexes} 
 		 * 
 		 * @return The projected DataSet.
 		 * 
-		 * @see Projection
+		 * @see org.apache.flink.api.java.operators.ProjectOperator.Projection
 		 */
 		@SuppressWarnings("unchecked")
 		public <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 02b0ede..d1ad4c3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -61,8 +61,6 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		this.function = function;
 		this.grouper = null;
 		this.defaultName = defaultName;
-		
-		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
 	
@@ -72,8 +70,11 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		this.function = function;
 		this.grouper = input;
 		this.defaultName = defaultName;
-		
-		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+	
+	@Override
+	protected ReduceFunction<IN> getFunction() {
+		return function;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index 0823c15..0d0cb15 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -24,12 +24,16 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.DataSet;
 
 /**
  * The <tt>SingleInputUdfOperator</tt> is the base class of all unary operators that execute
@@ -65,11 +69,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	}
 	
 	
-	protected void extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
-		Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(udfClass);
-		SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType());
-		setSemanticProperties(sp);
-	}
+	protected abstract Function getFunction();
 
 	// --------------------------------------------------------------------------------------------
 	// Fluent API methods
@@ -130,6 +130,120 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 		O returnType = (O) this;
 		return returnType;
 	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a type information string that will be parsed. A type information string can contain the following
+	 * types:
+	 *
+	 * <ul>
+	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
+	 * <li>Basic type arrays such as <code>Integer[]</code>,
+	 * <code>String[]</code>, etc.
+	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
+	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
+	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
+	 * <li>Generic types such as <code>java.lang.Class</code>, etc.
+	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
+	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
+	 * <li>Value types such as <code>DoubleValue</code>,
+	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
+	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
+	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
+	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
+	 * </ul>
+	 *
+	 * Example:
+	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
+	 *
+	 * @param typeInfoString
+	 *            type information string to be parsed
+	 * @return This operator with a given return type hint.
+	 */
+	public O returns(String typeInfoString) {
+		if (typeInfoString == null) {
+			throw new IllegalArgumentException("Type information string must not be null.");
+		}
+		return returns(TypeInfoParser.<OUT>parse(typeInfoString));
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
+	 * 
+	 * <ul>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
+	 * <li>etc.</li>
+	 * </ul>
+	 *
+	 * @param typeInfo
+	 *            type information as a return type hint
+	 * @return This operator with a given return type hint.
+	 */
+	public O returns(TypeInformation<OUT> typeInfo) {
+		if (typeInfo == null) {
+			throw new IllegalArgumentException("Type information must not be null.");
+		}
+		fillInType(typeInfo);
+		@SuppressWarnings("unchecked")
+		O returnType = (O) this;
+		return returnType;
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a class that will be analyzed by Flink's type extraction capabilities.
+	 * 
+	 * <p>
+	 * Examples for classes are:
+	 * <ul>
+	 * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
+	 * <li>POJOs such as <code>MyPojo.class</code></li>
+	 * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
+	 * <li>Arrays such as <code>String[].class</code>, etc.</li>
+	 * </ul>
+	 *
+	 * @param typeClass
+	 *            class as a return type hint
+	 * @return This operator with a given return type hint.
+	 */
+	@SuppressWarnings("unchecked")
+	public O returns(Class<OUT> typeClass) {
+		if (typeClass == null) {
+			throw new IllegalArgumentException("Type class must not be null.");
+		}
+		
+		try {
+			TypeInformation<OUT> ti = (TypeInformation<OUT>) TypeExtractor.createTypeInfo(typeClass);
+			return returns(ti);
+		}
+		catch (InvalidTypesException e) {
+			throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
+		}
+	}
 
 	// --------------------------------------------------------------------------------------------
 	// Accessors
@@ -149,6 +263,11 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 
 	@Override
 	public SingleInputSemanticProperties getSemanticProperties() {
+		if (udfSemantics == null) {
+			SingleInputSemanticProperties props = extractSemanticAnnotations(getFunction().getClass());
+			udfSemantics = props != null ? props : new SingleInputSemanticProperties();
+		}
+		
 		return this.udfSemantics;
 	}
 
@@ -163,4 +282,9 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	public void setSemanticProperties(SingleInputSemanticProperties properties) {
 		this.udfSemantics = properties;
 	}
+	
+	protected SingleInputSemanticProperties extractSemanticAnnotations(Class<?> udfClass) {
+		Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(udfClass);
+		return SemanticPropUtil.getSemanticPropsSingle(annotations, getInputType(), getResultType());
+	}
 }
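
[Editorial note, not part of the diff above] To make the three returns(...) overloads added in this file concrete, a hedged usage sketch; the Pass mapper and sample data are invented for illustration, while the overloads themselves are those shown above.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class ReturnsOverloads {

    // Illustrative generic pass-through mapper; its concrete output type is supplied by the hint.
    public static class Pass<T> implements MapFunction<T, T> {
        @Override
        public T map(T value) {
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> pairs =
                env.fromElements(new Tuple2<String, Integer>("a", 1));

        // 1) Type string, parsed by TypeInfoParser
        pairs.map(new Pass<Tuple2<String, Integer>>())
                .returns("Tuple2<String, Integer>")
                .print();

        // 2) Explicit TypeInformation instance
        pairs.map(new Pass<Tuple2<String, Integer>>())
                .returns(new TupleTypeInfo<Tuple2<String, Integer>>(
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                .print();

        // 3) Class, analyzed by the TypeExtractor (POJOs and plain classes;
        //    raw Tuple classes are not sufficient, as the javadoc above notes)
        env.fromElements("x", "y")
                .map(new Pass<String>())
                .returns(String.class)
                .print();

        env.execute();
    }
}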

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 7ca0840..2de9282 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -24,10 +24,14 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.java.DataSet;
 
@@ -67,14 +71,7 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 		super(input1, input2, resultType);
 	}
 	
-	protected void extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
-		Set<Annotation> annotations = FunctionAnnotation.readDualConstantAnnotations(udfClass);
-		
-		DualInputSemanticProperties dsp = SemanticPropUtil.getSemanticPropsDual(annotations,
-					getInput1Type(), getInput2Type(), getResultType());
-
-		setSemanticProperties(dsp);
-	}
+	protected abstract Function getFunction();
 
 	// --------------------------------------------------------------------------------------------
 	// Fluent API methods
@@ -172,6 +169,121 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 		O returnType = (O) this;
 		return returnType;
 	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a type information string that will be parsed. A type information string can contain the following
+	 * types:
+	 *
+	 * <ul>
+	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
+	 * <li>Basic type arrays such as <code>Integer[]</code>,
+	 * <code>String[]</code>, etc.
+	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
+	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
+	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
+	 * <li>Generic types such as <code>java.lang.Class</code>, etc.
+	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
+	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
+	 * <li>Value types such as <code>DoubleValue</code>,
+	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
+	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[]</code>, etc.</li>
+	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
+	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
+	 * </ul>
+	 *
+	 * Example:
+	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
+	 *
+	 * @param typeInfoString
+	 *            type information string to be parsed
+	 * @return This operator with a given return type hint.
+	 */
+	public O returns(String typeInfoString) {
+		if (typeInfoString == null) {
+			throw new IllegalArgumentException("Type information string must not be null.");
+		}
+		return returns(TypeInfoParser.<OUT>parse(typeInfoString));
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
+	 * 
+	 * <ul>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
+	 * <li>etc.</li>
+	 * </ul>
+	 *
+	 * @param typeInfo
+	 *            type information as a return type hint
+	 * @return This operator with a given return type hint.
+	 */
+	public O returns(TypeInformation<OUT> typeInfo) {
+		if (typeInfo == null) {
+			throw new IllegalArgumentException("Type information must not be null.");
+		}
+		fillInType(typeInfo);
+		
+		@SuppressWarnings("unchecked")
+		O returnType = (O) this;
+		return returnType;
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a class that will be analyzed by Flink's type extraction capabilities.
+	 * 
+	 * <p>
+	 * Examples for classes are:
+	 * <ul>
+	 * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
+	 * <li>POJOs such as <code>MyPojo.class</code></li>
+	 * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
+	 * <li>Arrays such as <code>String[].class</code>, etc.</li>
+	 * </ul>
+	 *
+	 * @param typeClass
+	 *            class as a return type hint
+	 * @return This operator with a given return type hint.
+	 */
+	@SuppressWarnings("unchecked")
+	public O returns(Class<OUT> typeClass) {
+		if (typeClass == null) {
+			throw new IllegalArgumentException("Type class must not be null.");
+		}
+		
+		try {
+			TypeInformation<OUT> ti = (TypeInformation<OUT>) TypeExtractor.createTypeInfo(typeClass);
+			return returns(ti);
+		}
+		catch (InvalidTypesException e) {
+			throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
+		}
+	}
 
 	// --------------------------------------------------------------------------------------------
 	// Accessors
@@ -191,6 +303,11 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 
 	@Override
 	public DualInputSemanticProperties getSemanticProperties() {
+		if (udfSemantics == null) {
+			DualInputSemanticProperties props = extractSemanticAnnotationsFromUdf(getFunction().getClass());
+			udfSemantics = props != null ? props : new DualInputSemanticProperties();
+		}
+		
 		return this.udfSemantics;
 	}
 
@@ -205,4 +322,10 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	public void setSemanticProperties(DualInputSemanticProperties properties) {
 		this.udfSemantics = properties;
 	}
+	
+	
+	protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
+		Set<Annotation> annotations = FunctionAnnotation.readDualConstantAnnotations(udfClass);
+		return SemanticPropUtil.getSemanticPropsDual(annotations, getInput1Type(), getInput2Type(), getResultType());
+	}
 }
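
Similarly, a hypothetical sketch for two-input operators, here using the TypeInformation overload of returns(...) on a join. The data sets and the lambda are illustrative only, and the join API chain (where/equalTo/with) is assumed from the existing Java API:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class TwoInputTypeHintSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, String>> names = env.fromElements(
				new Tuple2<Integer, String>(1, "flink"),
				new Tuple2<Integer, String>(2, "hadoop"));
		DataSet<Tuple2<Integer, Integer>> counts = env.fromElements(
				new Tuple2<Integer, Integer>(1, 42),
				new Tuple2<Integer, Integer>(2, 7));

		// The join lambda's result type is erased by the compiler, so the operator
		// needs an explicit hint; here the TypeInformation overload is used.
		DataSet<Tuple2<String, Integer>> joined = names
				.join(counts)
				.where(0)
				.equalTo(0)
				.with((Tuple2<Integer, String> left, Tuple2<Integer, Integer> right) ->
						new Tuple2<String, Integer>(left.f1, right.f1))
				.returns(new TupleTypeInfo<Tuple2<String, Integer>>(
						BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));

		joined.print();
		env.execute();
	}
}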

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index c6f72f2..001c46b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.java.DataSet;
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet;
 public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>> {
 
 	private final String unionLocationName;
+	
 	/**
 	 * Create an operator that produces the union of the two given data sets.
 	 * 
@@ -40,6 +41,11 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>
 	public UnionOperator(DataSet<T> input1, DataSet<T> input2, String unionLocationName) {
 		super(input1, input2, input1.getType());
 		
+		if (!input1.getType().equals(input2.getType())) {
+			throw new InvalidProgramException("Cannot union inputs of different types. Input1=" 
+					+ input1.getType() + ", input2=" + input2.getType());
+		}
+		
 		this.unionLocationName = unionLocationName;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
new file mode 100644
index 0000000..10ab02f
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A special type information signifying that the type extraction failed. It contains
+ * additional error information.
+ */
+public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
+
+	private String functionName;
+	private InvalidTypesException typeException;
+
+	
+	public MissingTypeInfo(String functionName) {
+		this(functionName, new InvalidTypesException("An unknown error occurred."));
+	}
+
+	public MissingTypeInfo(String functionName, InvalidTypesException typeException) {
+		this.functionName = functionName;
+		this.typeException = typeException;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public String getFunctionName() {
+		return functionName;
+	}
+
+	public InvalidTypesException getTypeException() {
+		return typeException;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isBasicType() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public boolean isTupleType() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public int getArity() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public Class<InvalidTypesException> getTypeClass() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public boolean isKeyType() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public TypeSerializer<InvalidTypesException> createSerializer() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+
+	@Override
+	public int getTotalFields() {
+		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
+	}
+}
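
A small sketch of the deferred-error behaviour of MissingTypeInfo: it stores the extraction failure and only fails once the type information is actually used. The function name and exception message below are made up for illustration:

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;

public class MissingTypeInfoSketch {

	public static void main(String[] args) {
		// Hypothetical call location and error message, only for illustration.
		MissingTypeInfo missing = new MissingTypeInfo("map at WordCount.java:42",
				new InvalidTypesException("Lambda return type could not be determined."));

		// The failure stays dormant; it surfaces only when the type information
		// is actually needed, e.g. when a serializer is requested for it.
		try {
			missing.createSerializer();
		}
		catch (UnsupportedOperationException e) {
			System.out.println("Deferred type error in: " + missing.getFunctionName()
					+ " - " + missing.getTypeException().getMessage());
		}
	}
}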

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 4d8a81e..22f7942 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -75,7 +75,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 	/**
 	 * Recursively add all fields in this tuple type. We need this in particular to get all
 	 * the types.
-	 * @param keyId
+	 * @param startKeyId
 	 * @param keyFields
 	 */
 	public void addAllFields(int startKeyId, List<FlatFieldDescriptor> keyFields) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8dbaeeb/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index b528d00..edff09c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -59,7 +59,12 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
+ * functions.
+ */
 public class TypeExtractor {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
 
 	// We need this to detect recursive types and not get caught
@@ -75,108 +80,211 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 	
 	public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
-		return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType);
+		return getMapReturnTypes(mapInterface, inType, null, false);
 	}
 	
+	public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType,
+			String functionName, boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType, functionName, allowMissing);
+	}
+	
+
 	public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
-		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType);
+		return getFlatMapReturnTypes(flatMapInterface, inType, null, false);
 	}
 	
+	public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType,
+			String functionName, boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
+	}
+	
+	
 	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) {
-		return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType);
+		return getMapPartitionReturnTypes(mapPartitionInterface, inType, null, false);
 	}
 	
-	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface,
-			TypeInformation<IN> inType) {
-		return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType);
+	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType,
+			String functionName, boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType, functionName, allowMissing);
 	}
 	
+	
+	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType) {
+		return getGroupReduceReturnTypes(groupReduceInterface, inType, null, false);
+	}
+	
+	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType,
+			String functionName, boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
+	}
+	
+	
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
-			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true, in1Type, in2Type);
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
+	{
+		return getFlatJoinReturnTypes(joinInterface, in1Type, in2Type, null, false);
 	}
 	
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
+	{
+		return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true,
+				in1Type, in2Type, functionName, allowMissing);
+	}
+	
+	
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
-			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false, in1Type, in2Type);
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
+	{
+		return getJoinReturnTypes(joinInterface, in1Type, in2Type, null, false);
+	}
+	
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
+	{
+		return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false,
+				in1Type, in2Type, functionName, allowMissing);
 	}
 	
+	
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
-			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true, in1Type, in2Type);
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
+	{
+		return getCoGroupReturnTypes(coGroupInterface, in1Type, in2Type, null, false);
 	}
 	
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
+	{
+		return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true,
+				in1Type, in2Type, functionName, allowMissing);
+	}
+	
+	
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
-			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, in1Type, in2Type);
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
+	{
+		return getCrossReturnTypes(crossInterface, in1Type, in2Type, null, false);
+	}
+	
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
+	{
+		return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false,
+				in1Type, in2Type, functionName, allowMissing);
 	}
 	
+	
 	public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType) {
-		return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType);
+		return getKeySelectorTypes(selectorInterface, inType, null, false);
+	}
+	
+	public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface,
+			TypeInformation<IN> inType, String functionName, boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing);
 	}
 	
+	
 	public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner) {
+		return getPartitionerTypes(partitioner, null, false);
+	}
+	
+	public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner, String functionName, boolean allowMissing) {
 		return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null);
 	}
 	
+	
 	@SuppressWarnings("unchecked")
 	public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) {
-		if(inputFormatInterface instanceof ResultTypeQueryable) {
+		if (inputFormatInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<IN>) inputFormatInterface).getProducedType();
 		}
 		return new TypeExtractor().privateCreateTypeInfo(InputFormat.class, inputFormatInterface.getClass(), 0, null, null);
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	//  Generic extraction methods
+	// --------------------------------------------------------------------------------------------
+	
 	@SuppressWarnings("unchecked")
-	private static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass, boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType) {
-		final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
-		if (m != null) {
-			// check for lambda type erasure
-			validateLambdaGenericParameters(m);
-			
-			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-			final int paramLen = m.getGenericParameterTypes().length - 1;
-			final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
-			validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType);
-			if(function instanceof ResultTypeQueryable) {
-				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+	private static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass, 
+			boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType,
+			String functionName, boolean allowMissing)
+	{
+		try {
+			final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
+			if (m != null) {
+				// check for lambda type erasure
+				validateLambdaGenericParameters(m);
+				
+				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
+				final int paramLen = m.getGenericParameterTypes().length - 1;
+				final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+				validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType);
+				if(function instanceof ResultTypeQueryable) {
+					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				}
+				return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null);
+			}
+			else {
+				validateInputType(baseClass, function.getClass(), 0, inType);
+				if(function instanceof ResultTypeQueryable) {
+					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				}
+				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null);
 			}
-			return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), inType, null);
 		}
-		else {
-			validateInputType(baseClass, function.getClass(), 0, inType);
-			if(function instanceof ResultTypeQueryable) {
-				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+		catch (InvalidTypesException e) {
+			if (allowMissing) {
+				return (TypeInformation<OUT>) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e);
+			} else {
+				throw e;
 			}
-			return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null);
 		}
 	}
 	
 	@SuppressWarnings("unchecked")
-	private static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass, boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
-		if (m != null) {
-			// check for lambda type erasure
-			validateLambdaGenericParameters(m);
-			
-			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-			final int paramLen = m.getGenericParameterTypes().length - 1;
-			final Type input1 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1];
-			final Type input2 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
-			validateInputType((hasIterables)? removeGenericWrapper(input1) : input1, in1Type);
-			validateInputType((hasIterables)? removeGenericWrapper(input2) : input2, in2Type);
-			if(function instanceof ResultTypeQueryable) {
-				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+	private static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
+			boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type,
+			String functionName, boolean allowMissing)
+	{
+		try {
+			final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
+			if (m != null) {
+				// check for lambda type erasure
+				validateLambdaGenericParameters(m);
+				
+				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
+				final int paramLen = m.getGenericParameterTypes().length - 1;
+				final Type input1 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1];
+				final Type input2 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+				validateInputType((hasIterables)? removeGenericWrapper(input1) : input1, in1Type);
+				validateInputType((hasIterables)? removeGenericWrapper(input2) : input2, in2Type);
+				if(function instanceof ResultTypeQueryable) {
+					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				}
+				return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), in1Type, in2Type);
+			}
+			else {
+				validateInputType(baseClass, function.getClass(), 0, in1Type);
+				validateInputType(baseClass, function.getClass(), 1, in2Type);
+				if(function instanceof ResultTypeQueryable) {
+					return ((ResultTypeQueryable<OUT>) function).getProducedType();
+				}
+				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type);
 			}
-			return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen]) : m.getGenericReturnType(), in1Type, in2Type);
 		}
-		else {
-			validateInputType(baseClass, function.getClass(), 0, in1Type);
-			validateInputType(baseClass, function.getClass(), 1, in2Type);
-			if(function instanceof ResultTypeQueryable) {
-				return ((ResultTypeQueryable<OUT>) function).getProducedType();
+		catch (InvalidTypesException e) {
+			if (allowMissing) {
+				return (TypeInformation<OUT>) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e);
+			} else {
+				throw e;
 			}
-			return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type);
 		}
 	}
 	
@@ -185,12 +293,20 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 	
 	public static TypeInformation<?> createTypeInfo(Type t) {
-		return new TypeExtractor().privateCreateTypeInfo(t);
+		TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t);
+		if (ti == null) {
+			throw new InvalidTypesException("Could not extract type information.");
+		}
+		return ti;
 	}
 	
 	public static <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		return new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);
+		TypeInformation<OUT> ti =  new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);
+		if (ti == null) {
+			throw new InvalidTypesException("Could not extract type information.");
+		}
+		return ti;
 	}
 	
 	// ----------------------------------- private methods ----------------------------------------
@@ -550,7 +666,6 @@ public class TypeExtractor {
 	
 	@SuppressWarnings("unchecked")
 	private static void validateInfo(ArrayList<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) {
-		
 		if (type == null) {
 			throw new InvalidTypesException("Unknown Error. Type is null.");
 		}
@@ -784,29 +899,28 @@ public class TypeExtractor {
 	}
 	
 	private static String encodePrimitiveClass(Class<?> primitiveClass) {
-		final String name = primitiveClass.getName();
-		if (name.equals("boolean")) {
+		if (primitiveClass == boolean.class) {
 			return "Z";
 		}
-		else if (name.equals("byte")) {
+		else if (primitiveClass == byte.class) {
 			return "B";
 		}
-		else if (name.equals("char")) {
+		else if (primitiveClass == char.class) {
 			return "C";
 		}
-		else if (name.equals("double")) {
+		else if (primitiveClass == double.class) {
 			return "D";
 		}
-		else if (name.equals("float")) {
+		else if (primitiveClass == float.class) {
 			return "F";
 		}
-		else if (name.equals("int")) {
+		else if (primitiveClass == int.class) {
 			return "I";
 		}
-		else if (name.equals("long")) {
+		else if (primitiveClass == long.class) {
 			return "J";
 		}
-		else if (name.equals("short")) {
+		else if (primitiveClass == short.class) {
 			return "S";
 		}
 		throw new InvalidTypesException();
@@ -972,7 +1086,6 @@ public class TypeExtractor {
 	 * @param f field to check
 	 * @param clazz class of field
 	 * @param typeHierarchy type hierarchy for materializing generic types
-	 * @return
 	 */
 	private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) {
 		if(Modifier.isPublic(f.getModifiers())) {
@@ -1028,6 +1141,7 @@ public class TypeExtractor {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
 		// try to create Type hierarchy, if the incoming only contains the most bottom one or none.
 		if(typeHierarchy.size() <= 1) {
@@ -1054,7 +1168,7 @@ public class TypeExtractor {
 				if(isClassType(fieldType)) {
 					genericClass = typeToClass(fieldType);
 				}
-				pojoFields.add(new PojoField(field, new GenericTypeInfo( genericClass )));
+				pojoFields.add(new PojoField(field, new GenericTypeInfo<X>( (Class<X>) genericClass )));
 			}
 		}
 
@@ -1117,7 +1231,6 @@ public class TypeExtractor {
 		}
 		return false;
 	}
-	
 
 	
 	// recursively determine all declared methods
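
Finally, a rough sketch of the new allowMissing variants of the extraction methods: with allowMissing set to true, a failed extraction yields a MissingTypeInfo instead of an immediate InvalidTypesException. The mapper and the call-location string are hypothetical, and whether the lambda's type is actually erased depends on the compiler:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class AllowMissingSketch {

	public static void main(String[] args) {
		// A lambda whose generic return type is typically erased by javac.
		MapFunction<String, Tuple2<String, Integer>> mapper =
				s -> new Tuple2<String, Integer>(s, 1);

		// allowMissing = true: a failed extraction is wrapped into a MissingTypeInfo
		// carrying the (hypothetical) call location name for a later error message.
		TypeInformation<Tuple2<String, Integer>> result = TypeExtractor.getMapReturnTypes(
				mapper, BasicTypeInfo.STRING_TYPE_INFO, "map at MyJob.java:17", true);

		// Prints true when the lambda's generic signature was erased (javac),
		// false when a compiler retains it and extraction succeeds.
		System.out.println(result instanceof MissingTypeInfo);

		// allowMissing = false is the old behaviour and would throw the
		// InvalidTypesException right here for an erased lambda:
		// TypeExtractor.getMapReturnTypes(mapper, BasicTypeInfo.STRING_TYPE_INFO);
	}
}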

