flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [5/5] incubator-flink git commit: [FLINK-1161] [streaming] Streaming API type handling rework to support java 8 lambdas
Date Wed, 10 Dec 2014 14:30:46 GMT
[FLINK-1161] [streaming] Streaming API type handling rework to support java 8 lambdas


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/51c1f677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/51c1f677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/51c1f677

Branch: refs/heads/master
Commit: 51c1f67791307c2b9355171f7398d104befc8de5
Parents: 94c8e3f
Author: Gyula Fora <gyfora@apache.org>
Authored: Mon Dec 8 17:12:01 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  85 +++--
 .../flink/streaming/api/StreamConfig.java       |  93 +++---
 .../api/datastream/ConnectedDataStream.java     | 119 +++----
 .../streaming/api/datastream/DataStream.java    | 130 ++++----
 .../api/datastream/DataStreamSink.java          |  85 ++---
 .../api/datastream/DataStreamSource.java        |   8 +-
 .../api/datastream/GroupedDataStream.java       |   8 +-
 .../datastream/SingleOutputStreamOperator.java  |   6 +-
 .../api/datastream/SplitDataStream.java         |   2 +-
 .../api/datastream/StreamJoinOperator.java      |   8 +-
 .../api/datastream/StreamProjection.java        | 307 ++++++++++---------
 .../api/datastream/WindowedDataStream.java      |  49 +--
 .../environment/StreamExecutionEnvironment.java |  56 ++--
 .../invokable/operator/ProjectInvokable.java    |  11 +-
 .../api/streamvertex/CoStreamVertex.java        |   8 +-
 .../api/streamvertex/InputHandler.java          |  11 +-
 .../api/streamvertex/OutputHandler.java         |   5 +-
 .../util/serialization/ClassTypeWrapper.java    |  46 ---
 .../util/serialization/CombineTypeWrapper.java  |  50 ---
 .../util/serialization/FunctionTypeWrapper.java |  53 ----
 .../util/serialization/ObjectTypeWrapper.java   |  47 ---
 .../util/serialization/ProjectTypeWrapper.java  |  70 -----
 .../util/serialization/TypeWrapper.java         |  38 ---
 .../api/invokable/operator/ProjectTest.java     |  19 +-
 .../serialization/TypeSerializationTest.java    |  72 -----
 25 files changed, 493 insertions(+), 893 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 8a8595a..c45164a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
@@ -40,7 +41,6 @@ import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
 import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,10 +66,10 @@ public class JobGraphBuilder {
 	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
 	private Map<String, String> operatorNames;
 	private Map<String, StreamInvokable<?, ?>> invokableObjects;
-	private Map<String, TypeWrapper<?>> typeWrapperIn1;
-	private Map<String, TypeWrapper<?>> typeWrapperIn2;
-	private Map<String, TypeWrapper<?>> typeWrapperOut1;
-	private Map<String, TypeWrapper<?>> typeWrapperOut2;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
 	private Map<String, byte[]> serializedFunctions;
 	private Map<String, byte[]> outputSelectors;
 	private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
@@ -98,10 +98,10 @@ public class JobGraphBuilder {
 		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
 		operatorNames = new HashMap<String, String>();
 		invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
-		typeWrapperIn1 = new HashMap<String, TypeWrapper<?>>();
-		typeWrapperIn2 = new HashMap<String, TypeWrapper<?>>();
-		typeWrapperOut1 = new HashMap<String, TypeWrapper<?>>();
-		typeWrapperOut2 = new HashMap<String, TypeWrapper<?>>();
+		typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
 		serializedFunctions = new HashMap<String, byte[]>();
 		outputSelectors = new HashMap<String, byte[]>();
 		vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
@@ -124,10 +124,10 @@ public class JobGraphBuilder {
 	 *            Name of the vertex
 	 * @param invokableObject
 	 *            User defined operator
-	 * @param inTypeWrapper
-	 *            Input type wrapper for serialization
-	 * @param outTypeWrapper
-	 *            Output type wrapper for serialization
+	 * @param inTypeInfo
+	 *            Input type for serialization
+	 * @param outTypeInfo
+	 *            Output typ for serialization
 	 * @param operatorName
 	 *            Operator type
 	 * @param serializedFunction
@@ -136,14 +136,19 @@ public class JobGraphBuilder {
 	 *            Number of parallel instances created
 	 */
 	public <IN, OUT> void addStreamVertex(String vertexName,
-			StreamInvokable<IN, OUT> invokableObject, TypeWrapper<?> inTypeWrapper,
-			TypeWrapper<?> outTypeWrapper, String operatorName, byte[] serializedFunction,
+			StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo,
+			TypeInformation<OUT> outTypeInfo, String operatorName, byte[] serializedFunction,
 			int parallelism) {
 
 		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
 				serializedFunction, parallelism);
 
-		addTypeWrappers(vertexName, inTypeWrapper, null, outTypeWrapper, null);
+		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+				inTypeInfo) : null;
+		StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+				outTypeInfo) : null;
+
+		addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Vertex: {}", vertexName);
@@ -224,14 +229,16 @@ public class JobGraphBuilder {
 	}
 
 	public <IN1, IN2, OUT> void addCoTask(String vertexName,
-			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeWrapper<?> in1TypeWrapper,
-			TypeWrapper<?> in2TypeWrapper, TypeWrapper<?> outTypeWrapper, String operatorName,
-			byte[] serializedFunction, int parallelism) {
+			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
+			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo,
+			String operatorName, byte[] serializedFunction, int parallelism) {
 
 		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
 				serializedFunction, parallelism);
 
-		addTypeWrappers(vertexName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
+		addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo),
+				new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>(
+						outTypeInfo), null);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: {}", vertexName);
@@ -273,12 +280,13 @@ public class JobGraphBuilder {
 		iterationTailCount.put(vertexName, 0);
 	}
 
-	private void addTypeWrappers(String vertexName, TypeWrapper<?> in1, TypeWrapper<?> in2,
-			TypeWrapper<?> out1, TypeWrapper<?> out2) {
-		typeWrapperIn1.put(vertexName, in1);
-		typeWrapperIn2.put(vertexName, in2);
-		typeWrapperOut1.put(vertexName, out1);
-		typeWrapperOut2.put(vertexName, out2);
+	private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
+			StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1,
+			StreamRecordSerializer<?> out2) {
+		typeSerializersIn1.put(vertexName, in1);
+		typeSerializersIn2.put(vertexName, in2);
+		typeSerializersOut1.put(vertexName, out1);
+		typeSerializersOut2.put(vertexName, out2);
 	}
 
 	/**
@@ -315,10 +323,10 @@ public class JobGraphBuilder {
 		config.setMutability(mutability.get(vertexName));
 		config.setBufferTimeout(bufferTimeout.get(vertexName));
 
-		config.setTypeWrapperIn1(typeWrapperIn1.get(vertexName));
-		config.setTypeWrapperIn2(typeWrapperIn2.get(vertexName));
-		config.setTypeWrapperOut1(typeWrapperOut1.get(vertexName));
-		config.setTypeWrapperOut2(typeWrapperOut2.get(vertexName));
+		config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
+		config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
+		config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
+		config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
 
 		// Set vertex config
 		config.setUserInvokable(invokableObject);
@@ -482,19 +490,10 @@ public class JobGraphBuilder {
 		operatorNames.put(to, operatorNames.get(from));
 		serializedFunctions.put(to, serializedFunctions.get(from));
 
-		typeWrapperIn1.put(to, typeWrapperOut1.get(from));
-		typeWrapperIn2.put(to, typeWrapperOut2.get(from));
-		typeWrapperOut1.put(to, typeWrapperOut1.get(from));
-		typeWrapperOut2.put(to, typeWrapperOut2.get(from));
-	}
-
-	public TypeInformation<?> getInTypeInfo(String id) {
-		System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id));
-		return typeWrapperIn1.get(id).getTypeInfo();
-	}
-
-	public TypeInformation<?> getOutTypeInfo(String id) {
-		return typeWrapperOut1.get(id).getTypeInfo();
+		typeSerializersIn1.put(to, typeSerializersOut1.get(from));
+		typeSerializersIn2.put(to, typeSerializersOut2.get(from));
+		typeSerializersOut1.put(to, typeSerializersOut1.get(from));
+		typeSerializersOut2.put(to, typeSerializersOut2.get(from));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 3dba376..31af9cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -25,15 +25,14 @@ import java.util.Map;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig {
@@ -54,6 +53,12 @@ public class StreamConfig {
 	private static final String USER_FUNCTION = "userfunction";
 	private static final String BUFFER_TIMEOUT = "bufferTimeout";
 	private static final String OPERATOR_STATES = "operatorStates";
+	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
+	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
+	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
+	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+	private static final String MUTABILITY = "isMutable";
+	private static final String ITERATON_WAIT = "iterationWait";
 
 	// DEFAULT VALUES
 
@@ -61,10 +66,7 @@ public class StreamConfig {
 
 	private static final long DEFAULT_TIMEOUT = 0;
 
-	// STRINGS
-
-	private static final String MUTABILITY = "isMutable";
-	private static final String ITERATON_WAIT = "iterationWait";
+	// CONFIG METHODS
 
 	private Configuration config;
 
@@ -76,65 +78,64 @@ public class StreamConfig {
 		return config;
 	}
 
-	// CONFIGS
-
-	private static final String TYPE_WRAPPER_IN_1 = "typeWrapper_in_1";
-	private static final String TYPE_WRAPPER_IN_2 = "typeWrapper_in_2";
-	private static final String TYPE_WRAPPER_OUT_1 = "typeWrapper_out_1";
-	private static final String TYPE_WRAPPER_OUT_2 = "typeWrapper_out_2";
-
-	public void setTypeWrapperIn1(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_IN_1, typeWrapper);
+	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
 	}
 
-	public void setTypeWrapperIn2(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_IN_2, typeWrapper);
+	public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
 	}
 
-	public void setTypeWrapperOut1(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_OUT_1, typeWrapper);
+	public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
 
-	public void setTypeWrapperOut2(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
+	public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoIn1(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_IN_1, cl);
-	}
-
-	public <T> TypeInformation<T> getTypeInfoIn2(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_IN_2, cl);
-	}
-
-	public <T> TypeInformation<T> getTypeInfoOut1(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_OUT_1, cl);
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
 	}
 
-	public <T> TypeInformation<T> getTypeInfoOut2(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_OUT_2, cl);
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_2, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
 	}
 
-	private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
-		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
 	}
 
 	@SuppressWarnings("unchecked")
-	private <T> TypeInformation<T> getTypeInfo(String key, ClassLoader cl) {
-
-		TypeWrapper<T> typeWrapper;
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
 		try {
-			typeWrapper = (TypeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, key,
-					cl);
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_2, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Cannot load typeinfo");
-		}
-		if (typeWrapper != null) {
-			return typeWrapper.getTypeInfo();
-		} else {
-			return null;
+			throw new RuntimeException("Could not instantiate serializer.");
 		}
+	}
 
+	private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
+		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
 	}
 
 	public void setMutability(boolean isMutable) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 1621752..6336e68 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
@@ -44,10 +46,6 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.apache.flink.util.Collector;
 
 /**
@@ -122,7 +120,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The type of the first input
 	 */
 	public TypeInformation<IN1> getInputType1() {
-		return dataStream1.getOutputType();
+		return dataStream1.getType();
 	}
 
 	/**
@@ -131,7 +129,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The type of the second input
 	 */
 	public TypeInformation<IN2> getInputType2() {
-		return dataStream2.getOutputType();
+		return dataStream2.getType();
 	}
 
 	/**
@@ -403,15 +401,11 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The transformed {@link DataStream}
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
-				CoMapFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coMapper,
-				CoMapFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coMapper,
-				CoMapFunction.class, 2);
-
-		return addCoFunction("coMap", coMapper, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoMapInvokable<IN1, IN2, OUT>(coMapper));
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
+				coMapper.getClass(), 2, null, null);
+
+		return addCoFunction("coMap", coMapper, outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
+				coMapper));
 	}
 
 	/**
@@ -431,15 +425,11 @@ public class ConnectedDataStream<IN1, IN2> {
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
 			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coFlatMapper,
-				CoFlatMapFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coFlatMapper,
-				CoFlatMapFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coFlatMapper,
-				CoFlatMapFunction.class, 2);
-
-		return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
+				coFlatMapper.getClass(), 2, null, null);
+
+		return addCoFunction("coFlatMap", coFlatMapper, outTypeInfo,
+				new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
 	}
 
 	/**
@@ -460,14 +450,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
-				CoReduceFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
-				CoReduceFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
-				CoReduceFunction.class, 2);
-		return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				getReduceInvokable(coReducer));
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
+				coReducer.getClass(), 2, null, null);
+
+		return addCoFunction("coReduce", coReducer, outTypeInfo, getReduceInvokable(coReducer));
 	}
 
 	/**
@@ -528,16 +514,12 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coWindowFunction,
-				CoWindowFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coWindowFunction,
-				CoWindowFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coWindowFunction,
-				CoWindowFunction.class, 2);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
+				coWindowFunction.getClass(), 2, null, null);
 
-		return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
-						slideInterval, timestamp1, timestamp2));
+		return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
+				new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+						timestamp1, timestamp2));
 	}
 
 	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
@@ -556,26 +538,23 @@ public class ConnectedDataStream<IN1, IN2> {
 			CrossFunction<IN1, IN2, OUT> crossFunction, long windowSize, long slideInterval,
 			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
 
-		TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
-				.createSerializer().createInstance());
-		TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
-				.createSerializer().createInstance());
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CrossFunction.class,
+				crossFunction.getClass(), 2, null, null);
 
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(crossFunction,
-				CrossFunction.class, 2);
+		CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(
+				crossFunction);
 
-		CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(crossFunction);
-		
-		return addGeneralWindowCombine(crossWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
+		return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval,
+				timestamp1, timestamp2);
 	}
 
-	private static class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+	private static class CrossWindowFunction<IN1, IN2, OUT> implements
+			CoWindowFunction<IN1, IN2, OUT> {
 
 		private static final long serialVersionUID = 1L;
 
 		private CrossFunction<IN1, IN2, OUT> crossFunction;
-		
+
 		public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
 			this.crossFunction = crossFunction;
 		}
@@ -590,27 +569,22 @@ public class ConnectedDataStream<IN1, IN2> {
 			}
 		}
 	}
-	
+
 	protected SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
 			CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
 			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
 
-		TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
-				.createSerializer().createInstance());
-		TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
-				.createSerializer().createInstance());
-
-		CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
-				in1TypeWrapper, in2TypeWrapper);
+		TypeInformation<Tuple2<IN1, IN2>> outType = new TupleTypeInfo<Tuple2<IN1, IN2>>(
+				getInputType1(), getInputType2());
 
-		return addGeneralWindowCombine(coWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
+		return addGeneralWindowCombine(coWindowFunction, outType, windowSize, slideInterval,
+				timestamp1, timestamp2);
 	}
 
 	private <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
-			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeWrapper<IN1> in1TypeWrapper,
-			TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper, long windowSize,
-			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
+			long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
+			TimeStamp<IN2> timestamp2) {
 
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
@@ -619,23 +593,22 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
-						slideInterval, timestamp1, timestamp2));
+		return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
+				new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+						timestamp1, timestamp2));
 	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
-			final Function function, TypeWrapper<IN1> in1TypeWrapper,
-			TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper,
+			final Function function, TypeInformation<OUT> outTypeInfo,
 			CoInvokable<IN1, IN2, OUT> functionInvokable) {
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
-				environment, functionName, outTypeWrapper);
+				environment, functionName, outTypeInfo);
 
 		try {
 			dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
-					in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
+					getInputType1(), getInputType2(), outTypeInfo, functionName,
 					SerializationUtils.serialize((Serializable) function),
 					environment.getDegreeOfParallelism());
 		} catch (SerializationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f0e4309..978f5fa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
@@ -70,9 +71,6 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
 import org.apache.flink.streaming.util.keys.PojoKeySelector;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -97,7 +95,7 @@ public class DataStream<OUT> {
 	protected List<String> userDefinedNames;
 	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
-	protected final TypeWrapper<OUT> outTypeWrapper;
+	protected final TypeInformation<OUT> typeInfo;
 	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final JobGraphBuilder jobGraphBuilder;
@@ -110,11 +108,11 @@ public class DataStream<OUT> {
 	 *            StreamExecutionEnvironment
 	 * @param operatorType
 	 *            The type of the operator in the component
-	 * @param outTypeWrapper
-	 *            Type of the output
+	 * @param typeInfo
+	 *            Type of the datastream
 	 */
 	public DataStream(StreamExecutionEnvironment environment, String operatorType,
-			TypeWrapper<OUT> outTypeWrapper) {
+			TypeInformation<OUT> typeInfo) {
 		if (environment == null) {
 			throw new NullPointerException("context is null");
 		}
@@ -127,7 +125,7 @@ public class DataStream<OUT> {
 		this.userDefinedNames = new ArrayList<String>();
 		this.selectAll = false;
 		this.partitioner = new DistributePartitioner<OUT>(true);
-		this.outTypeWrapper = outTypeWrapper;
+		this.typeInfo = typeInfo;
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
 		this.mergedStreams.add(this);
 	}
@@ -146,7 +144,7 @@ public class DataStream<OUT> {
 		this.selectAll = dataStream.selectAll;
 		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
-		this.outTypeWrapper = dataStream.outTypeWrapper;
+		this.typeInfo = dataStream.typeInfo;
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
 		this.mergedStreams.add(this);
 		if (dataStream.mergedStreams.size() > 1) {
@@ -176,12 +174,12 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Gets the output type.
+	 * Gets the type of the stream.
 	 * 
-	 * @return The output type.
+	 * @return The type of the datastream.
 	 */
-	public TypeInformation<OUT> getOutputType() {
-		return this.outTypeWrapper.getTypeInfo();
+	public TypeInformation<OUT> getType() {
+		return this.typeInfo;
 	}
 
 	/**
@@ -230,7 +228,7 @@ public class DataStream<OUT> {
 	 */
 	public GroupedDataStream<OUT> groupBy(int... fields) {
 
-		return groupBy(FieldsKeySelector.getSelector(getOutputType(), fields));
+		return groupBy(FieldsKeySelector.getSelector(getType(), fields));
 
 	}
 
@@ -248,7 +246,7 @@ public class DataStream<OUT> {
 	 **/
 	public GroupedDataStream<OUT> groupBy(String... fields) {
 
-		return groupBy(new PojoKeySelector<OUT>(getOutputType(), fields));
+		return groupBy(new PojoKeySelector<OUT>(getType(), fields));
 
 	}
 
@@ -277,7 +275,7 @@ public class DataStream<OUT> {
 	public DataStream<OUT> partitionBy(int... fields) {
 
 		return setConnectionType(new FieldsPartitioner<OUT>(FieldsKeySelector.getSelector(
-				getOutputType(), fields)));
+				getType(), fields)));
 	}
 
 	/**
@@ -290,8 +288,8 @@ public class DataStream<OUT> {
 	 */
 	public DataStream<OUT> partitionBy(String... fields) {
 
-		return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(
-				getOutputType(), fields)));
+		return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(getType(),
+				fields)));
 	}
 
 	/**
@@ -387,13 +385,10 @@ public class DataStream<OUT> {
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(mapper,
-				MapFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(mapper,
-				MapFunction.class, 1);
 
-		return addFunction("map", mapper, inTypeWrapper, outTypeWrapper, new MapInvokable<OUT, R>(
-				mapper));
+		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(mapper, getType());
+
+		return addFunction("map", mapper, getType(), outType, new MapInvokable<OUT, R>(mapper));
 	}
 
 	/**
@@ -413,13 +408,11 @@ public class DataStream<OUT> {
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(flatMapper,
-				FlatMapFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(flatMapper,
-				FlatMapFunction.class, 1);
 
-		return addFunction("flatMap", flatMapper, inTypeWrapper, outTypeWrapper,
-				new FlatMapInvokable<OUT, R>(flatMapper));
+		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType());
+
+		return addFunction("flatMap", flatMapper, getType(), outType, new FlatMapInvokable<OUT, R>(
+				flatMapper));
 	}
 
 	/**
@@ -434,9 +427,9 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
+
+		return addFunction("reduce", reducer, getType(), getType(), new StreamReduceInvokable<OUT>(
+				reducer));
 	}
 
 	/**
@@ -454,11 +447,7 @@ public class DataStream<OUT> {
 	 * @return The filtered DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
-		FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(filter,
-				FilterFunction.class, 0);
-
-		return addFunction("filter", filter, typeWrapper, typeWrapper, new FilterInvokable<OUT>(
-				filter));
+		return addFunction("filter", filter, getType(), getType(), new FilterInvokable<OUT>(filter));
 	}
 
 	/**
@@ -543,7 +532,7 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
 		checkFieldRange(positionToSum);
 		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
-				getClassAtPos(positionToSum), getOutputType()));
+				getClassAtPos(positionToSum), getType()));
 	}
 
 	/**
@@ -559,8 +548,7 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
-				getOutputType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
 	}
 
 	/**
@@ -573,7 +561,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
 		checkFieldRange(positionToMin);
-		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
 				AggregationType.MIN));
 	}
 
@@ -590,8 +578,8 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MIN, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
+				false));
 	}
 
 	/**
@@ -604,7 +592,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
 		checkFieldRange(positionToMax);
-		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
 				AggregationType.MAX));
 	}
 
@@ -621,8 +609,8 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MAX, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
+				false));
 	}
 
 	/**
@@ -641,7 +629,7 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -661,7 +649,7 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -694,7 +682,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
 		checkFieldRange(positionToMinBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -727,7 +715,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
 		checkFieldRange(positionToMaxBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -737,11 +725,9 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<Long, ?> count() {
-		TypeWrapper<OUT> inTypeWrapper = outTypeWrapper;
-		TypeWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(Long.valueOf(0));
+		TypeInformation<Long> outTypeInfo = TypeExtractor.getForObject(Long.valueOf(0));
 
-		return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
-				new CounterInvokable<OUT>());
+		return addFunction("counter", null, getType(), outTypeInfo, new CounterInvokable<OUT>());
 	}
 
 	/**
@@ -803,7 +789,7 @@ public class DataStream<OUT> {
 	public DataStreamSink<OUT> print() {
 		DataStream<OUT> inputStream = this.copy();
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
-		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, outTypeWrapper);
+		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
 
 		return returnStream;
 	}
@@ -923,7 +909,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
 			WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple), inputStream.outTypeWrapper);
+				path, format, millis, endTuple), inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -951,7 +937,7 @@ public class DataStream<OUT> {
 			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
 				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
-				inputStream.outTypeWrapper);
+				inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -1074,7 +1060,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
 			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple), inputStream.outTypeWrapper);
+				path, format, millis, endTuple), inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -1102,7 +1088,7 @@ public class DataStream<OUT> {
 			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
 				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
-				inputStream.outTypeWrapper);
+				inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -1112,7 +1098,7 @@ public class DataStream<OUT> {
 		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
-				outTypeWrapper, outTypeWrapper, invokable);
+				typeInfo, typeInfo, invokable);
 
 		return returnStream;
 	}
@@ -1142,16 +1128,16 @@ public class DataStream<OUT> {
 	 * @return the data stream constructed
 	 */
 	protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
-			final Function function, TypeWrapper<OUT> inTypeWrapper, TypeWrapper<R> outTypeWrapper,
-			StreamInvokable<OUT, R> functionInvokable) {
+			final Function function, TypeInformation<OUT> inTypeInfo,
+			TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> functionInvokable) {
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
-				functionName, outTypeWrapper);
+				functionName, outTypeInfo);
 
 		try {
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeWrapper,
-					outTypeWrapper, functionName,
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeInfo,
+					outTypeInfo, functionName,
 					SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize user defined function");
@@ -1220,18 +1206,16 @@ public class DataStream<OUT> {
 	}
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
-		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT>(sinkFunction,
-				SinkFunction.class, 0));
+		return addSink(inputStream, sinkFunction, getType());
 	}
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-			SinkFunction<OUT> sinkFunction, TypeWrapper<OUT> inTypeWrapper) {
-		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
-				outTypeWrapper);
+			SinkFunction<OUT> sinkFunction, TypeInformation<OUT> inTypeInfo) {
+		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo);
 
 		try {
 			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
-					sinkFunction), inTypeWrapper, null, "sink", SerializationUtils
+					sinkFunction), inTypeInfo, null, "sink", SerializationUtils
 					.serialize(sinkFunction), degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SinkFunction");
@@ -1252,7 +1236,7 @@ public class DataStream<OUT> {
 	@SuppressWarnings("rawtypes")
 	protected Class<?> getClassAtPos(int pos) {
 		Class<?> type;
-		TypeInformation<OUT> outTypeInfo = outTypeWrapper.getTypeInfo();
+		TypeInformation<OUT> outTypeInfo = getType();
 		if (outTypeInfo.isTupleType()) {
 			type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 6bf6f43..369c3eb 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -1,44 +1,45 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
-
-/**
- * Represents the end of a DataStream.
- *
- * @param <IN>
- *            The type of the DataStream closed by the sink.
- */
-public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
-
-	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<IN> outTypeWrapper) {
-		super(environment, operatorType, outTypeWrapper);
-	}
-
-	protected DataStreamSink(DataStream<IN> dataStream) {
-		super(dataStream);
-	}
-
-	@Override
-	protected DataStreamSink<IN> copy() {
-		throw new RuntimeException("Data stream sinks cannot be copied");
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Represents the end of a DataStream.
+ *
+ * @param <IN>
+ *            The type of the DataStream closed by the sink.
+ */
+public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
+
+	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType,
+			TypeInformation<IN> outTypeInfo) {
+		super(environment, operatorType, outTypeInfo);
+	}
+
+	protected DataStreamSink(DataStream<IN> dataStream) {
+		super(dataStream);
+	}
+
+	@Override
+	protected DataStreamSink<IN> copy() {
+		throw new RuntimeException("Data stream sinks cannot be copied");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 5b2747f..978ea42 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**
  * The DataStreamSource represents the starting point of a DataStream.
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
  */
 public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
 
-	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<OUT> outTypeWrapper) {
-		super(environment, operatorType, outTypeWrapper);
+	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> outTypeInfo) {
+		super(environment, operatorType, outTypeInfo);
 	}
 
 	public DataStreamSource(DataStream<OUT> dataStream) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 6b07cca..32f664f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 /**
  * A GroupedDataStream represents a {@link DataStream} which has been
@@ -62,9 +61,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new GroupedReduceInvokable<OUT>(reducer, keySelector));
+		return addFunction("groupReduce", reducer, getType(), getType(),
+				new GroupedReduceInvokable<OUT>(reducer, keySelector));
 	}
 
 	/**
@@ -184,7 +182,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 				keySelector);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
-				outTypeWrapper, outTypeWrapper, invokable);
+				typeInfo, typeInfo, invokable);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 714807c..76da27c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,12 +23,12 @@ import java.util.Map.Entry;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
@@ -43,8 +43,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		DataStream<OUT> {
 
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
-			String operatorType, TypeWrapper<OUT> outTypeWrapper) {
-		super(environment, operatorType, outTypeWrapper);
+			String operatorType, TypeInformation<OUT> outTypeInfo) {
+		super(environment, operatorType, outTypeInfo);
 		setBufferTimeout(environment.getBufferTimeout());
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index e1c091c..5a8f038 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -44,7 +44,7 @@ public class SplitDataStream<OUT> {
 	 * @return The output type.
 	 */
 	public TypeInformation<OUT> getOutputType() {
-		return dataStream.getOutputType();
+		return dataStream.getType();
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
index ba6e75e..89c80ab 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
@@ -60,7 +60,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 */
 		public JoinPredicate<I1, I2> where(int... fields) {
 			return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(
-					op.input1.getOutputType(), fields));
+					op.input1.getType(), fields));
 		}
 
 		/**
@@ -76,7 +76,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 *         {@link JoinPredicate#equalTo} to continue the Join.
 		 */
 		public JoinPredicate<I1, I2> where(String... fields) {
-			return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getOutputType(),
+			return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getType(),
 					fields));
 		}
 
@@ -135,7 +135,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 * @return The joined data stream.
 		 */
 		public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(int... fields) {
-			return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getOutputType(),
+			return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getType(),
 					fields));
 		}
 
@@ -154,7 +154,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 * @return The joined data stream.
 		 */
 		public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(String... fields) {
-			return createJoinOperator(new PojoKeySelector<I2>(op.input2.getOutputType(), fields));
+			return createJoinOperator(new PojoKeySelector<I2>(op.input2.getType(), fields));
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 265e033..cc5f66e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple10;
@@ -43,21 +44,20 @@ import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.invokable.operator.ProjectInvokable;
-import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 public class StreamProjection<IN> {
 
 	private DataStream<IN> dataStream;
 	private int[] fieldIndexes;
-	private TypeWrapper<IN> inTypeWrapper;
+	private TypeInformation<IN> inTypeInfo;
 
 	protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
 		this.dataStream = dataStream;
 		this.fieldIndexes = fieldIndexes;
-		this.inTypeWrapper = dataStream.outTypeWrapper;
-		if (!inTypeWrapper.getTypeInfo().isTupleType()) {
+		this.inTypeInfo = dataStream.typeInfo;
+		if (!inTypeInfo.isTupleType()) {
 			throw new RuntimeException("Only Tuple DataStreams can be projected");
 		}
 	}
@@ -80,12 +80,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple1<T0>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple1<T0>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -109,11 +108,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple2<T0, T1>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple2<T0, T1>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outTypeWrapper));
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -139,12 +138,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple3<T0, T1, T2>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple3<T0, T1, T2>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -172,12 +170,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple4<T0, T1, T2, T3>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple4<T0, T1, T2, T3>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -206,13 +203,11 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-
-		TypeWrapper<Tuple5<T0, T1, T2, T3, T4>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple5<T0, T1, T2, T3, T4>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -245,12 +240,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple6<T0, T1, T2, T3, T4, T5>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes,
-						outTypeWrapper));
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -284,12 +278,14 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
-						outTypeWrapper));
 
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = (TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream
+				.addFunction("projection", null, inTypeInfo, outType,
+						new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
+								outType));
 	}
 
 	/**
@@ -325,12 +321,13 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
-						outTypeWrapper));
 
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
+						outType));
 	}
 
 	/**
@@ -369,11 +366,12 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
 				new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
-						outTypeWrapper));
+						outType));
 	}
 
 	/**
@@ -414,11 +412,12 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
 				new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
-						fieldIndexes, outTypeWrapper));
+						fieldIndexes, outType));
 	}
 
 	/**
@@ -462,12 +461,13 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
-						fieldIndexes, outTypeWrapper));
 
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+						fieldIndexes, outType));
 	}
 
 	/**
@@ -513,17 +513,18 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
-				inTypeWrapper, fieldIndexes, types);
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outType = (TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -572,16 +573,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outType = (TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -632,17 +634,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outType = (TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -696,17 +698,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outType = (TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -762,17 +764,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outType = (TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -830,16 +832,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outType = (TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -899,16 +902,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outType = (TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -971,16 +975,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outType = (TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1045,17 +1050,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outType = (TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1123,16 +1128,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outType = (TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1202,16 +1208,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outType = (TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1284,16 +1291,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outType = (TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1368,16 +1376,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outType = (TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1454,16 +1463,36 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outType = (TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
+	}
+
+	public static TypeInformation<?> extractFieldTypes(int[] fields, Class<?>[] givenTypes,
+			TypeInformation<?> inType) {
+
+		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
+		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
+
+		for (int i = 0; i < fields.length; i++) {
+
+			if (inTupleType.getTypeAt(fields[i]).getTypeClass() != givenTypes[i]) {
+				throw new IllegalArgumentException(
+						"Given types do not match types of input data set.");
+			}
+
+			fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
+		}
+
+		return new TupleTypeInfo<Tuple>(fieldTypes);
 	}
 
 }


Mime
View raw message