From: sewen@apache.org
To: commits@flink.incubator.apache.org
Reply-To: dev@flink.incubator.apache.org
Date: Mon, 18 Aug 2014 17:26:16 -0000
Message-Id: <75ac36fcfd3141df8f25fdcd7297e029@git.apache.org>
Subject: [39/51] [abbrv] [streaming] API update with more differentiated DataStream types and javadoc + several fixes

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
deleted file mode 100644
index 76adf62..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.streaming.api; - -import java.io.Serializable; -import java.util.Collection; - -import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.streaming.api.function.source.FileSourceFunction; -import org.apache.flink.streaming.api.function.source.FileStreamFunction; -import org.apache.flink.streaming.api.function.source.FromElementsFunction; -import org.apache.flink.streaming.api.function.source.GenSequenceFunction; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.invokable.SourceInvokable; -import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; -import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper; - -/** - * {@link ExecutionEnvironment} for streaming jobs. An instance of it is - * necessary to construct streaming topologies. - * - */ -public abstract class StreamExecutionEnvironment { - - /** - * The environment of the context (local by default, cluster if invoked - * through command line) - */ - private static StreamExecutionEnvironment contextEnvironment; - - /** flag to disable local executor when using the ContextEnvironment */ - private static boolean allowLocalExecution = true; - - private static int defaultLocalDop = Runtime.getRuntime().availableProcessors(); - - private int degreeOfParallelism = 1; - - private int executionParallelism = -1; - - protected JobGraphBuilder jobGraphBuilder; - - // -------------------------------------------------------------------------------------------- - // Constructor and Properties - // -------------------------------------------------------------------------------------------- - - /** - * Constructor for creating StreamExecutionEnvironment - */ - protected StreamExecutionEnvironment() { - jobGraphBuilder = new JobGraphBuilder("jobGraph"); - } - - public int getExecutionParallelism() { - return executionParallelism == -1 ? degreeOfParallelism : executionParallelism; - } - - /** - * Gets the degree of parallelism with which operation are executed by - * default. Operations can individually override this value to use a - * specific degree of parallelism via {@link DataStream#setParallelism}. - * - * @return The degree of parallelism used by operations, unless they - * override that value. - */ - public int getDegreeOfParallelism() { - return this.degreeOfParallelism; - } - - /** - * Sets the degree of parallelism (DOP) for operations executed through this - * environment. Setting a DOP of x here will cause all operators (such as - * map, batchReduce) to run with x parallel instances. This method overrides - * the default parallelism for this environment. The - * {@link LocalStreamEnvironment} uses by default a value equal to the - * number of hardware contexts (CPU cores / threads). When executing the - * program via the command line client from a JAR file, the default degree - * of parallelism is the one configured for that setup. 
- * - * @param degreeOfParallelism - * The degree of parallelism - */ - protected void setDegreeOfParallelism(int degreeOfParallelism) { - if (degreeOfParallelism < 1) { - throw new IllegalArgumentException("Degree of parallelism must be at least one."); - } - this.degreeOfParallelism = degreeOfParallelism; - } - - /** - * Sets the number of hardware contexts (CPU cores / threads) used when - * executed in {@link LocalStreamEnvironment}. - * - * @param degreeOfParallelism - * The degree of parallelism in local environment - */ - public void setExecutionParallelism(int degreeOfParallelism) { - if (degreeOfParallelism < 1) { - throw new IllegalArgumentException("Degree of parallelism must be at least one."); - } - - this.executionParallelism = degreeOfParallelism; - } - - // -------------------------------------------------------------------------------------------- - // Data stream creations - // -------------------------------------------------------------------------------------------- - - /** - * Creates a DataStream that represents the Strings produced by reading the - * given file line wise. The file will be read with the system's default - * character set. - * - * @param filePath - * The path of the file, as a URI (e.g., - * "file:///some/local/file" or "hdfs://host:port/file/path"). - * @return The DataStream representing the text file. - */ - public DataStream readTextFile(String filePath) { - return addSource(new FileSourceFunction(filePath), 1); - } - - public DataStream readTextFile(String filePath, int parallelism) { - return addSource(new FileSourceFunction(filePath), parallelism); - } - - /** - * Creates a DataStream that represents the Strings produced by reading the - * given file line wise multiple times(infinite). The file will be read with - * the system's default character set. - * - * @param filePath - * The path of the file, as a URI (e.g., - * "file:///some/local/file" or "hdfs://host:port/file/path"). - * @return The DataStream representing the text file. - */ - public DataStream readTextStream(String filePath) { - return addSource(new FileStreamFunction(filePath), 1); - } - - public DataStream readTextStream(String filePath, int parallelism) { - return addSource(new FileStreamFunction(filePath), parallelism); - } - - /** - * Creates a new DataStream that contains the given elements. The elements - * must all be of the same type, for example, all of the String or Integer. - * The sequence of elements must not be empty. Furthermore, the elements - * must be serializable (as defined in java.io.Serializable), because the - * execution environment may ship the elements into the cluster. - * - * @param data - * The collection of elements to create the DataStream from. - * @param - * type of the returned stream - * @return The DataStream representing the elements. - */ - public DataStream fromElements(OUT... data) { - DataStream returnStream = new DataStream(this, "elements"); - - try { - SourceFunction function = new FromElementsFunction(data); - jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable(function), - new ObjectTypeWrapper(data[0], null, data[0]), "source", - SerializationUtils.serialize(function), 1); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize elements"); - } - return returnStream; - } - - /** - * Creates a DataStream from the given non-empty collection. The type of the - * DataStream is that of the elements in the collection. 
The elements need - * to be serializable (as defined by java.io.Serializable), because the - * framework may move the elements into the cluster if needed. - * - * @param data - * The collection of elements to create the DataStream from. - * @param - * type of the returned stream - * @return The DataStream representing the elements. - */ - @SuppressWarnings("unchecked") - public DataStream fromCollection(Collection data) { - DataStream returnStream = new DataStream(this, "elements"); - - if (data.isEmpty()) { - throw new RuntimeException("Collection must not be empty"); - } - - try { - SourceFunction function = new FromElementsFunction(data); - - jobGraphBuilder.addSource( - returnStream.getId(), - new SourceInvokable(new FromElementsFunction(data)), - new ObjectTypeWrapper((OUT) data.toArray()[0], null, (OUT) data - .toArray()[0]), "source", SerializationUtils.serialize(function), 1); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize collection"); - } - - return returnStream; - } - - /** - * Creates a new DataStream that contains a sequence of numbers. - * - * @param from - * The number to start at (inclusive). - * @param to - * The number to stop at (inclusive) - * @return A DataStrean, containing all number in the [from, to] interval. - */ - public DataStream generateSequence(long from, long to) { - return addSource(new GenSequenceFunction(from, to), 1); - } - - /** - * Ads a data source thus opening a {@link DataStream}. - * - * @param function - * the user defined function - * @param parallelism - * number of parallel instances of the function - * @param - * type of the returned stream - * @return the data stream constructed - */ - public DataStream addSource(SourceFunction function, int parallelism) { - DataStream returnStream = new DataStream(this, "source"); - - try { - jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable(function), - new FunctionTypeWrapper(function, SourceFunction.class, 0, -1, 0), - "source", SerializationUtils.serialize(function), parallelism); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize SourceFunction"); - } - - return returnStream; - } - - public DataStream addSource(SourceFunction sourceFunction) { - return addSource(sourceFunction, 1); - } - - // -------------------------------------------------------------------------------------------- - // Instantiation of Execution Contexts - // -------------------------------------------------------------------------------------------- - - /** - * Creates an execution environment that represents the context in which the - * program is currently executed. If the program is invoked standalone, this - * method returns a local execution environment, as returned by - * {@link #createLocalEnvironment()}. - * - * @return The execution environment of the context in which the program is - * executed. - */ - public static StreamExecutionEnvironment getExecutionEnvironment() { - return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment; - } - - /** - * Creates a {@link LocalStreamEnvironment}. The local execution environment - * will run the program in a multi-threaded fashion in the same JVM as the - * environment was created in. The default degree of parallelism of the - * local environment is the number of hardware contexts (CPU cores / - * threads), unless it was specified differently by - * {@link #setDegreeOfParallelism(int)}. - * - * @return A local execution environment. 
- */ - public static LocalStreamEnvironment createLocalEnvironment() { - return createLocalEnvironment(defaultLocalDop); - } - - /** - * Creates a {@link LocalStreamEnvironment}. The local execution environment - * will run the program in a multi-threaded fashion in the same JVM as the - * environment was created in. It will use the degree of parallelism - * specified in the parameter. - * - * @param degreeOfParallelism - * The degree of parallelism for the local environment. - * @return A local execution environment with the specified degree of - * parallelism. - */ - public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) { - LocalStreamEnvironment lee = new LocalStreamEnvironment(); - lee.setDegreeOfParallelism(degreeOfParallelism); - return lee; - } - - // TODO:fix cluster default parallelism - /** - * Creates a {@link RemoteStreamEnvironment}. The remote environment sends - * (parts of) the program to a cluster for execution. Note that all file - * paths used in the program must be accessible from the cluster. The - * execution will use no parallelism, unless the parallelism is set - * explicitly via {@link #setDegreeOfParallelism}. - * - * @param host - * The host name or address of the master (JobManager), where the - * program should be executed. - * @param port - * The port of the master (JobManager), where the program should - * be executed. - * @param jarFiles - * The JAR files with code that needs to be shipped to the - * cluster. If the program uses user-defined functions, - * user-defined input formats, or any libraries, those must be - * provided in the JAR files. - * @return A remote environment that executes the program on a cluster. - */ - public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, - String... jarFiles) { - return new RemoteStreamEnvironment(host, port, jarFiles); - } - - /** - * Creates a {@link RemoteStreamEnvironment}. The remote environment sends - * (parts of) the program to a cluster for execution. Note that all file - * paths used in the program must be accessible from the cluster. The - * execution will use the specified degree of parallelism. - * - * @param host - * The host name or address of the master (JobManager), where the - * program should be executed. - * @param port - * The port of the master (JobManager), where the program should - * be executed. - * @param degreeOfParallelism - * The degree of parallelism to use during the execution. - * @param jarFiles - * The JAR files with code that needs to be shipped to the - * cluster. If the program uses user-defined functions, - * user-defined input formats, or any libraries, those must be - * provided in the JAR files. - * @return A remote environment that executes the program on a cluster. - */ - public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, - int degreeOfParallelism, String... 
jarFiles) { - RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, port, jarFiles); - rec.setDegreeOfParallelism(degreeOfParallelism); - return rec; - } - - // -------------------------------------------------------------------------------------------- - // Methods to control the context and local environments for execution from - // packaged programs - // -------------------------------------------------------------------------------------------- - - protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) { - contextEnvironment = ctx; - } - - protected static boolean isContextEnvironmentSet() { - return contextEnvironment != null; - } - - protected static void disableLocalExecution() { - allowLocalExecution = false; - } - - public static boolean localExecutionIsAllowed() { - return allowLocalExecution; - } - - /** - * Triggers the program execution. The environment will execute all parts of - * the program that have resulted in a "sink" operation. Sink operations are - * for example printing results or forwarding them to a message queue. - *

- * The program execution will be logged and displayed with a generated - * default name. - **/ - public abstract void execute(); - - /** - * Getter of the {@link JobGraphBuilder} of the streaming job. - * - * @return jobgraph - */ - public JobGraphBuilder getJobGraphBuilder() { - return jobGraphBuilder; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java deleted file mode 100755 index a39823c..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.streaming.api; - -/** - * The StreamOperator represents a {@link DataStream} transformed with some user - * defined operator. - * - * @param - * Output Type of the operator. - */ -public class StreamOperator extends DataStream { - - protected StreamOperator(StreamExecutionEnvironment environment, String operatorType) { - super(environment, operatorType); - } - - protected StreamOperator(DataStream dataStream) { - super(dataStream); - } - - @Override - protected DataStream copy() { - return new StreamOperator(this); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java deleted file mode 100755 index a459dbf..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.flink.streaming.api; - -/** - * The TwoInputStreamOperator represents a {@link StreamOperator} with two - * inputs. - * - * @param - * Type of the first input. - * - * @param - * Type of the second input. - * @param - * Output Type of the operator. - */ -public class TwoInputStreamOperator extends StreamOperator { - - protected TwoInputStreamOperator(StreamExecutionEnvironment environment, String operatorType) { - super(environment, operatorType); - } - - protected TwoInputStreamOperator(DataStream dataStream) { - super(dataStream); - } - - @Override - protected DataStream copy() { - return new TwoInputStreamOperator(this); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java index 73a5749..ced3de7 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java @@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.collector; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +41,7 @@ public class DirectedStreamCollector extends StreamCollector { OutputSelector outputSelector; private static final Log log = LogFactory.getLog(DirectedStreamCollector.class); + private List>>> emitted; /** * Creates a new DirectedStreamCollector @@ -55,6 +58,7 @@ public class DirectedStreamCollector extends StreamCollector { OutputSelector outputSelector) { super(channelID, serializationDelegate); this.outputSelector = outputSelector; + this.emitted = new ArrayList>>>(); } @@ -82,11 +86,14 @@ public class DirectedStreamCollector extends StreamCollector { Collection outputNames = outputSelector.getOutputs(streamRecord.getObject()); streamRecord.setId(channelID); serializationDelegate.setInstance(streamRecord); + emitted.clear(); for (String outputName : outputNames) { try { - for (RecordWriter>> output : outputMap - .get(outputName)) { + RecordWriter>> output = outputMap + .get(outputName); + if (!emitted.contains(output)) { output.emit(serializationDelegate); + emitted.add(output); } } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java ---------------------------------------------------------------------- diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java index 798d8fa..17d7e7b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java @@ -23,13 +23,16 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitDataStream; + /** - * Class for defining an OutputSelector for the directTo operator. Every output - * object of a directed DataStream will run through this operator to select - * outputs. + * Class for defining an OutputSelector for a {@link SplitDataStream} using the + * {@link SingleOutputStreamOperator#split} call. Every output object of a + * {@link SplitDataStream} will run through this operator to select outputs. * * @param - * Type parameter of the directed tuples/objects. + * Type parameter of the split values. */ public abstract class OutputSelector implements Serializable { private static final long serialVersionUID = 1L; @@ -48,8 +51,9 @@ public abstract class OutputSelector implements Serializable { /** * Method for selecting output names for the emitted objects when using the - * directTo operator. The tuple will be emitted only to output names which - * are added to the outputs collection. + * {@link SingleOutputStreamOperator#split} method. The values will be + * emitted only to output names which are added to the outputs collection. + * The outputs collection is cleared automatically after each select call. * * @param value * Output object for which the output selection should be made. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java index 4317f75..20c3b78 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java @@ -34,7 +34,7 @@ import org.apache.flink.util.StringUtils; /** * Collector for tuples in Apache Flink stream processing. The collected - * tuples/obecjts will be wrapped with ID in a {@link StreamRecord} and then + * values will be wrapped with ID in a {@link StreamRecord} and then * emitted to the outputs. 
* * @param @@ -47,7 +47,7 @@ public class StreamCollector implements Collector { protected StreamRecord streamRecord; protected int channelID; private List>>> outputs; - protected Map>>>> outputMap; + protected Map>>> outputMap; protected SerializationDelegate> serializationDelegate; /** @@ -65,7 +65,7 @@ public class StreamCollector implements Collector { this.streamRecord = new StreamRecord(); this.channelID = channelID; this.outputs = new ArrayList>>>(); - this.outputMap = new HashMap>>>>(); + this.outputMap = new HashMap>>>(); } /** @@ -73,21 +73,19 @@ public class StreamCollector implements Collector { * * @param output * The RecordWriter object representing the output. - * @param outputName - * User defined name of the output. + * @param outputNames + * User defined names of the output. */ public void addOutput(RecordWriter>> output, - String outputName) { + List outputNames) { outputs.add(output); - if (outputName != null) { - if (outputMap.containsKey(outputName)) { - outputMap.get(outputName).add(output); - } else { - outputMap.put(outputName, - new ArrayList>>>()); - outputMap.get(outputName).add(output); - } + for (String outputName : outputNames) { + if (outputName != null) { + if (!outputMap.containsKey(outputName)) { + outputMap.put(outputName, output); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java new file mode 100755 index 0000000..c6cb8af --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java @@ -0,0 +1,132 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * The CoDataStream represents a stream for two different data types. It can be
+ * used to apply transformations like {@link CoMapFunction} on two
+ * {@link DataStream}s.
+ *
+ * @param
+ *            Type of the first DataStream.
+ * @param
+ *            Type of the second DataStream.
+ */
+public class CoDataStream {
+
+    StreamExecutionEnvironment environment;
+    JobGraphBuilder jobGraphBuilder;
+    DataStream input1;
+    DataStream input2;
+
+    protected CoDataStream(StreamExecutionEnvironment environment, JobGraphBuilder jobGraphBuilder,
+            DataStream input1, DataStream input2) {
+        this.jobGraphBuilder = jobGraphBuilder;
+        this.environment = environment;
+        this.input1 = input1.copy();
+        this.input2 = input2.copy();
+    }
+
+    /**
+     * Returns the first {@link DataStream}.
+     *
+     * @return The first DataStream.
+     */
+    public DataStream getFirst() {
+        return input1.copy();
+    }
+
+    /**
+     * Returns the second {@link DataStream}.
+     *
+     * @return The second DataStream.
+     */
+    public DataStream getSecond() {
+        return input2.copy();
+    }
+
+    /**
+     * Applies a CoMap transformation on two separate {@link DataStream}s. The
+     * transformation calls a {@link CoMapFunction#map1} for each element
+     * of the first input and {@link CoMapFunction#map2} for each element
+     * of the second input. Each CoMapFunction call returns exactly one element.
+     * The user can also extend {@link RichCoMapFunction} to gain access to
+     * other features provided by the {@link RichFunction} interface.
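+     *
+     * <p>
+     * A minimal usage sketch (illustrative only; {@code env} and the element
+     * types are assumed here, they are not part of this commit):
+     *
+     * <pre>
+     * DataStream<String> first = env.fromElements("a", "bb", "ccc");
+     * DataStream<Long> second = env.generateSequence(1, 10);
+     * first.co(second).map(new CoMapFunction<String, Long, Integer>() {
+     *     public Integer map1(String value) { return value.length(); } // first input
+     *     public Integer map2(Long value) { return value.intValue(); } // second input
+     * });
+     * </pre>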
+ * + * @param coMapper + * The CoMapFunction used to jointly transform the two input + * DataStreams + * @return The transformed DataStream + */ + public SingleOutputStreamOperator map(CoMapFunction coMapper) { + return addCoFunction("coMap", coMapper, new FunctionTypeWrapper(coMapper, + CoMapFunction.class, 0, 1, 2), new CoMapInvokable(coMapper)); + } + + protected SingleOutputStreamOperator addCoFunction(String functionName, + final Function function, TypeSerializerWrapper typeWrapper, + CoInvokable functionInvokable) { + + @SuppressWarnings({ "unchecked", "rawtypes" }) + SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator( + environment, functionName); + + try { + input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper, + functionName, SerializationUtils.serialize((Serializable) function), + environment.getDegreeOfParallelism()); + } catch (SerializationException e) { + throw new RuntimeException("Cannot serialize user defined function"); + } + + input1.connectGraph(input1, returnStream.getId(), 1); + input1.connectGraph(input2, returnStream.getId(), 2); + + if ((input1.userDefinedName != null) && (input2.userDefinedName != null)) { + throw new RuntimeException("An operator cannot have two names"); + } else { + if (input1.userDefinedName != null) { + returnStream.name(input1.getUserDefinedNames()); + } + + if (input2.userDefinedName != null) { + returnStream.name(input2.getUserDefinedNames()); + } + } + // TODO consider iteration + + return returnStream; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java new file mode 100755 index 0000000..d17990c --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -0,0 +1,98 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.api.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.partitioner.StreamPartitioner; + +/** + * The ConnectedDataStream represents a DataStream which consists of connected + * outputs of DataStreams of the same type. 
Operators applied on this will + * transform all the connected outputs jointly. + * + * @param + * Type of the output. + */ +public class ConnectedDataStream extends DataStream { + + protected List> connectedStreams; + + protected ConnectedDataStream(StreamExecutionEnvironment environment, String operatorType) { + super(environment, operatorType); + this.connectedStreams = new ArrayList>(); + this.connectedStreams.add(this); + } + + protected ConnectedDataStream(DataStream dataStream) { + super(dataStream); + connectedStreams = new ArrayList>(); + if (dataStream instanceof ConnectedDataStream) { + for (DataStream stream : ((ConnectedDataStream) dataStream).connectedStreams) { + connectedStreams.add(stream); + } + } else { + this.connectedStreams.add(this); + } + + } + + // @Override + // public IterativeDataStream iterate() { + // throw new RuntimeException("Cannot iterate connected DataStreams"); + // } + + protected void addConnection(DataStream stream) { + if ((stream.userDefinedName != null) || (this.userDefinedName != null)) { + if (!this.userDefinedName.equals(stream.userDefinedName)) { + throw new RuntimeException("Error: Connected NamedDataStreams must have same names"); + } + } + connectedStreams.add(stream.copy()); + } + + @Override + protected List getUserDefinedNames() { + List nameList = new ArrayList(); + for (DataStream stream : connectedStreams) { + nameList.add(stream.userDefinedName); + } + return nameList; + } + + @Override + protected DataStream setConnectionType(StreamPartitioner partitioner) { + ConnectedDataStream returnStream = (ConnectedDataStream) this.copy(); + + for (DataStream stream : returnStream.connectedStreams) { + stream.partitioner = partitioner; + } + + return returnStream; + } + + @Override + protected ConnectedDataStream copy() { + return new ConnectedDataStream(this); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java new file mode 100644 index 0000000..b692984 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -0,0 +1,852 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.streaming.api.datastream; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.SerializationException; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.RichFilterFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.api.JobGraphBuilder; +import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.function.sink.SinkFunction; +import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv; +import org.apache.flink.streaming.api.function.sink.WriteFormatAsText; +import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches; +import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis; +import org.apache.flink.streaming.api.invokable.SinkInvokable; +import org.apache.flink.streaming.api.invokable.UserTaskInvokable; +import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.FilterInvokable; +import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable; +import org.apache.flink.streaming.api.invokable.operator.MapInvokable; +import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable; +import org.apache.flink.streaming.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.partitioner.DistributePartitioner; +import org.apache.flink.streaming.partitioner.FieldsPartitioner; +import org.apache.flink.streaming.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.partitioner.StreamPartitioner; +import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; +import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; + +/** + * A DataStream represents a stream of elements of the same type. A DataStream + * can be transformed into another DataStream by applying a transformation as + * for example + *

+ * <ul>
+ *   <li>{@link DataStream#map},
+ *   <li>{@link DataStream#filter}, or
+ *   <li>{@link DataStream#batchReduce}.
+ * </ul>
+ *
+ * @param
+ *            The type of the DataStream, i.e., the type of the elements of the
+ *            DataStream.
+ */
+public abstract class DataStream {
+
+    protected static Integer counter = 0;
+    protected final StreamExecutionEnvironment environment;
+    protected final String id;
+    protected int degreeOfParallelism;
+    protected String userDefinedName;
+    protected StreamPartitioner partitioner;
+
+    protected final JobGraphBuilder jobGraphBuilder;
+
+    /**
+     * Create a new {@link DataStream} in the given execution environment with
+     * partitioning set to forward by default.
+     *
+     * @param environment
+     *            StreamExecutionEnvironment
+     * @param operatorType
+     *            The type of the operator in the component
+     */
+    public DataStream(StreamExecutionEnvironment environment, String operatorType) {
+        if (environment == null) {
+            throw new NullPointerException("context is null");
+        }
+
+        // TODO add name based on component number and preferably a sequential id
+        counter++;
+        this.id = operatorType + "-" + counter.toString();
+        this.environment = environment;
+        this.degreeOfParallelism = environment.getDegreeOfParallelism();
+        this.jobGraphBuilder = environment.getJobGraphBuilder();
+        this.partitioner = new ForwardPartitioner();
+
+    }
+
+    /**
+     * Create a new DataStream by creating a copy of another DataStream.
+     *
+     * @param dataStream
+     *            The DataStream that will be copied.
+     */
+    public DataStream(DataStream dataStream) {
+        this.environment = dataStream.environment;
+        this.id = dataStream.id;
+        this.degreeOfParallelism = dataStream.degreeOfParallelism;
+        this.userDefinedName = dataStream.userDefinedName;
+        this.partitioner = dataStream.partitioner;
+        this.jobGraphBuilder = dataStream.jobGraphBuilder;
+
+    }
+
+    /**
+     * Partitioning strategy on the stream.
+     */
+    public static enum ConnectionType {
+        SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
+    }
+
+    /**
+     * Returns the ID of the {@link DataStream}.
+     *
+     * @return ID of the DataStream
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Gets the degree of parallelism for this operator.
+     *
+     * @return The parallelism set for this operator.
+     */
+    public int getParallelism() {
+        return this.degreeOfParallelism;
+    }
+
+    /**
+     * Creates a new {@link ConnectedDataStream} by connecting {@link DataStream}
+     * outputs of the same type with each other. The DataStreams connected
+     * using this operator will be transformed simultaneously.
+     *
+     * @param streams
+     *            The DataStreams to connect output with.
+     * @return The {@link ConnectedDataStream}.
+     */
+    public ConnectedDataStream connectWith(DataStream... streams) {
+        ConnectedDataStream returnStream = new ConnectedDataStream(this);
+
+        for (DataStream stream : streams) {
+            returnStream.addConnection(stream);
+        }
+        return returnStream;
+    }
+
+    /**
+     * Creates a new {@link CoDataStream} by connecting {@link DataStream}
+     * outputs of different type with each other. The DataStreams connected
+     * using this operator can be used with CoFunctions.
+     *
+     * @param dataStream
+     *            The DataStream with which this stream will be joined.
+     * @return The {@link CoDataStream}.
+     */
+    public CoDataStream co(DataStream dataStream) {
+        return new CoDataStream(environment, jobGraphBuilder, this, dataStream);
+    }
+
+    /**
+     * Sets the partitioning of the {@link DataStream} so that the output tuples
+     * are partitioned by their hashcode and are sent to only one component.
+     *
+     * @param keyposition
+     *            The field used to compute the hashcode.
+     * @return The DataStream with field partitioning set.
+     */
+    public DataStream partitionBy(int keyposition) {
+        if (keyposition < 0) {
+            throw new IllegalArgumentException("The position of the field must be non-negative");
+        }
+
+        return setConnectionType(new FieldsPartitioner(keyposition));
+    }
+
+    /**
+     * Sets the partitioning of the {@link DataStream} so that the output tuples
+     * are broadcasted to every parallel instance of the next component.
+     *
+     * @return The DataStream with broadcast partitioning set.
+     */
+    public DataStream broadcast() {
+        return setConnectionType(new BroadcastPartitioner());
+    }
+
+    /**
+     * Sets the partitioning of the {@link DataStream} so that the output tuples
+     * are shuffled to the next component.
+     *
+     * @return The DataStream with shuffle partitioning set.
+     */
+    public DataStream shuffle() {
+        return setConnectionType(new ShufflePartitioner());
+    }
+
+    /**
+     * Sets the partitioning of the {@link DataStream} so that the output tuples
+     * are forwarded to the local subtask of the next component. This is the
+     * default partitioner setting.
+     *
+     * @return The DataStream with forward partitioning set.
+     */
+    public DataStream forward() {
+        return setConnectionType(new ForwardPartitioner());
+    }
+
+    /**
+     * Sets the partitioning of the {@link DataStream} so that the output tuples
+     * are distributed evenly to the next component.
+     *
+     * @return The DataStream with distribute partitioning set.
+     */
+    public DataStream distribute() {
+        return setConnectionType(new DistributePartitioner());
+    }
+
+    /**
+     * Applies a Map transformation on a {@link DataStream}. The transformation
+     * calls a {@link MapFunction} for each element of the DataStream. Each
+     * MapFunction call returns exactly one element. The user can also extend
+     * {@link RichMapFunction} to gain access to other features provided by the
+     * {@link RichFunction} interface.
+     *
+     * @param mapper
+     *            The MapFunction that is called for each element of the
+     *            DataStream.
+     * @param
+     *            output type
+     * @return The transformed {@link DataStream}.
+     */
+    public SingleOutputStreamOperator map(MapFunction mapper) {
+        return addFunction("map", mapper, new FunctionTypeWrapper(mapper,
+                MapFunction.class, 0, -1, 1), new MapInvokable(mapper));
+    }
+
+    /**
+     * Applies a FlatMap transformation on a {@link DataStream}. The
+     * transformation calls a {@link FlatMapFunction} for each element of the
+     * DataStream. Each FlatMapFunction call can return any number of elements
+     * including none. The user can also extend {@link RichFlatMapFunction} to
+     * gain access to other features provided by the {@link RichFunction}
+     * interface.
+     *
+     * @param flatMapper
+     *            The FlatMapFunction that is called for each element of the
+     *            DataStream
+     *
+     * @param
+     *            output type
+     * @return The transformed {@link DataStream}.
+     */
+    public SingleOutputStreamOperator flatMap(FlatMapFunction flatMapper) {
+        return addFunction("flatMap", flatMapper, new FunctionTypeWrapper(
+                flatMapper, FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable(
+                flatMapper));
+    }
+
+    /**
+     * Applies a reduce transformation on preset chunks of the DataStream. The
+     * transformation calls a {@link GroupReduceFunction} for each tuple batch
+     * of the predefined size. Each GroupReduceFunction call can return any
+     * number of elements including none. The user can also extend
+     * {@link RichGroupReduceFunction} to gain access to other features provided
+     * by the {@link RichFunction} interface.
+     *
+     * @param reducer
+     *            The GroupReduceFunction that is called for each tuple batch.
+     * @param batchSize
+     *            The number of tuples grouped together in the batch.
+     * @param
+     *            output type
+     * @return The transformed {@link DataStream}.
+     */
+    public SingleOutputStreamOperator batchReduce(GroupReduceFunction reducer,
+            int batchSize) {
+        return addFunction("batchReduce", reducer, new FunctionTypeWrapper(reducer,
+                GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable(reducer,
+                batchSize));
+    }
+
+    /**
+     * Applies a reduce transformation on preset "time" chunks of the
+     * DataStream. The transformation calls a {@link GroupReduceFunction} on
+     * records received during the predefined time window. The window is shifted
+     * after each reduce call. Each GroupReduceFunction call can return any
+     * number of elements including none. The user can also extend
+     * {@link RichGroupReduceFunction} to gain access to other features provided
+     * by the {@link RichFunction} interface.
+     *
+     * @param reducer
+     *            The GroupReduceFunction that is called for each time window.
+     * @param windowSize
+     *            The time window to run the reducer on, in milliseconds.
+     * @param
+     *            output type
+     * @return The transformed DataStream.
+     */
+    public SingleOutputStreamOperator windowReduce(GroupReduceFunction reducer,
+            long windowSize) {
+        return addFunction("batchReduce", reducer, new FunctionTypeWrapper(reducer,
+                GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable(reducer,
+                windowSize));
+    }
+
+    /**
+     * Applies a Filter transformation on a {@link DataStream}. The
+     * transformation calls a {@link FilterFunction} for each element of the
+     * DataStream and retains only those elements for which the function returns
+     * true. Elements for which the function returns false are filtered out. The
+     * user can also extend {@link RichFilterFunction} to gain access to other
+     * features provided by the {@link RichFunction} interface.
+     *
+     * @param filter
+     *            The FilterFunction that is called for each element of the
+     *            DataStream.
+     * @return The filtered DataStream.
+     */
+    public SingleOutputStreamOperator filter(FilterFunction filter) {
+        return addFunction("filter", filter, new FunctionTypeWrapper(filter,
+                FilterFunction.class, 0, -1, 0), new FilterInvokable(filter));
+    }
+
+    /**
+     * Writes a DataStream to the standard output stream (stdout). For each
+     * element of the DataStream the result of {@link Object#toString()} is
+     * written.
+     *
+     * @return The closed DataStream.
+     */
+    public DataStream print() {
+        DataStream inputStream = this.copy();
+        PrintSinkFunction printFunction = new PrintSinkFunction();
+        DataStream returnStream = addSink(inputStream, printFunction, null);
+
+        jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+
+        return returnStream;
+    }
+
+    /**
+     * Writes a DataStream to the file specified by path in text format. For
+     * every element of the DataStream the result of {@link Object#toString()}
+     * is written.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     *
+     * @return The closed DataStream
+     */
+    public DataStream writeAsText(String path) {
+        return writeAsText(this, path, new WriteFormatAsText(), 1, null);
+    }
+
+    /**
+     * Writes a DataStream to the file specified by path in text format. The
+     * writing is performed periodically, in every millis milliseconds. For
+     * every element of the DataStream the result of {@link Object#toString()}
+     * is written.
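+     *
+     * <p>
+     * A minimal usage sketch (illustrative only; {@code dataStream} and the
+     * target path are assumed):
+     *
+     * <pre>
+     * // rewrite the file roughly once per second
+     * dataStream.writeAsText("file:///tmp/out.txt", 1000L);
+     * </pre>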
+ * + * @param path + * is the path to the location where the tuples are written + * @param millis + * is the file update frequency + * + * @return The closed DataStream + */ + public DataStream writeAsText(String path, long millis) { + return writeAsText(this, path, new WriteFormatAsText(), millis, null); + } + + /** + * Writes a DataStream to the file specified by path in text format. The + * writing is performed periodically in equally sized batches. For every + * element of the DataStream the result of {@link Object#toString()} is + * written. + * + * @param path + * is the path to the location where the tuples are written + * @param batchSize + * is the size of the batches, i.e. the number of tuples written + * to the file at a time + * + * @return The closed DataStream + */ + public DataStream writeAsText(String path, int batchSize) { + return writeAsText(this, path, new WriteFormatAsText(), batchSize, null); + } + + /** + * Writes a DataStream to the file specified by path in text format. The + * writing is performed periodically, in every millis milliseconds. For + * every element of the DataStream the result of {@link Object#toString()} + * is written. + * + * @param path + * is the path to the location where the tuples are written + * @param millis + * is the file update frequency + * @param endTuple + * is a special tuple indicating the end of the stream. If an + * endTuple is caught, the last pending batch of tuples will be + * immediately appended to the target file regardless of the + * system time. + * + * @return The closed DataStream + */ + public DataStream writeAsText(String path, long millis, OUT endTuple) { + return writeAsText(this, path, new WriteFormatAsText(), millis, endTuple); + } + + /** + * Writes a DataStream to the file specified by path in text format. The + * writing is performed periodically in equally sized batches. For every + * element of the DataStream the result of {@link Object#toString()} is + * written. + * + * @param path + * is the path to the location where the tuples are written + * @param batchSize + * is the size of the batches, i.e. the number of tuples written + * to the file at a time + * @param endTuple + * is a special tuple indicating the end of the stream. If an + * endTuple is caught, the last pending batch of tuples will be + * immediately appended to the target file regardless of the + * batchSize. + * + * @return The closed DataStream + */ + public DataStream writeAsText(String path, int batchSize, OUT endTuple) { + return writeAsText(this, path, new WriteFormatAsText(), batchSize, endTuple); + } + + /** + * Writes a DataStream to the file specified by path in text format. The + * writing is performed periodically, in every millis milliseconds. For + * every element of the DataStream the result of {@link Object#toString()} + * is written. + * + * @param path + * is the path to the location where the tuples are written + * @param millis + * is the file update frequency + * @param endTuple + * is a special tuple indicating the end of the stream. If an + * endTuple is caught, the last pending batch of tuples will be + * immediately appended to the target file regardless of the + * system time. 
+     *
+     * @return the data stream constructed
+     */
+    private DataStream writeAsText(DataStream inputStream, String path,
+            WriteFormatAsText format, long millis, OUT endTuple) {
+        DataStream returnStream = addSink(inputStream, new WriteSinkFunctionByMillis(
+                path, format, millis, endTuple), null);
+        jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+        jobGraphBuilder.setMutability(returnStream.getId(), false);
+        return returnStream;
+    }
+
+    /**
+     * Writes a DataStream to the file specified by path in text format. The
+     * writing is performed periodically in equally sized batches. For every
+     * element of the DataStream the result of {@link Object#toString()} is
+     * written.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @param batchSize
+     *            is the size of the batches, i.e. the number of tuples written
+     *            to the file at a time
+     * @param endTuple
+     *            is a special tuple indicating the end of the stream. If an
+     *            endTuple is caught, the last pending batch of tuples will be
+     *            immediately appended to the target file regardless of the
+     *            batchSize.
+     *
+     * @return the data stream constructed
+     */
+    private DataStream writeAsText(DataStream inputStream, String path,
+            WriteFormatAsText format, int batchSize, OUT endTuple) {
+        DataStream returnStream = addSink(inputStream, new WriteSinkFunctionByBatches(
+                path, format, batchSize, endTuple), null);
+        jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+        jobGraphBuilder.setMutability(returnStream.getId(), false);
+        return returnStream;
+    }
+
+    /**
+     * Writes a DataStream to the file specified by path in csv format. For
+     * every element of the DataStream the result of {@link Object#toString()}
+     * is written.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     *
+     * @return The closed DataStream
+     */
+    public DataStream writeAsCsv(String path) {
+        return writeAsCsv(this, path, new WriteFormatAsCsv(), 1, null);
+    }
+
+    /**
+     * Writes a DataStream to the file specified by path in csv format. The
+     * writing is performed periodically, in every millis milliseconds. For
+     * every element of the DataStream the result of {@link Object#toString()}
+     * is written.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @param millis
+     *            is the file update frequency
+     *
+     * @return The closed DataStream
+     */
+    public DataStream writeAsCsv(String path, long millis) {
+        return writeAsCsv(this, path, new WriteFormatAsCsv(), millis, null);
+    }
+
+    /**
+     * Writes a DataStream to the file specified by path in csv format. The
+     * writing is performed periodically in equally sized batches. For every
+     * element of the DataStream the result of {@link Object#toString()} is
+     * written.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @param batchSize
+     *            is the size of the batches, i.e. the number of tuples written
+     *            to the file at a time
+     *
+     * @return The closed DataStream
+     */
+    public DataStream writeAsCsv(String path, int batchSize) {
+        return writeAsCsv(this, path, new WriteFormatAsCsv(), batchSize, null);
+    }
+
+    /**
+     * Writes a DataStream to the file specified by path in csv format. The
+     * writing is performed periodically, in every millis milliseconds. For
+     * every element of the DataStream the result of {@link Object#toString()}
+     * is written.
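+     *
+     * <p>
+     * A minimal usage sketch (illustrative only; {@code tupleStream} and the
+     * {@code END_MARKER} end tuple are assumed):
+     *
+     * <pre>
+     * // update the file every 500 ms; END_MARKER flushes the last pending batch
+     * tupleStream.writeAsCsv("file:///tmp/out.csv", 500L, END_MARKER);
+     * </pre>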
+	 *
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 *
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsCsv(String path, long millis, OUT endTuple) {
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, endTuple);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 *
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 *
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
+		if (this instanceof SingleOutputStreamOperator) {
+			((SingleOutputStreamOperator<?, ?>) this).setMutability(false);
+		}
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, every millis milliseconds. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 *
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 *
+	 * @return the data stream constructed
+	 */
+	private DataStream<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
+		DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+				path, format, millis, endTuple));
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 *
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 *
+	 * @return the data stream constructed
+	 */
+	private DataStream<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
+		DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
+				path, format, batchSize, endTuple), null);
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Initiates an iterative part of the program that executes multiple times
+	 * and feeds back data streams. The iterative part needs to be closed by
+	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The
+	 * transformation of this IterativeDataStream will be the iteration head.
+	 * The data stream given to the {@code closeWith(DataStream)} method is the
+	 * data stream that will be fed back and used as the input for the
+	 * iteration head. Unlike in batch processing, by default the output of the
+	 * iteration stream is directed both to the iteration head and to the next
+	 * component. To direct tuples specifically to the iteration head or to the
+	 * output, use {@code split(OutputSelector)} on the iteration tail while
+	 * referencing the iteration head as 'iterate'.
+	 *
+	 * The iteration edge will be partitioned the same way as the first input
+	 * of the iteration head.
+	 *
+	 * @return The iterative data stream created.
+	 */
+	public IterativeDataStream<OUT> iterate() {
+		return new IterativeDataStream<OUT>(this);
+	}
+
+	protected DataStream<OUT> addIterationSource(String iterationID) {
+
+		DataStream<OUT> returnStream = new DataStreamSource<OUT>(environment, "iterationSource");
+
+		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+				degreeOfParallelism);
+
+		return this.copy();
+	}
+
+	/**
+	 * Internal function for passing the user defined function to the JobGraph
+	 * of the job.
+	 *
+	 * @param functionName
+	 *            name of the function
+	 * @param function
+	 *            the user defined function
+	 * @param functionInvokable
+	 *            the wrapping JobVertex instance
+	 * @param <R>
+	 *            type of the return stream
+	 * @return the data stream constructed
+	 */
+	private <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
+			final Function function, TypeSerializerWrapper<OUT, Tuple, R> typeWrapper,
+			UserTaskInvokable<OUT, R> functionInvokable) {
+
+		DataStream<OUT> inputStream = this.copy();
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
+				functionName);
+
+		try {
+			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
+					functionName, SerializationUtils.serialize((Serializable) function),
+					degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+
+		connectGraph(inputStream, returnStream.getId(), 0);
+
+		if (inputStream instanceof IterativeDataStream) {
+			returnStream.addIterationSource(((IterativeDataStream<OUT>) inputStream).iterationID
+					.toString());
+		}
+
+		if (userDefinedName != null) {
+			returnStream.name(getUserDefinedNames());
+		}
+
+		return returnStream;
+	}
+
+	protected List<String> getUserDefinedNames() {
+		List<String> nameList = new ArrayList<String>();
+		nameList.add(userDefinedName);
+		return nameList;
+	}
+
+	/**
+	 * Gives the data transformation (vertex) a user defined name in order to
+	 * use it with directed outputs. The {@link OutputSelector} of the input
+	 * vertex should use this name for directed emits.
+	 *
+	 * @param name
+	 *            The name to set
+	 * @return The named DataStream.
+	 */
+	protected DataStream<OUT> name(List<String> name) {
+
+		userDefinedName = name.get(0);
+		jobGraphBuilder.setUserDefinedName(id, name);
+
+		return this;
+	}
+
+	/**
+	 * Internal function for setting the partitioner for the DataStream
+	 *
+	 * @param partitioner
+	 *            Partitioner to set.
+	 * @return The modified DataStream.
+	 */
+	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+		DataStream<OUT> returnStream = this.copy();
+
+		returnStream.partitioner = partitioner;
+
+		return returnStream;
+	}
+
+	/**
+	 * Internal function for assembling the underlying
+	 * {@link org.apache.flink.runtime.jobgraph.JobGraph} of the job. Connects
+	 * the outputs of the given input stream to the specified output stream
+	 * given by the outputID.
+	 *
+	 * @param inputStream
+	 *            input data stream
+	 * @param outputID
+	 *            ID of the output
+	 * @param typeNumber
+	 *            Number of the type (used at co-functions)
+	 */
+	protected void connectGraph(DataStream<OUT> inputStream, String outputID, int typeNumber) {
+		if (inputStream instanceof ConnectedDataStream) {
+			for (DataStream<OUT> stream : ((ConnectedDataStream<OUT>) inputStream).connectedStreams) {
+				jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber);
+			}
+		} else {
+			jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
+					typeNumber);
+		}
+
+	}
+
+	/**
+	 * Adds the given sink to this DataStream. Only streams with sinks added
+	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
+	 * method is called.
+	 *
+	 * @param sinkFunction
+	 *            The object containing the sink's invoke function.
+	 * @return The closed DataStream.
+	 */
+	public DataStream<OUT> addSink(SinkFunction<OUT> sinkFunction) {
+		return addSink(this.copy(), sinkFunction);
+	}
+
+	private DataStream<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
+		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT, Tuple, OUT>(
+				sinkFunction, SinkFunction.class, 0, -1, 0));
+	}
+
+	private DataStream<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction,
+			TypeSerializerWrapper<OUT, Tuple, OUT> typeWrapper) {
+		DataStream<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
+
+		try {
+			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
+					typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+					degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize SinkFunction");
+		}
+
+		inputStream.connectGraph(inputStream, returnStream.getId(), 0);
+
+		if (this.copy().userDefinedName != null) {
+			returnStream.name(getUserDefinedNames());
+		}
+
+		return returnStream;
+	}
+
+	/**
+	 * Creates a copy of the {@link DataStream}
+	 *
+	 * @return The copy
+	 */
+	protected abstract DataStream<OUT> copy();
+}
\ No newline at end of file
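For readers following the API change, here is a minimal usage sketch of the sink methods above. It is illustrative only: the createLocalEnvironment() factory, the fromElements() element wrapping, the import paths, and the output paths are assumptions on our part, not part of this diff; the invoke(tuple) callback follows the "sink's invoke function" wording of the addSink javadoc.

    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.function.sink.SinkFunction;

    public class SinkSketch {
        public static void main(String[] args) throws Exception {
            // Local environment factory name is an assumption.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

            // Assumes fromElements wraps values in Tuple1, as suggested by the
            // source functions touched elsewhere in this commit.
            DataStream<Tuple1<String>> stream = env.fromElements("a", "b", "c");

            // Time-based file sink: flush pending tuples every 1000 ms.
            stream.writeAsText("/tmp/out.txt", 1000L);

            // Batch-based CSV sink: flush after every 100 tuples.
            stream.writeAsCsv("/tmp/out.csv", 100);

            // User-defined sink via addSink(SinkFunction).
            stream.addSink(new SinkFunction<Tuple1<String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void invoke(Tuple1<String> tuple) {
                    System.out.println(tuple.f0);
                }
            });

            env.execute();
        }
    }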
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
new file mode 100755
index 0000000..ee6502f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Represents the end of a DataStream.
+ *
+ * @param <IN>
+ *            The type of the DataStream closed by the sink.
+ */
+public class DataStreamSink<IN> extends DataStream<IN> {
+
+	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType) {
+		super(environment, operatorType);
+	}
+
+	protected DataStreamSink(DataStream<IN> dataStream) {
+		super(dataStream);
+	}
+
+	@Override
+	protected DataStream<IN> copy() {
+		throw new RuntimeException("Data stream sinks cannot be copied");
+	}
+
+}
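The throwing copy() above is deliberate: a sink terminates the topology, so the internal copy-on-transform machinery of DataStream must never run on it. A small illustration, continuing the earlier sketch (variable names are ours):

    // writeAsText internally ends in a DataStreamSink, so the returned
    // "closed" stream must not be transformed further; any operation that
    // needs copy() would throw "Data stream sinks cannot be copied".
    DataStream<Tuple1<String>> closed = stream.writeAsText("/tmp/out.txt", 1000L);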
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
new file mode 100755
index 0000000..f939851
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The DataStreamSource represents the starting point of a DataStream.
+ *
+ * @param <OUT>
+ *            Type of the DataStream created.
+ */
+public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
+
+	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType) {
+		super(environment, operatorType);
+	}
+
+	public DataStreamSource(DataStream<OUT> dataStream) {
+		super(dataStream);
+	}
+}
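Because DataStreamSource extends SingleOutputStreamOperator, operator settings can be applied directly at the source. A sketch, continuing the earlier example; the generateSequence() factory and the Tuple1<Long> element type are assumptions suggested by the GenSequenceFunction used elsewhere in this change:

    // Hypothetical sequence source; method name and element type are assumed.
    DataStreamSource<Tuple1<Long>> source = env.generateSequence(0, 1000);

    // Operator settings (see SingleOutputStreamOperator below) are
    // available right at the source:
    source.setParallelism(2);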
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
new file mode 100644
index 0000000..b9aadcd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+
+/**
+ * The iterative data stream represents the start of an iteration in a
+ * {@link DataStream}.
+ *
+ * @param <IN>
+ *            Type of the DataStream
+ */
+public class IterativeDataStream<IN> extends SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
+
+	static Integer iterationCount = 0;
+	protected Integer iterationID;
+
+	protected IterativeDataStream(DataStream<IN> dataStream) {
+		super(dataStream);
+		iterationID = iterationCount;
+		iterationCount++;
+	}
+
+	protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID) {
+		super(dataStream);
+		this.iterationID = iterationID;
+	}
+
+	/**
+	 * Closes the iteration. This method defines the end of the iterative
+	 * program part. By default the DataStream represented by the parameter
+	 * will be fed back to the iteration head, however the user can explicitly
+	 * select which tuples should be iterated by using
+	 * {@code split(OutputSelector)}. Tuples directed to 'iterate' will be fed
+	 * back to the iteration head.
+	 *
+	 * @param iterationResult
+	 *            The data stream that can be fed back to the next iteration.
+	 *
+	 */
+	public DataStream<IN> closeWith(DataStream<IN> iterationResult) {
+		return closeWith(iterationResult, null);
+	}
+
+	/**
+	 * Closes the iteration. This method defines the end of the iterative
+	 * program part. By default the DataStream represented by the parameter
+	 * will be fed back to the iteration head, however the user can explicitly
+	 * select which tuples should be iterated by using
+	 * {@code split(OutputSelector)}. Tuples directed to 'iterate' will be fed
+	 * back to the iteration head.
+	 *
+	 * @param iterationTail
+	 *            The data stream that can be fed back to the next iteration.
+	 * @param iterationName
+	 *            Name of the iteration edge (backward edge to the iteration
+	 *            head) when used with directed emits
+	 *
+	 */
+	public DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
+		DataStream<IN> returnStream = new DataStreamSink<IN>(environment, "iterationSink");
+
+		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
+				iterationID.toString(), iterationTail.getParallelism(), iterationName);
+
+		jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
+				iterationTail.getParallelism());
+
+		if (iterationTail instanceof ConnectedDataStream) {
+			for (DataStream<IN> stream : ((ConnectedDataStream<IN>) iterationTail).connectedStreams) {
+				String inputID = stream.getId();
+				jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<IN>(),
+						0);
+			}
+		} else {
+			jobGraphBuilder.setEdge(iterationTail.getId(), returnStream.getId(),
+					new ForwardPartitioner<IN>(), 0);
+		}
+
+		return iterationTail;
+	}
+
+	@Override
+	protected IterativeDataStream<IN> copy() {
+		return new IterativeDataStream<IN>(this, iterationID);
+	}
+}
\ No newline at end of file
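A minimal iteration sketch based on the iterate()/closeWith() contract above, continuing the earlier source example. The map() transformation and its MapFunction signature are illustrative assumptions; the point is only the shape of head, body, and feedback edge:

    // The iterate() call marks the iteration head.
    IterativeDataStream<Tuple1<Long>> iteration = source.iterate();

    // The iteration body: some transformation of the head's output.
    DataStream<Tuple1<Long>> body = iteration.map(new MapFunction<Tuple1<Long>, Tuple1<Long>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple1<Long> map(Tuple1<Long> value) throws Exception {
            return new Tuple1<Long>(value.f0 / 2);
        }
    });

    // Feed the body back to the head; by default its output also flows on
    // to the operator that follows the iteration.
    iteration.closeWith(body);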
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
new file mode 100755
index 0000000..9af4dc8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The SingleOutputStreamOperator represents a user defined transformation
+ * applied on a {@link DataStream} with one predefined output type.
+ *
+ * @param <OUT>
+ *            Output type of the operator.
+ * @param <O>
+ *            Type of the operator.
+ */
+public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
+		DataStream<OUT> {
+
+	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
+		super(environment, operatorType);
+	}
+
+	protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
+		super(dataStream);
+	}
+
+	/**
+	 * Sets the degree of parallelism for this operator. The degree must be 1
+	 * or more.
+	 *
+	 * @param dop
+	 *            The degree of parallelism for this operator.
+	 * @return The operator with set degree of parallelism.
+	 */
+	public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
+		if (dop < 1) {
+			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+		}
+		this.degreeOfParallelism = dop;
+
+		jobGraphBuilder.setParallelism(id, degreeOfParallelism);
+
+		return this;
+	}
+
+	/**
+	 * Sets the mutability of the operator. If the operator is set to mutable,
+	 * the tuples received in the user defined functions will be reused after
+	 * the function call. Setting an operator to mutable reduces garbage
+	 * collection overhead and thus increases scalability. Please note that if
+	 * a {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is
+	 * used as mutable, the user can only iterate through the iterator once in
+	 * every invoke.
+	 *
+	 * @param isMutable
+	 *            The mutability of the operator.
+	 * @return The operator with mutability set.
+	 */
+	public DataStream<OUT> setMutability(boolean isMutable) {
+		jobGraphBuilder.setMutability(id, isMutable);
+		return this;
+	}
+
+	/**
+	 * Sets the maximum time frequency (ms) for the flushing of the output
+	 * buffer. By default the output buffers flush only when they are full.
+	 *
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 * @return The operator with buffer timeout set.
+	 */
+	public DataStream<OUT> setBufferTimeout(long timeoutMillis) {
+		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
+		return this;
+	}
+
+	/**
+	 * Operator used for directing tuples to specific named outputs using an
+	 * {@link OutputSelector}. Calling this method on an operator creates a new
+	 * {@link SplitDataStream}.
+	 *
+	 * @param outputSelector
+	 *            The user defined {@link OutputSelector} for directing the
+	 *            tuples.
+	 * @return The {@link SplitDataStream}
+	 */
+	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+		try {
+			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize OutputSelector");
+		}
+
+		return new SplitDataStream<OUT>(this);
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
+		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> broadcast() {
+		return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> shuffle() {
+		return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> forward() {
+		return (SingleOutputStreamOperator<OUT, O>) super.forward();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> distribute() {
+		return (SingleOutputStreamOperator<OUT, O>) super.distribute();
+	}
+
+	@Override
+	protected SingleOutputStreamOperator<OUT, O> copy() {
+		return new SingleOutputStreamOperator<OUT, O>(this);
+	}
+
+}
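To close, a short sketch of the operator settings and the split() call defined above, continuing the earlier example. It assumes map() is backed by the addFunction helper shown earlier and therefore returns a SingleOutputStreamOperator; the OutputSelector callback signature (select with a java.util.Collection of output names) is likewise an assumption:

    SingleOutputStreamOperator<Tuple1<Long>, ?> mapped = source
            .map(new MapFunction<Tuple1<Long>, Tuple1<Long>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple1<Long> map(Tuple1<Long> value) throws Exception {
                    return new Tuple1<Long>(value.f0 + 1);
                }
            });

    mapped.setParallelism(4);      // four parallel instances of the mapper
    mapped.setMutability(true);    // reuse tuples to reduce GC pressure
    mapped.setBufferTimeout(100);  // flush output buffers at least every 100 ms

    // Direct tuples to named outputs; the select() signature is assumed.
    SplitDataStream<Tuple1<Long>> split = mapped.split(new OutputSelector<Tuple1<Long>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void select(Tuple1<Long> tuple, Collection<String> outputs) {
            outputs.add(tuple.f0 % 2 == 0 ? "even" : "odd");
        }
    });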