Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A5AC0114E8 for ; Wed, 24 Sep 2014 19:52:21 +0000 (UTC) Received: (qmail 32168 invoked by uid 500); 24 Sep 2014 19:52:21 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 32144 invoked by uid 500); 24 Sep 2014 19:52:21 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 32135 invoked by uid 99); 24 Sep 2014 19:52:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Sep 2014 19:52:21 +0000 X-ASF-Spam-Status: No, hits=-2000.8 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 24 Sep 2014 19:51:38 +0000 Received: (qmail 31379 invoked by uid 99); 24 Sep 2014 19:51:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Sep 2014 19:51:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id AC4599A3653; Wed, 24 Sep 2014 19:51:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.incubator.apache.org Date: Wed, 24 Sep 2014 19:51:37 -0000 Message-Id: In-Reply-To: <0830c32020a9488cabe4ea045e720c5b@git.apache.org> References: <0830c32020a9488cabe4ea045e720c5b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/12] git commit: [FLINK-1102] [streaming] Projection operator 
added to DataStream X-Virus-Checked: Checked by ClamAV on apache.org [FLINK-1102] [streaming] Projection operator added to DataStream Conflicts: flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java Conflicts: flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4175dca8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4175dca8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4175dca8 Branch: refs/heads/master Commit: 4175dca89ea89522ff474b0c6c861516d03ee064 Parents: ad98337 Author: Gyula Fora Authored: Tue Sep 23 14:50:35 2014 +0200 Committer: mbalassi Committed: Wed Sep 24 19:54:39 2014 +0200 ---------------------------------------------------------------------- .../flink/streaming/api/JobGraphBuilder.java | 32 +- .../flink/streaming/api/StreamConfig.java | 14 +- .../api/datastream/ConnectedDataStream.java | 6 +- .../streaming/api/datastream/DataStream.java | 46 +- .../api/datastream/DataStreamSink.java | 4 +- .../api/datastream/DataStreamSource.java | 4 +- .../datastream/SingleOutputStreamOperator.java | 4 +- .../api/datastream/StreamProjection.java | 1469 ++++++++++++++++++ .../environment/StreamExecutionEnvironment.java | 8 +- .../api/invokable/StreamInvokable.java | 8 +- .../operator/BatchReduceInvokable.java | 2 +- .../invokable/operator/ProjectInvokable.java | 65 + 
.../util/serialization/FunctionTypeWrapper.java | 2 +- .../util/serialization/ObjectTypeWrapper.java | 2 +- .../util/serialization/ProjectTypeWrapper.java | 70 + .../serialization/TypeSerializerWrapper.java | 38 - .../util/serialization/TypeWrapper.java | 38 + .../api/invokable/operator/ProjectTest.java | 66 + .../api/streamvertex/StreamVertexTest.java | 5 + .../flink/streaming/util/MockCollector.java | 8 +- .../serialization/TypeSerializationTest.java | 8 +- 21 files changed, 1801 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index a04dbaa..3377ee0 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; import org.apache.flink.streaming.api.streamvertex.StreamVertex; import org.apache.flink.streaming.partitioner.ForwardPartitioner; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,10 +64,10 @@ public class JobGraphBuilder { private Map>> connectionTypes; private Map operatorNames; private Map> invokableObjects; - 
private Map> typeWrapperIn1; - private Map> typeWrapperIn2; - private Map> typeWrapperOut1; - private Map> typeWrapperOut2; + private Map> typeWrapperIn1; + private Map> typeWrapperIn2; + private Map> typeWrapperOut1; + private Map> typeWrapperOut2; private Map serializedFunctions; private Map outputSelectors; private Map> vertexClasses; @@ -103,10 +103,10 @@ public class JobGraphBuilder { connectionTypes = new HashMap>>(); operatorNames = new HashMap(); invokableObjects = new HashMap>(); - typeWrapperIn1 = new HashMap>(); - typeWrapperIn2 = new HashMap>(); - typeWrapperOut1 = new HashMap>(); - typeWrapperOut2 = new HashMap>(); + typeWrapperIn1 = new HashMap>(); + typeWrapperIn2 = new HashMap>(); + typeWrapperOut1 = new HashMap>(); + typeWrapperOut2 = new HashMap>(); serializedFunctions = new HashMap(); outputSelectors = new HashMap(); vertexClasses = new HashMap>(); @@ -156,8 +156,8 @@ public class JobGraphBuilder { * Number of parallel instances created */ public void addStreamVertex(String vertexName, - StreamInvokable invokableObject, TypeSerializerWrapper inTypeWrapper, - TypeSerializerWrapper outTypeWrapper, String operatorName, + StreamInvokable invokableObject, TypeWrapper inTypeWrapper, + TypeWrapper outTypeWrapper, String operatorName, byte[] serializedFunction, int parallelism) { addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, @@ -241,8 +241,8 @@ public class JobGraphBuilder { public void addCoTask(String vertexName, CoInvokable taskInvokableObject, - TypeSerializerWrapper in1TypeWrapper, TypeSerializerWrapper in2TypeWrapper, - TypeSerializerWrapper outTypeWrapper, String operatorName, + TypeWrapper in1TypeWrapper, TypeWrapper in2TypeWrapper, + TypeWrapper outTypeWrapper, String operatorName, byte[] serializedFunction, int parallelism) { addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, @@ -290,9 +290,9 @@ public class JobGraphBuilder { iterationTailCount.put(vertexName, 0); } - private void 
addTypeWrappers(String vertexName, TypeSerializerWrapper in1, - TypeSerializerWrapper in2, TypeSerializerWrapper out1, - TypeSerializerWrapper out2) { + private void addTypeWrappers(String vertexName, TypeWrapper in1, + TypeWrapper in2, TypeWrapper out1, + TypeWrapper out2) { typeWrapperIn1.put(vertexName, in1); typeWrapperIn2.put(vertexName, in2); typeWrapperOut1.put(vertexName, out1); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 6fac391..42c1adf 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamvertex.StreamVertexException; import org.apache.flink.streaming.partitioner.ShufflePartitioner; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; public class StreamConfig { private static final String INPUT_TYPE = "inputType_"; @@ -79,19 +79,19 @@ public class StreamConfig { private static final String TYPE_WRAPPER_OUT_1 = "typeWrapper_out_1"; private static final String TYPE_WRAPPER_OUT_2 = "typeWrapper_out_2"; - public void setTypeWrapperIn1(TypeSerializerWrapper typeWrapper) { + public void setTypeWrapperIn1(TypeWrapper typeWrapper) { 
setTypeWrapper(TYPE_WRAPPER_IN_1, typeWrapper); } - public void setTypeWrapperIn2(TypeSerializerWrapper typeWrapper) { + public void setTypeWrapperIn2(TypeWrapper typeWrapper) { setTypeWrapper(TYPE_WRAPPER_IN_2, typeWrapper); } - public void setTypeWrapperOut1(TypeSerializerWrapper typeWrapper) { + public void setTypeWrapperOut1(TypeWrapper typeWrapper) { setTypeWrapper(TYPE_WRAPPER_OUT_1, typeWrapper); } - public void setTypeWrapperOut2(TypeSerializerWrapper typeWrapper) { + public void setTypeWrapperOut2(TypeWrapper typeWrapper) { setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper); } @@ -111,7 +111,7 @@ public class StreamConfig { return getTypeInfo(TYPE_WRAPPER_OUT_2); } - private void setTypeWrapper(String key, TypeSerializerWrapper typeWrapper) { + private void setTypeWrapper(String key, TypeWrapper typeWrapper) { config.setBytes(key, SerializationUtils.serialize(typeWrapper)); } @@ -123,7 +123,7 @@ public class StreamConfig { throw new RuntimeException("TypeSerializationWrapper must be set"); } - TypeSerializerWrapper typeWrapper = (TypeSerializerWrapper) SerializationUtils + TypeWrapper typeWrapper = (TypeWrapper) SerializationUtils .deserialize(serializedWrapper); if (typeWrapper != null) { return typeWrapper.getTypeInfo(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 256f470..d491fad 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowGroupReduceI import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; -import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; /** * The ConnectedDataStream represents a stream for two different data types. It @@ -431,8 +431,8 @@ public class ConnectedDataStream { } protected SingleOutputStreamOperator addCoFunction(String functionName, - final Function function, TypeSerializerWrapper in1TypeWrapper, - TypeSerializerWrapper in2TypeWrapper, TypeSerializerWrapper outTypeWrapper, + final Function function, TypeWrapper in1TypeWrapper, + TypeWrapper in2TypeWrapper, TypeWrapper outTypeWrapper, CoInvokable functionInvokable) { @SuppressWarnings({ "unchecked", "rawtypes" }) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 23bc80d..423de4b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -33,6 +33,7 @@ import 
org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.JobGraphBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -63,7 +64,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner; import org.apache.flink.streaming.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper; -import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; /** * A DataStream represents a stream of elements of the same type. A DataStream @@ -88,7 +89,7 @@ public class DataStream { protected List userDefinedNames; protected boolean selectAll; protected StreamPartitioner partitioner; - protected final TypeSerializerWrapper outTypeWrapper; + protected final TypeWrapper outTypeWrapper; protected List> mergedStreams; protected final JobGraphBuilder jobGraphBuilder; @@ -105,7 +106,7 @@ public class DataStream { * Type of the output */ public DataStream(StreamExecutionEnvironment environment, String operatorType, - TypeSerializerWrapper outTypeWrapper) { + TypeWrapper outTypeWrapper) { if (environment == null) { throw new NullPointerException("context is null"); } @@ -384,6 +385,29 @@ public class DataStream { } /** + * Initiates a Project transformation on a {@link Tuple} {@link DataStream}.
+ * Note: Only Tuple DataStreams can be projected.
The + * transformation projects each Tuple of the DataStream onto a (sub)set of + * fields.
This method returns a {@link StreamProjection} on which + * {@link StreamProjection#types()} needs to be called to completed the + * transformation. + * + * @param fieldIndexes + * The field indexes of the input tuples that are retained. The + * order of fields in the output tuple corresponds to the order + * of field indexes. + * @return A StreamProjection that needs to be converted into a DataStream + * to complete the project transformation by calling + * {@link StreamProjection#types()}. + * + * @see Tuple + * @see DataStream + */ + public StreamProjection project(int... fieldIndexes) { + return new StreamProjection(this.copy(), fieldIndexes); + } + + /** * Groups the elements of a {@link DataStream} by the given key position to * be used with grouped operators like * {@link GroupedDataStream#reduce(ReduceFunction)} @@ -565,8 +589,8 @@ public class DataStream { * @return The transformed DataStream. */ public SingleOutputStreamOperator count() { - TypeSerializerWrapper inTypeWrapper = outTypeWrapper; - TypeSerializerWrapper outTypeWrapper = new ObjectTypeWrapper(new Long(0)); + TypeWrapper inTypeWrapper = outTypeWrapper; + TypeWrapper outTypeWrapper = new ObjectTypeWrapper(new Long(0)); return addFunction("counter", null, inTypeWrapper, outTypeWrapper, new CounterInvokable()); @@ -968,8 +992,8 @@ public class DataStream { * @return the data stream constructed */ protected SingleOutputStreamOperator addFunction(String functionName, - final Function function, TypeSerializerWrapper inTypeWrapper, - TypeSerializerWrapper outTypeWrapper, StreamInvokable functionInvokable) { + final Function function, TypeWrapper inTypeWrapper, TypeWrapper outTypeWrapper, + StreamInvokable functionInvokable) { DataStream inputStream = this.copy(); @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, @@ -1051,14 +1075,14 @@ public class DataStream { } private DataStreamSink addSink(DataStream inputStream, - 
SinkFunction sinkFunction, TypeSerializerWrapper inTypeWrapper) { + SinkFunction sinkFunction, TypeWrapper inTypeWrapper) { DataStreamSink returnStream = new DataStreamSink(environment, "sink", outTypeWrapper); try { - jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable(sinkFunction), - inTypeWrapper, null, "sink", SerializationUtils.serialize(sinkFunction), - degreeOfParallelism); + jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable( + sinkFunction), inTypeWrapper, null, "sink", SerializationUtils + .serialize(sinkFunction), degreeOfParallelism); } catch (SerializationException e) { throw new RuntimeException("Cannot serialize SinkFunction"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 4bcdd7b..6bf6f43 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; /** * Represents the end of a DataStream. 
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; */ public class DataStreamSink extends SingleOutputStreamOperator> { - protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper outTypeWrapper) { + protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeWrapper outTypeWrapper) { super(environment, operatorType, outTypeWrapper); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index 5ddc69a..5b2747f 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; /** * The DataStreamSource represents the starting point of a DataStream. 
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; */ public class DataStreamSource extends SingleOutputStreamOperator> { - public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper outTypeWrapper) { + public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeWrapper outTypeWrapper) { super(environment, operatorType, outTypeWrapper); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 676e575..1674c6a 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -21,7 +21,7 @@ import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.streaming.api.collector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; /** * The SingleOutputStreamOperator represents a user defined transformation @@ -36,7 +36,7 @@ public class SingleOutputStreamOperator { protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, - String operatorType, 
TypeSerializerWrapper outTypeWrapper) { + String operatorType, TypeWrapper outTypeWrapper) { super(environment, operatorType, outTypeWrapper); setBufferTimeout(environment.getBufferTimeout()); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java new file mode 100644 index 0000000..265e033 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -0,0 +1,1469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple10; +import org.apache.flink.api.java.tuple.Tuple11; +import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.api.java.tuple.Tuple13; +import org.apache.flink.api.java.tuple.Tuple14; +import org.apache.flink.api.java.tuple.Tuple15; +import org.apache.flink.api.java.tuple.Tuple16; +import org.apache.flink.api.java.tuple.Tuple17; +import org.apache.flink.api.java.tuple.Tuple18; +import org.apache.flink.api.java.tuple.Tuple19; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple20; +import org.apache.flink.api.java.tuple.Tuple21; +import org.apache.flink.api.java.tuple.Tuple22; +import org.apache.flink.api.java.tuple.Tuple23; +import org.apache.flink.api.java.tuple.Tuple24; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.streaming.api.invokable.operator.ProjectInvokable; +import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper; +import org.apache.flink.streaming.util.serialization.TypeWrapper; + +public class StreamProjection { + + private DataStream dataStream; + private int[] fieldIndexes; + private TypeWrapper inTypeWrapper; + + protected StreamProjection(DataStream dataStream, int[] fieldIndexes) { + this.dataStream = dataStream; + this.fieldIndexes = fieldIndexes; + this.inTypeWrapper = dataStream.outTypeWrapper; + if (!inTypeWrapper.getTypeInfo().isTupleType()) { + throw new RuntimeException("Only Tuple DataStreams can be projected"); + 
} + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types(Class type0) { + Class[] types = { type0 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types(Class type0, + Class type1) { + Class[] types = { type0, type1 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. 
+ * @param type2 + * The class of field '2' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types(Class type0, + Class type1, Class type2) { + Class[] types = { type0, type1, type2 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3) { + Class[] types = { type0, type1, type2, type3 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. 
+ * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4) { + Class[] types = { type0, type1, type2, type3, type4 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5) { + Class[] types = { type0, type1, type2, type3, type4, type5 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, + outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, + outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, + outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>(fieldIndexes, + outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. 
+ * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. 
+ * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. 
+ * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. 
+ * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. 
+ * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @param type18 + * The class of field '18' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17, + Class type18) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17, type18 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. 
+ * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @param type18 + * The class of field '18' of the result Tuples. + * @param type19 + * The class of field '19' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17, + Class type18, Class type19) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17, type18, type19 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. 
+ * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @param type18 + * The class of field '18' of the result Tuples. + * @param type19 + * The class of field '19' of the result Tuples. + * @param type20 + * The class of field '20' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17, + Class type18, Class type19, Class type20) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, + type20 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. 
+ * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @param type18 + * The class of field '18' of the result Tuples. + * @param type19 + * The class of field '19' of the result Tuples. + * @param type20 + * The class of field '20' of the result Tuples. + * @param type21 + * The class of field '21' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17, + Class type18, Class type19, Class type20, Class type21) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, + type20, type21 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. 
+ * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @param type18 + * The class of field '18' of the result Tuples. + * @param type19 + * The class of field '19' of the result Tuples. + * @param type20 + * The class of field '20' of the result Tuples. + * @param type21 + * The class of field '21' of the result Tuples. + * @param type22 + * The class of field '22' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17, + Class type18, Class type19, Class type20, Class type21, + Class type22) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, + type20, type21, type22 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. 
+ * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @param type18 + * The class of field '18' of the result Tuples. + * @param type19 + * The class of field '19' of the result Tuples. + * @param type20 + * The class of field '20' of the result Tuples. + * @param type21 + * The class of field '21' of the result Tuples. + * @param type22 + * The class of field '22' of the result Tuples. + * @param type23 + * The class of field '23' of the result Tuples. + * @return The projected DataStream. + * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17, + Class type18, Class type19, Class type20, Class type21, + Class type22, Class type23) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, + type20, type21, type22, type23 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + + /** + * Projects a {@link Tuple} {@link DataStream} to the previously selected + * fields. 
Requires the classes of the fields of the resulting Tuples. + * + * @param type0 + * The class of field '0' of the result Tuples. + * @param type1 + * The class of field '1' of the result Tuples. + * @param type2 + * The class of field '2' of the result Tuples. + * @param type3 + * The class of field '3' of the result Tuples. + * @param type4 + * The class of field '4' of the result Tuples. + * @param type5 + * The class of field '5' of the result Tuples. + * @param type6 + * The class of field '6' of the result Tuples. + * @param type7 + * The class of field '7' of the result Tuples. + * @param type8 + * The class of field '8' of the result Tuples. + * @param type9 + * The class of field '9' of the result Tuples. + * @param type10 + * The class of field '10' of the result Tuples. + * @param type11 + * The class of field '11' of the result Tuples. + * @param type12 + * The class of field '12' of the result Tuples. + * @param type13 + * The class of field '13' of the result Tuples. + * @param type14 + * The class of field '14' of the result Tuples. + * @param type15 + * The class of field '15' of the result Tuples. + * @param type16 + * The class of field '16' of the result Tuples. + * @param type17 + * The class of field '17' of the result Tuples. + * @param type18 + * The class of field '18' of the result Tuples. + * @param type19 + * The class of field '19' of the result Tuples. + * @param type20 + * The class of field '20' of the result Tuples. + * @param type21 + * The class of field '21' of the result Tuples. + * @param type22 + * The class of field '22' of the result Tuples. + * @param type23 + * The class of field '23' of the result Tuples. + * @param type24 + * The class of field '24' of the result Tuples. + * @return The projected DataStream. 
+ * + * @see Tuple + * @see DataStream + */ + public SingleOutputStreamOperator, ?> types( + Class type0, Class type1, Class type2, Class type3, Class type4, + Class type5, Class type6, Class type7, Class type8, Class type9, + Class type10, Class type11, Class type12, Class type13, + Class type14, Class type15, Class type16, Class type17, + Class type18, Class type19, Class type20, Class type21, + Class type22, Class type23, Class type24) { + Class[] types = { type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, + type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, + type20, type21, type22, type23, type24 }; + if (types.length != this.fieldIndexes.length) { + throw new IllegalArgumentException( + "Numbers of projected fields and types do not match."); + } + + TypeWrapper> outTypeWrapper = new ProjectTypeWrapper>( + inTypeWrapper, fieldIndexes, types); + return dataStream + .addFunction( + "projection", + null, + inTypeWrapper, + outTypeWrapper, + new ProjectInvokable>( + fieldIndexes, outTypeWrapper)); + } + +}