flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [04/12] git commit: [FLINK-1102] [streaming] Projection operator added to DataStream
Date Wed, 24 Sep 2014 19:51:37 GMT
[FLINK-1102] [streaming] Projection operator added to DataStream

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4175dca8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4175dca8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4175dca8

Branch: refs/heads/master
Commit: 4175dca89ea89522ff474b0c6c861516d03ee064
Parents: ad98337
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Sep 23 14:50:35 2014 +0200
Committer: mbalassi <balassi.marton@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |   32 +-
 .../flink/streaming/api/StreamConfig.java       |   14 +-
 .../api/datastream/ConnectedDataStream.java     |    6 +-
 .../streaming/api/datastream/DataStream.java    |   46 +-
 .../api/datastream/DataStreamSink.java          |    4 +-
 .../api/datastream/DataStreamSource.java        |    4 +-
 .../datastream/SingleOutputStreamOperator.java  |    4 +-
 .../api/datastream/StreamProjection.java        | 1469 ++++++++++++++++++
 .../environment/StreamExecutionEnvironment.java |    8 +-
 .../api/invokable/StreamInvokable.java          |    8 +-
 .../operator/BatchReduceInvokable.java          |    2 +-
 .../invokable/operator/ProjectInvokable.java    |   65 +
 .../util/serialization/FunctionTypeWrapper.java |    2 +-
 .../util/serialization/ObjectTypeWrapper.java   |    2 +-
 .../util/serialization/ProjectTypeWrapper.java  |   70 +
 .../serialization/TypeSerializerWrapper.java    |   38 -
 .../util/serialization/TypeWrapper.java         |   38 +
 .../api/invokable/operator/ProjectTest.java     |   66 +
 .../api/streamvertex/StreamVertexTest.java      |    5 +
 .../flink/streaming/util/MockCollector.java     |    8 +-
 .../serialization/TypeSerializationTest.java    |    8 +-
 21 files changed, 1801 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index a04dbaa..3377ee0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,10 +64,10 @@ public class JobGraphBuilder {
 	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
 	private Map<String, String> operatorNames;
 	private Map<String, StreamInvokable<?, ?>> invokableObjects;
-	private Map<String, TypeSerializerWrapper<?>> typeWrapperIn1;
-	private Map<String, TypeSerializerWrapper<?>> typeWrapperIn2;
-	private Map<String, TypeSerializerWrapper<?>> typeWrapperOut1;
-	private Map<String, TypeSerializerWrapper<?>> typeWrapperOut2;
+	private Map<String, TypeWrapper<?>> typeWrapperIn1;
+	private Map<String, TypeWrapper<?>> typeWrapperIn2;
+	private Map<String, TypeWrapper<?>> typeWrapperOut1;
+	private Map<String, TypeWrapper<?>> typeWrapperOut2;
 	private Map<String, byte[]> serializedFunctions;
 	private Map<String, byte[]> outputSelectors;
 	private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
@@ -103,10 +103,10 @@ public class JobGraphBuilder {
 		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
 		operatorNames = new HashMap<String, String>();
 		invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
-		typeWrapperIn1 = new HashMap<String, TypeSerializerWrapper<?>>();
-		typeWrapperIn2 = new HashMap<String, TypeSerializerWrapper<?>>();
-		typeWrapperOut1 = new HashMap<String, TypeSerializerWrapper<?>>();
-		typeWrapperOut2 = new HashMap<String, TypeSerializerWrapper<?>>();
+		typeWrapperIn1 = new HashMap<String, TypeWrapper<?>>();
+		typeWrapperIn2 = new HashMap<String, TypeWrapper<?>>();
+		typeWrapperOut1 = new HashMap<String, TypeWrapper<?>>();
+		typeWrapperOut2 = new HashMap<String, TypeWrapper<?>>();
 		serializedFunctions = new HashMap<String, byte[]>();
 		outputSelectors = new HashMap<String, byte[]>();
 		vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
@@ -156,8 +156,8 @@ public class JobGraphBuilder {
 	 *            Number of parallel instances created
 	 */
 	public <IN, OUT> void addStreamVertex(String vertexName,
-			StreamInvokable<IN, OUT> invokableObject, TypeSerializerWrapper<?> inTypeWrapper,
-			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
+			StreamInvokable<IN, OUT> invokableObject, TypeWrapper<?> inTypeWrapper,
+			TypeWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
 		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
@@ -241,8 +241,8 @@ public class JobGraphBuilder {
 
 	public <IN1, IN2, OUT> void addCoTask(String vertexName,
 			CoInvokable<IN1, IN2, OUT> taskInvokableObject,
-			TypeSerializerWrapper<?> in1TypeWrapper, TypeSerializerWrapper<?> in2TypeWrapper,
-			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
+			TypeWrapper<?> in1TypeWrapper, TypeWrapper<?> in2TypeWrapper,
+			TypeWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
 		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
@@ -290,9 +290,9 @@ public class JobGraphBuilder {
 		iterationTailCount.put(vertexName, 0);
 	}
 
-	private void addTypeWrappers(String vertexName, TypeSerializerWrapper<?> in1,
-			TypeSerializerWrapper<?> in2, TypeSerializerWrapper<?> out1,
-			TypeSerializerWrapper<?> out2) {
+	private void addTypeWrappers(String vertexName, TypeWrapper<?> in1,
+			TypeWrapper<?> in2, TypeWrapper<?> out1,
+			TypeWrapper<?> out2) {
 		typeWrapperIn1.put(vertexName, in1);
 		typeWrapperIn2.put(vertexName, in2);
 		typeWrapperOut1.put(vertexName, out1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 6fac391..42c1adf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 public class StreamConfig {
 	private static final String INPUT_TYPE = "inputType_";
@@ -79,19 +79,19 @@ public class StreamConfig {
 	private static final String TYPE_WRAPPER_OUT_1 = "typeWrapper_out_1";
 	private static final String TYPE_WRAPPER_OUT_2 = "typeWrapper_out_2";
 
-	public void setTypeWrapperIn1(TypeSerializerWrapper<?> typeWrapper) {
+	public void setTypeWrapperIn1(TypeWrapper<?> typeWrapper) {
 		setTypeWrapper(TYPE_WRAPPER_IN_1, typeWrapper);
 	}
 
-	public void setTypeWrapperIn2(TypeSerializerWrapper<?> typeWrapper) {
+	public void setTypeWrapperIn2(TypeWrapper<?> typeWrapper) {
 		setTypeWrapper(TYPE_WRAPPER_IN_2, typeWrapper);
 	}
 
-	public void setTypeWrapperOut1(TypeSerializerWrapper<?> typeWrapper) {
+	public void setTypeWrapperOut1(TypeWrapper<?> typeWrapper) {
 		setTypeWrapper(TYPE_WRAPPER_OUT_1, typeWrapper);
 	}
 
-	public void setTypeWrapperOut2(TypeSerializerWrapper<?> typeWrapper) {
+	public void setTypeWrapperOut2(TypeWrapper<?> typeWrapper) {
 		setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
 	}
 
@@ -111,7 +111,7 @@ public class StreamConfig {
 		return getTypeInfo(TYPE_WRAPPER_OUT_2);
 	}
 
-	private void setTypeWrapper(String key, TypeSerializerWrapper<?> typeWrapper) {
+	private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
 		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
 	}
 
@@ -123,7 +123,7 @@ public class StreamConfig {
 			throw new RuntimeException("TypeSerializationWrapper must be set");
 		}
 
-		TypeSerializerWrapper<T> typeWrapper = (TypeSerializerWrapper<T>) SerializationUtils
+		TypeWrapper<T> typeWrapper = (TypeWrapper<T>) SerializationUtils
 				.deserialize(serializedWrapper);
 		if (typeWrapper != null) {
 			return typeWrapper.getTypeInfo();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 256f470..d491fad 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowGroupReduceI
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * The ConnectedDataStream represents a stream for two different data types. It
@@ -431,8 +431,8 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
-			final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,
-			TypeSerializerWrapper<IN2> in2TypeWrapper, TypeSerializerWrapper<OUT> outTypeWrapper,
+			final Function function, TypeWrapper<IN1> in1TypeWrapper,
+			TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper,
 			CoInvokable<IN1, IN2, OUT> functionInvokable) {
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 23bc80d..423de4b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -63,7 +64,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -88,7 +89,7 @@ public class DataStream<OUT> {
 	protected List<String> userDefinedNames;
 	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
-	protected final TypeSerializerWrapper<OUT> outTypeWrapper;
+	protected final TypeWrapper<OUT> outTypeWrapper;
 	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final JobGraphBuilder jobGraphBuilder;
@@ -105,7 +106,7 @@ public class DataStream<OUT> {
 	 *            Type of the output
 	 */
 	public DataStream(StreamExecutionEnvironment environment, String operatorType,
-			TypeSerializerWrapper<OUT> outTypeWrapper) {
+			TypeWrapper<OUT> outTypeWrapper) {
 		if (environment == null) {
 			throw new NullPointerException("context is null");
 		}
@@ -384,6 +385,29 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Initiates a Project transformation on a {@link Tuple} {@link DataStream}.<br/>
+	 * <b>Note: Only Tuple DataStreams can be projected.</b></br> The
+	 * transformation projects each Tuple of the DataSet onto a (sub)set of
+	 * fields.</br> This method returns a {@link StreamProjection} on which
+	 * {@link StreamProjection#types()} needs to be called to completed the
+	 * transformation.
+	 * 
+	 * @param fieldIndexes
+	 *            The field indexes of the input tuples that are retained. The
+	 *            order of fields in the output tuple corresponds to the order
+	 *            of field indexes.
+	 * @return A StreamProjection that needs to be converted into a DataStream
+	 *         to complete the project transformation by calling
+	 *         {@link StreamProjection#types()}.
+	 * 
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public StreamProjection<OUT> project(int... fieldIndexes) {
+		return new StreamProjection<OUT>(this.copy(), fieldIndexes);
+	}
+
+	/**
 	 * Groups the elements of a {@link DataStream} by the given key position to
 	 * be used with grouped operators like
 	 * {@link GroupedDataStream#reduce(ReduceFunction)}
@@ -565,8 +589,8 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<Long, ?> count() {
-		TypeSerializerWrapper<OUT> inTypeWrapper = outTypeWrapper;
-		TypeSerializerWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(new Long(0));
+		TypeWrapper<OUT> inTypeWrapper = outTypeWrapper;
+		TypeWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(new Long(0));
 
 		return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
 				new CounterInvokable<OUT>());
@@ -968,8 +992,8 @@ public class DataStream<OUT> {
 	 * @return the data stream constructed
 	 */
 	protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
-			final Function function, TypeSerializerWrapper<OUT> inTypeWrapper,
-			TypeSerializerWrapper<R> outTypeWrapper, StreamInvokable<OUT, R> functionInvokable) {
+			final Function function, TypeWrapper<OUT> inTypeWrapper, TypeWrapper<R> outTypeWrapper,
+			StreamInvokable<OUT, R> functionInvokable) {
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
@@ -1051,14 +1075,14 @@ public class DataStream<OUT> {
 	}
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-			SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> inTypeWrapper) {
+			SinkFunction<OUT> sinkFunction, TypeWrapper<OUT> inTypeWrapper) {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
 				outTypeWrapper);
 
 		try {
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
-					inTypeWrapper, null, "sink", SerializationUtils.serialize(sinkFunction),
-					degreeOfParallelism);
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
+					sinkFunction), inTypeWrapper, null, "sink", SerializationUtils
+					.serialize(sinkFunction), degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SinkFunction");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 4bcdd7b..6bf6f43 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * Represents the end of a DataStream.
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
  */
 public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
 
-	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<IN> outTypeWrapper) {
+	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<IN> outTypeWrapper) {
 		super(environment, operatorType, outTypeWrapper);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 5ddc69a..5b2747f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * The DataStreamSource represents the starting point of a DataStream.
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
  */
 public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
 
-	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<OUT> outTypeWrapper) {
+	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<OUT> outTypeWrapper) {
 		super(environment, operatorType, outTypeWrapper);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 676e575..1674c6a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -21,7 +21,7 @@ import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
@@ -36,7 +36,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		DataStream<OUT> {
 
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
-			String operatorType, TypeSerializerWrapper<OUT> outTypeWrapper) {
+			String operatorType, TypeWrapper<OUT> outTypeWrapper) {
 		super(environment, operatorType, outTypeWrapper);
 		setBufferTimeout(environment.getBufferTimeout());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
new file mode 100644
index 0000000..265e033
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -0,0 +1,1469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.streaming.api.invokable.operator.ProjectInvokable;
+import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
+
+public class StreamProjection<IN> {
+
+	private DataStream<IN> dataStream;
+	private int[] fieldIndexes;
+	private TypeWrapper<IN> inTypeWrapper;
+
+	protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
+		this.dataStream = dataStream;
+		this.fieldIndexes = fieldIndexes;
+		this.inTypeWrapper = dataStream.outTypeWrapper;
+		if (!inTypeWrapper.getTypeInfo().isTupleType()) {
+			throw new RuntimeException("Only Tuple DataStreams can be projected");
+		}
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 * 
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @return The projected DataStream.
+	 * 
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0> SingleOutputStreamOperator<Tuple1<T0>, ?> types(Class<T0> type0) {
+		Class<?>[] types = { type0 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple1<T0>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple1<T0>>(
+				inTypeWrapper, fieldIndexes, types);
+
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 * 
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @return The projected DataStream.
+	 * 
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1> SingleOutputStreamOperator<Tuple2<T0, T1>, ?> types(Class<T0> type0,
+			Class<T1> type1) {
+		Class<?>[] types = { type0, type1 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple2<T0, T1>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple2<T0, T1>>(
+				inTypeWrapper, fieldIndexes, types);
+
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 * 
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @return The projected DataStream.
+	 * 
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2> SingleOutputStreamOperator<Tuple3<T0, T1, T2>, ?> types(Class<T0> type0,
+			Class<T1> type1, Class<T2> type2) {
+		Class<?>[] types = { type0, type1, type2 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple3<T0, T1, T2>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple3<T0, T1, T2>>(
+				inTypeWrapper, fieldIndexes, types);
+
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 * 
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @return The projected DataStream.
+	 * 
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3> SingleOutputStreamOperator<Tuple4<T0, T1, T2, T3>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3) {
+		Class<?>[] types = { type0, type1, type2, type3 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple4<T0, T1, T2, T3>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple4<T0, T1, T2, T3>>(
+				inTypeWrapper, fieldIndexes, types);
+
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 * 
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @return The projected DataStream.
+	 * 
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4> SingleOutputStreamOperator<Tuple5<T0, T1, T2, T3, T4>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4) {
+		Class<?>[] types = { type0, type1, type2, type3, type4 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple5<T0, T1, T2, T3, T4>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple5<T0, T1, T2, T3, T4>>(
+				inTypeWrapper, fieldIndexes, types);
+
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5> SingleOutputStreamOperator<Tuple6<T0, T1, T2, T3, T4, T5>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple6<T0, T1, T2, T3, T4, T5>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(
+				inTypeWrapper, fieldIndexes, types);
+
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes,
+						outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6> SingleOutputStreamOperator<Tuple7<T0, T1, T2, T3, T4, T5, T6>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+		TypeWrapper<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
+						outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7> SingleOutputStreamOperator<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+		TypeWrapper<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
+						outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8> SingleOutputStreamOperator<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
+						outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> SingleOutputStreamOperator<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
+						fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> SingleOutputStreamOperator<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+		TypeWrapper<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+				new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+						fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> SingleOutputStreamOperator<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+		TypeWrapper<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
+								fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> SingleOutputStreamOperator<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> SingleOutputStreamOperator<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
+								fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> SingleOutputStreamOperator<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
+								fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> SingleOutputStreamOperator<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
+								fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> SingleOutputStreamOperator<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> SingleOutputStreamOperator<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @param type18
+	 *            The class of field '18' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> SingleOutputStreamOperator<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17,
+			Class<T18> type18) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17, type18 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @param type18
+	 *            The class of field '18' of the result Tuples.
+	 * @param type19
+	 *            The class of field '19' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> SingleOutputStreamOperator<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17,
+			Class<T18> type18, Class<T19> type19) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17, type18, type19 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
+								fieldIndexes, outTypeWrapper));
+
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @param type18
+	 *            The class of field '18' of the result Tuples.
+	 * @param type19
+	 *            The class of field '19' of the result Tuples.
+	 * @param type20
+	 *            The class of field '20' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> SingleOutputStreamOperator<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17,
+			Class<T18> type18, Class<T19> type19, Class<T20> type20) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17, type18, type19,
+				type20 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @param type18
+	 *            The class of field '18' of the result Tuples.
+	 * @param type19
+	 *            The class of field '19' of the result Tuples.
+	 * @param type20
+	 *            The class of field '20' of the result Tuples.
+	 * @param type21
+	 *            The class of field '21' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> SingleOutputStreamOperator<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17,
+			Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17, type18, type19,
+				type20, type21 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @param type18
+	 *            The class of field '18' of the result Tuples.
+	 * @param type19
+	 *            The class of field '19' of the result Tuples.
+	 * @param type20
+	 *            The class of field '20' of the result Tuples.
+	 * @param type21
+	 *            The class of field '21' of the result Tuples.
+	 * @param type22
+	 *            The class of field '22' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> SingleOutputStreamOperator<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17,
+			Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21,
+			Class<T22> type22) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17, type18, type19,
+				type20, type21, type22 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @param type18
+	 *            The class of field '18' of the result Tuples.
+	 * @param type19
+	 *            The class of field '19' of the result Tuples.
+	 * @param type20
+	 *            The class of field '20' of the result Tuples.
+	 * @param type21
+	 *            The class of field '21' of the result Tuples.
+	 * @param type22
+	 *            The class of field '22' of the result Tuples.
+	 * @param type23
+	 *            The class of field '23' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> SingleOutputStreamOperator<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17,
+			Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21,
+			Class<T22> type22, Class<T23> type23) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17, type18, type19,
+				type20, type21, type22, type23 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
+	 * fields. Requires the classes of the fields of the resulting Tuples.
+	 *
+	 * @param type0
+	 *            The class of field '0' of the result Tuples.
+	 * @param type1
+	 *            The class of field '1' of the result Tuples.
+	 * @param type2
+	 *            The class of field '2' of the result Tuples.
+	 * @param type3
+	 *            The class of field '3' of the result Tuples.
+	 * @param type4
+	 *            The class of field '4' of the result Tuples.
+	 * @param type5
+	 *            The class of field '5' of the result Tuples.
+	 * @param type6
+	 *            The class of field '6' of the result Tuples.
+	 * @param type7
+	 *            The class of field '7' of the result Tuples.
+	 * @param type8
+	 *            The class of field '8' of the result Tuples.
+	 * @param type9
+	 *            The class of field '9' of the result Tuples.
+	 * @param type10
+	 *            The class of field '10' of the result Tuples.
+	 * @param type11
+	 *            The class of field '11' of the result Tuples.
+	 * @param type12
+	 *            The class of field '12' of the result Tuples.
+	 * @param type13
+	 *            The class of field '13' of the result Tuples.
+	 * @param type14
+	 *            The class of field '14' of the result Tuples.
+	 * @param type15
+	 *            The class of field '15' of the result Tuples.
+	 * @param type16
+	 *            The class of field '16' of the result Tuples.
+	 * @param type17
+	 *            The class of field '17' of the result Tuples.
+	 * @param type18
+	 *            The class of field '18' of the result Tuples.
+	 * @param type19
+	 *            The class of field '19' of the result Tuples.
+	 * @param type20
+	 *            The class of field '20' of the result Tuples.
+	 * @param type21
+	 *            The class of field '21' of the result Tuples.
+	 * @param type22
+	 *            The class of field '22' of the result Tuples.
+	 * @param type23
+	 *            The class of field '23' of the result Tuples.
+	 * @param type24
+	 *            The class of field '24' of the result Tuples.
+	 * @return The projected DataStream.
+	 *
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> SingleOutputStreamOperator<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>, ?> types(
+			Class<T0> type0, Class<T1> type1, Class<T2> type2, Class<T3> type3, Class<T4> type4,
+			Class<T5> type5, Class<T6> type6, Class<T7> type7, Class<T8> type8, Class<T9> type9,
+			Class<T10> type10, Class<T11> type11, Class<T12> type12, Class<T13> type13,
+			Class<T14> type14, Class<T15> type15, Class<T16> type16, Class<T17> type17,
+			Class<T18> type18, Class<T19> type19, Class<T20> type20, Class<T21> type21,
+			Class<T22> type22, Class<T23> type23, Class<T24> type24) {
+		Class<?>[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9,
+				type10, type11, type12, type13, type14, type15, type16, type17, type18, type19,
+				type20, type21, type22, type23, type24 };
+		if (types.length != this.fieldIndexes.length) {
+			throw new IllegalArgumentException(
+					"Numbers of projected fields and types do not match.");
+		}
+
+		TypeWrapper<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
+				inTypeWrapper, fieldIndexes, types);
+		return dataStream
+				.addFunction(
+						"projection",
+						null,
+						inTypeWrapper,
+						outTypeWrapper,
+						new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
+								fieldIndexes, outTypeWrapper));
+	}
+
+}


Mime
View raw message