From: aljoscha@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Wed, 17 Feb 2016 09:59:40 -0000
Message-Id: <6965808a17d04905a6d7bce3f7b4c8e7@git.apache.org>
Subject: [3/7] flink git commit: [FLINK-3402] Refactor Common Parts of Stream/Batch Documentation

http://git-wip-us.apache.org/repos/asf/flink/blob/be4601ea/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index 5cfde85..08843bc 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -3,7 +3,7 @@ title: "Flink DataStream API Programming Guide"
 # Top-level navigation
 top-nav-group: apis
-top-nav-pos: 1
+top-nav-pos: 2
 top-nav-title: Streaming Guide (DataStream API)
 # Sub-level navigation
@@ -38,9 +38,13 @@ example write the data to files, or to standard output (for example the command
 terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs.
 The execution can happen in a local JVM, or on clusters of many machines.
-In order to create your own Flink DataStream program, we encourage you to start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations). The remaining sections act as references for additional
+Please see [basic concepts]({{ site.baseurl }}/apis/common/index.html) for an introduction
+to the fundamental concepts of the Flink API.
+
+In order to create your own Flink DataStream program, we encourage you to start with
+[anatomy of a Flink program]({{ site.baseurl }}/apis/common/index.html#anatomy-of-a-flink-program)
+and gradually add your own
+[transformations](#datastream-transformations). The remaining sections act as references for additional
 operations and advanced features.
@@ -137,303 +141,8 @@ word count program. If you want to see counts greater than 1, type the same word
 {% top %}
-
-Linking with Flink
------------------
-
-To write programs with Flink, you need to include the Flink DataStream library corresponding to
-your programming language in your project.
-
-The simplest way to do this is to use one of the quickstart scripts: either for
-[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
-create a blank project from a template (a Maven Archetype), which sets up everything for you. To
-create the project manually, you can use the archetype directly by calling:
-
-
-
-{% highlight bash %}
-mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-java \
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-
-
-{% highlight bash %}
-mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-scala \
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-
-
-
-
-The archetypes work for stable releases and preview versions (`-SNAPSHOT`).
-
-If you want to add Flink to an existing Maven project, add the following entry to the
-*dependencies* section in the *pom.xml* file of your project:
-
-
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-scala{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-
- -In order to create your own Flink program, we encourage you to start with the -[program skeleton](#program-skeleton) and gradually add your own -[transformations](#transformations). - -{% top %} - -Program Skeleton ----------------- - -
-
- -
-
-
-As presented in the [example](#example-program), Flink DataStream programs look like regular Java
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtaining a `StreamExecutionEnvironment`,
-2. Connecting to data stream sources,
-3. Specifying transformations on the data streams,
-4. Specifying output for the processed data,
-5. Executing the program.
-
-We will now give an overview of each of those steps; please refer to the respective sections for
-more details.
-
-The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
-obtain one using these static methods on class `StreamExecutionEnvironment`:
-
-{% highlight java %}
-getExecutionEnvironment()
-
-createLocalEnvironment()
-createLocalEnvironment(int parallelism)
-createLocalEnvironment(int parallelism, Configuration customConfiguration)
-
-createRemoteEnvironment(String host, int port, String... jarFiles)
-createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment()`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Java program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command line]({{ site.baseurl }}/apis/cli.html),
-the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-The execution environment has several methods for specifying data sources, such as
-reading from files, sockets, and external systems. To just read
-data from a socket (useful also for debugging), you can use:
-
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> lines = env.socketTextStream("localhost", 9999);
-{% endhighlight %}
-
-This will give you a DataStream on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataStream you can apply transformations to create a new
-DataStream which you can then write to a socket, transform again,
-combine with other DataStreams, or push to an external system (e.g., a message queue, or a file system).
-You apply transformations by calling
-methods on DataStream with your own custom transformation functions. For example,
-a map transformation looks like this:
-
-{% highlight java %}
-DataStream<String> input = ...;
-
-DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
-    @Override
-    public Integer map(String value) {
-        return Integer.parseInt(value);
-    }
-});
-{% endhighlight %}
-
-This will create a new DataStream by converting every String in the original
-stream to an Integer. For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataStream containing your final results, you can push the result
-to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
-or print it.
-
-{% highlight java %}
-writeAsText(String path, ...)
-writeAsCsv(String path, ...)
-writeToSocket(String hostname, int port, ...)
-
-print()
-
-addSink(...)
-{% endhighlight %}
-
-Once you have specified the complete program, you need to **trigger the program execution** by
-calling `execute()` on `StreamExecutionEnvironment`. This will either execute on
-the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
-
-{% highlight java %}
-env.execute();
-{% endhighlight %}
-
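For illustration, the five steps above can be combined into one small, self-contained job. This is a sketch only (class and job names are arbitrary), reusing the socket source and the parsing `MapFunction` shown above:

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketParseJob {
    public static void main(String[] args) throws Exception {
        // 1. obtain an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. connect to a data stream source
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // 3. specify transformations on the stream
        DataStream<Integer> intValues = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return Integer.parseInt(value);
            }
        });

        // 4. specify output for the processed data
        intValues.print();

        // 5. trigger program execution
        env.execute("Socket parse job");
    }
}
{% endhighlight %}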
-
- -
-
-
-As presented in the [example](#example-program), Flink DataStream programs look like regular Scala
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtaining a `StreamExecutionEnvironment`,
-2. Connecting to data stream sources,
-3. Specifying transformations on the data streams,
-4. Specifying output for the processed data,
-5. Executing the program.
-
-We will now give an overview of each of those steps; please refer to the respective sections for
-more details.
-
-The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
-obtain one using these static methods on class `StreamExecutionEnvironment`:
-
-{% highlight scala %}
-def getExecutionEnvironment
-
-def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())
-
-def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
-def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Scala program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command line](cli.html)
-or the [web interface](web_client.html),
-the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-The execution environment has several methods for specifying data sources, such as
-reading from files, sockets, and external systems. To just read
-data from a socket (useful also for debugging), you can use:
-
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
-{% endhighlight %}
-
-This will give you a DataStream on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataStream you can apply transformations to create a new
-DataStream which you can then write to a file, transform again,
-combine with other DataStreams, or push to an external system.
-You apply transformations by calling
-methods on DataStream with your own custom transformation function. For example,
-a map transformation looks like this:
-
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-val mapped = input.map { x => x.toInt }
-{% endhighlight %}
-
-This will create a new DataStream by converting every String in the original
-stream to an Integer. For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataStream containing your final results, you can push the result
-to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
-or print it.
-
-{% highlight scala %}
-writeAsText(path: String, ...)
-writeAsCsv(path: String, ...)
-writeToSocket(hostname: String, port: Int, ...)
-
-print()
-
-addSink(...)
-{% endhighlight %}
-
-Once you have specified the complete program, you need to **trigger the program execution** by
-calling `execute` on `StreamExecutionEnvironment`. 
This will either execute on -the local machine or submit the program for execution on a cluster, depending on the chosen execution environment. - -{% highlight scala %} -env.execute() -{% endhighlight %} - -
-
-
-
-{% top %}
-
-DataStream Abstraction
-----------------------
-
-A `DataStream` is a possibly unbounded immutable collection of data items of the same type.
-
-Transformations may return different subtypes of `DataStream` allowing specialized transformations.
-For example, the `keyBy(…)` method returns a `KeyedStream`, which is a stream of data that
-is logically partitioned by a certain key, and can be further windowed.
-
-{% top %}
-
-Lazy Evaluation
----------------
-
-All Flink DataStream programs are executed lazily: when the program's main method is executed, the data loading
-and transformations do not happen directly. Rather, each operation is created and added to the
-program's plan. The operations are actually executed when the execution is explicitly triggered by
-an `execute()` call on the `StreamExecutionEnvironment` object. Whether the program is executed locally
-or on a cluster depends on the type of `StreamExecutionEnvironment`.
-
-Lazy evaluation lets you construct sophisticated programs that Flink executes as one
-holistically planned unit.
-
-{% top %}
-
-
-Transformations
----------------
+DataStream Transformations
+--------------------------
 
 Data transformations transform one or more DataStreams into a new DataStream. Programs can combine
 multiple transformations into sophisticated topologies.
@@ -836,7 +545,7 @@ DataStream output = iterationBody.filter(new FilterFunction(){

Extracts timestamps from records in order to work with windows
-        that use event time semantics. See working with time.
+        that use event time semantics. See working with time.
{% highlight java %}
stream.assignTimestamps(new TimestampExtractor() {...});
{% endhighlight %}
@@ -1158,7 +867,7 @@ iteration.closeWith(feedback);

Extracts timestamps from records in order to work with windows that use event time semantics.
-        See working with time.
+        See working with time.
{% highlight scala %}
stream.assignTimestamps { timestampExtractor }
{% endhighlight %}
@@ -1619,53 +1328,6 @@ someStream.map(...).isolateResources()
 {% top %}
-Specifying Keys
-----------------
-
-The `keyBy` transformation requires that a key is defined on
-its argument DataStream.
-
-A DataStream is keyed as
-{% highlight java %}
-DataStream<...> input = // [...]
-DataStream<...> windowed = input
-  .keyBy(/*define key here*/)
-  .window(/*define window here*/);
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data stream types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#specifying-keys) on how to specify keys.
-Just replace `DataSet` with `DataStream`, and `groupBy` with `keyBy`.
-
-
-
-Passing Functions to Flink
---------------------------
-
-Some transformations take user-defined functions as arguments.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#passing-functions-to-flink).
-
-
-{% top %}
-
-
-Data Types
-----------
-
-Flink places some restrictions on the types of elements that are used in DataStreams and in results
-of transformations. The reason for this is that the system analyzes the types to determine
-efficient execution strategies.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#data-types).
-
-{% top %}
-
-
 Data Sources
 ------------
@@ -1767,24 +1429,6 @@ Custom:
 {% top %}
-
-Execution Configuration
-----------
-
-The `StreamExecutionEnvironment` also contains the `ExecutionConfig`, which allows you to set job-specific configuration values for the runtime.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#execution-configuration).
-
-Parameters in the `ExecutionConfig` that pertain specifically to the DataStream API are:
-
-- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each event emitted from a source.
-  `areTimestampsEnabled()` returns the current value.
-
-- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for automatic watermark emission. You can
-  get the current value with `long getAutoWatermarkInterval()`.
-
-{% top %}
-
 Data Sinks
 ----------
@@ -1815,7 +1459,7 @@ greater than 1, the output will also be prepended with the identifier of the tas
 - `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
     Apache Kafka) that are implemented as sink functions.
-
+
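For illustration, here is a minimal sketch of a custom sink passed to `addSink(...)`, assuming a `DataStream<String> stream`; the body is a stand-in for real I/O against an external system:

{% highlight java %}
stream.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        // hand the value to an external system, e.g. a message queue client
    }
});
{% endhighlight %}

Note that, as described below, such a sink only participates in Flink's checkpointing if it is implemented accordingly.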

@@ -1848,139 +1492,17 @@ greater than 1, the output will also be prepended with the identifier of the tas
Note that the `write*()` methods on `DataStream` are mainly intended for debugging purposes.
-They do not participate in Flink's checkpointing, which means these functions usually have
-at-least-once semantics. The data flushing to the target system depends on the implementation of the
-OutputFormat. This means that not all elements sent to the OutputFormat immediately show up
+They do not participate in Flink's checkpointing, which means these functions usually have
+at-least-once semantics. The data flushing to the target system depends on the implementation of the
+OutputFormat. This means that not all elements sent to the OutputFormat immediately show up
 in the target system. Also, in failure cases, those records might be lost.
 For reliable, exactly-once delivery of a stream into a file system, use the `flink-connector-filesystem`.
 Also, custom implementations through the `.addSink(...)` method can participate
 in Flink's checkpointing for exactly-once semantics.
-
-
 {% top %}
-Debugging
----------
-
-Before running a streaming program in a distributed cluster, it is a good
-idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
-programs is usually an incremental process of checking results, debugging, and improving.
-
-Flink provides features to significantly ease the development process of data analysis
-programs by supporting local debugging from within an IDE, injection of test data, and collection of
-result data. This section gives some hints on how to ease the development of Flink programs.
-
-### Local Execution Environment
-
-A `LocalStreamEnvironment` starts a Flink system within the same JVM process it was created in. If you
-start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
-program.
-
-A LocalEnvironment is created and used as follows:
-
-
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
-DataStream<String> lines = env.addSource(/* some source */);
-// build your program
-
-env.execute();
-{% endhighlight %}
-
-
- -{% highlight scala %} -val env = StreamExecutionEnvironment.createLocalEnvironment() - -val lines = env.addSource(/* some source */) -// build your program - -env.execute() -{% endhighlight %} -
-
- -### Collection Data Sources - -Flink provides special data sources which are backed -by Java collections to ease testing. Once a program has been tested, the sources and sinks can be -easily replaced by sources and sinks that read from / write to external systems. - -Collection data sources can be used as follows: - -
-
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
-// Create a DataStream from a list of elements
-DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
-
-// Create a DataStream from any Java collection
-List<Tuple2<String, Integer>> data = ...
-DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
-
-// Create a DataStream from an Iterator
-Iterator<Long> longIt = ...
-DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
-{% endhighlight %}
-
-
-{% highlight scala %} -val env = StreamExecutionEnvironment.createLocalEnvironment() - -// Create a DataStream from a list of elements -val myInts = env.fromElements(1, 2, 3, 4, 5) - -// Create a DataStream from any Collection -val data: Seq[(String, Int)] = ... -val myTuples = env.fromCollection(data) - -// Create a DataStream from an Iterator -val longIt: Iterator[Long] = ... -val myLongs = env.fromCollection(longIt) -{% endhighlight %} -
-
-
-
-**Note:** Currently, the collection data source requires that data types and iterators implement
-`Serializable`. Furthermore, collection data sources cannot be executed in parallel
-(parallelism = 1).
-
-### Iterator Data Sink
-
-Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:
-
-
-{% highlight java %}
-import org.apache.flink.contrib.streaming.DataStreamUtils;
-
-DataStream<Tuple2<String, Integer>> myResult = ...
-Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
-{% endhighlight %}
-
-
- -{% highlight scala %} -import org.apache.flink.contrib.streaming.DataStreamUtils -import scala.collection.JavaConverters.asScalaIteratorConverter - -val myResult: DataStream[(String, Int)] = ... -val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala -{% endhighlight %} -
-
- - -{% top %} - - Windows ------- @@ -2014,7 +1536,7 @@ to defining your own windows.

Defines a window of 5 seconds that "tumbles". This means that elements are
grouped according to their timestamp into groups of 5-second duration, and every element belongs to exactly one window.
-        The notion of time is specified by the selected TimeCharacteristic (see time).
+        The notion of time is specified by the selected TimeCharacteristic (see time).
{% highlight java %}
keyedStream.timeWindow(Time.seconds(5));
{% endhighlight %}

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are
grouped according to their timestamp into groups of 5-second duration, and elements can belong to more than
one window (since windows overlap by at most 4 seconds).
-        The notion of time is specified by the selected TimeCharacteristic (see time).
+        The notion of time is specified by the selected TimeCharacteristic (see time).
{% highlight java %}
keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
{% endhighlight %}

@@ -2084,7 +1606,7 @@ keyedStream.countWindow(1000, 100)

Defines a window of 5 seconds that "tumbles". This means that elements are
grouped according to their timestamp into groups of 5-second duration, and every element belongs to exactly one window.
-        The notion of time is specified by the selected TimeCharacteristic (see time).
+        The notion of time is specified by the selected TimeCharacteristic (see time).
{% highlight scala %}
keyedStream.timeWindow(Time.seconds(5))
{% endhighlight %}

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are
grouped according to their timestamp into groups of 5-second duration, and elements can belong to more than
one window (since windows overlap by at most 4 seconds).
-        The notion of time is specified by the selected TimeCharacteristic (see time).
+        The notion of time is specified by the selected TimeCharacteristic (see time).
{% highlight scala %}
keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
{% endhighlight %}

@@ -2953,54 +2475,6 @@ nonKeyedStream.countWindowAll(1000, 100)
 {% top %}
-Execution Parameters
---------------------
-
-### Fault Tolerance
-
-The [Fault Tolerance Documentation](fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
-
-### Parallelism
-
-You can control the number of parallel instances created for each operator by
-calling the `operator.setParallelism(int)` method.
-
-### Controlling Latency
-
-By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic)
-but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files.
-While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough.
-To control throughput and latency, you can use `env.setBufferTimeout(timeoutMillis)` on the execution environment
-(or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the
-buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.
-
-Usage:
-

-
-{% highlight java %}
-LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-env.setBufferTimeout(timeoutMillis);
-
-env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
-{% endhighlight %}
-
-
-{% highlight scala %}
-val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
-env.setBufferTimeout(timeoutMillis)
-
-env.generateSequence(1, 10).map(myMap).setBufferTimeout(timeoutMillis)
-{% endhighlight %}
-
-
-
-To maximize throughput, set `setBufferTimeout(-1)`, which will remove the timeout so that buffers are only
-flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms).
-A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.
-
-{% top %}
-
 Working with State
 ------------------
@@ -3290,23 +2764,176 @@ val iteratedStream = someIntegers.iterate(
 {% top %}
-Program Packaging & Distributed Execution
------------------------------------------
+Execution Parameters
+--------------------
+
+The `StreamExecutionEnvironment` contains the `ExecutionConfig`, which allows you to set job-specific configuration values for the runtime.
+
+Please refer to [execution configuration]({{ site.baseurl }}/apis/common/index.html#execution-configuration)
+for an explanation of most parameters. These parameters pertain specifically to the DataStream API:
+
+- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each event emitted from a source.
+  `areTimestampsEnabled()` returns the current value.
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#program-packaging-and-distributed-execution).
+- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for automatic watermark emission. You can
+  get the current value with `long getAutoWatermarkInterval()`.
 {% top %}
-Parallel Execution
------------------
+### Fault Tolerance
+
+The [Fault Tolerance Documentation](fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
+
+### Controlling Latency
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#parallel-execution).
+By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic)
+but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files.
+While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough.
+To control throughput and latency, you can use `env.setBufferTimeout(timeoutMillis)` on the execution environment
+(or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the
+buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.
+
+Usage:
+
+
+{% highlight java %}
+LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+env.setBufferTimeout(timeoutMillis);
+
+env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
+{% endhighlight %}
+
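To make the two extremes discussed after this example concrete, a small sketch (the values are illustrative):

{% highlight java %}
env.setBufferTimeout(-1); // flush buffers only when they are full: maximizes throughput
env.setBufferTimeout(5);  // flush at least every 5 ms: keeps latency low
{% endhighlight %}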
+
+{% highlight scala %}
+val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
+env.setBufferTimeout(timeoutMillis)
+
+env.generateSequence(1, 10).map(myMap).setBufferTimeout(timeoutMillis)
+{% endhighlight %}
+
+
+
+To maximize throughput, set `setBufferTimeout(-1)`, which will remove the timeout so that buffers are only
+flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms).
+A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.

{% top %}

Debugging
---------

+Before running a streaming program in a distributed cluster, it is a good
+idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
+programs is usually an incremental process of checking results, debugging, and improving.
+
+Flink provides features to significantly ease the development process of data analysis
+programs by supporting local debugging from within an IDE, injection of test data, and collection of
+result data. This section gives some hints on how to ease the development of Flink programs.
+
+### Local Execution Environment
+
+A `LocalStreamEnvironment` starts a Flink system within the same JVM process it was created in. If you
+start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
+program.
+
+A LocalEnvironment is created and used as follows:
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+DataStream<String> lines = env.addSource(/* some source */);
+// build your program
+
+env.execute();
+{% endhighlight %}
+
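When stepping through a job with breakpoints, it can help to force the whole pipeline into a single task so that records arrive in a deterministic order. A sketch using the `createLocalEnvironment(int parallelism)` variant:

{% highlight java %}
// run everything in one task, which makes debugging sessions deterministic
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
{% endhighlight %}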
+
+ +{% highlight scala %} +val env = StreamExecutionEnvironment.createLocalEnvironment() -See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#execution-plans). +val lines = env.addSource(/* some source */) +// build your program + +env.execute() +{% endhighlight %} +
+
+ +### Collection Data Sources + +Flink provides special data sources which are backed +by Java collections to ease testing. Once a program has been tested, the sources and sinks can be +easily replaced by sources and sinks that read from / write to external systems. + +Collection data sources can be used as follows: + +
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+// Create a DataStream from a list of elements
+DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
+
+// Create a DataStream from any Java collection
+List<Tuple2<String, Integer>> data = ...
+DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
+
+// Create a DataStream from an Iterator
+Iterator<Long> longIt = ...
+DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
+{% endhighlight %}
+
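Putting a collection source to work, a self-contained toy job for a quick test might look like this (a sketch; class and job names are arbitrary):

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourceTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // double each test element and print the results to stdout
        env.fromElements(1, 2, 3, 4, 5)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return 2 * value;
               }
           })
           .print();

        env.execute("Collection source test");
    }
}
{% endhighlight %}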
+
+{% highlight scala %} +val env = StreamExecutionEnvironment.createLocalEnvironment() + +// Create a DataStream from a list of elements +val myInts = env.fromElements(1, 2, 3, 4, 5) + +// Create a DataStream from any Collection +val data: Seq[(String, Int)] = ... +val myTuples = env.fromCollection(data) + +// Create a DataStream from an Iterator +val longIt: Iterator[Long] = ... +val myLongs = env.fromCollection(longIt) +{% endhighlight %} +
+
+
+
+**Note:** Currently, the collection data source requires that data types and iterators implement
+`Serializable`. Furthermore, collection data sources cannot be executed in parallel
+(parallelism = 1).
+
+### Iterator Data Sink
+
+Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:
+
+
+{% highlight java %}
+import org.apache.flink.contrib.streaming.DataStreamUtils;
+
+DataStream<Tuple2<String, Integer>> myResult = ...
+Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
+{% endhighlight %}
+
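The returned iterator can then be drained, for example to run assertions in a test. A sketch (imports shown for completeness):

{% highlight java %}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;

// drain the iterator into a list for assertions
List<Tuple2<String, Integer>> results = new ArrayList<>();
while (myOutput.hasNext()) {
    results.add(myOutput.next());
}
{% endhighlight %}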
+
+ +{% highlight scala %} +import org.apache.flink.contrib.streaming.DataStreamUtils +import scala.collection.JavaConverters.asScalaIteratorConverter + +val myResult: DataStream[(String, Int)] = ... +val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala +{% endhighlight %} +
+
{% top %}