flink-commits mailing list archives

From aljos...@apache.org
Subject [3/7] flink git commit: [FLINK-3402] Refactor Common Parts of Stream/Batch Documentation
Date Wed, 17 Feb 2016 09:59:40 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/be4601ea/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index 5cfde85..08843bc 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -3,7 +3,7 @@ title: "Flink DataStream API Programming Guide"
 
 # Top-level navigation
 top-nav-group: apis
-top-nav-pos: 1
+top-nav-pos: 2
 top-nav-title: <strong>Streaming Guide</strong> (DataStream API)
 
 # Sub-level navigation
@@ -38,9 +38,13 @@ example write the data to files, or to standard output (for example the command
 terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
 The execution can happen in a local JVM, or on clusters of many machines.
 
-In order to create your own Flink DataStream program, we encourage you to start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations). The remaining sections act as references for additional
+Please see [basic concepts]({{ site.baseurl }}/apis/common/index.html) for an introduction
+to the basic concepts of the Flink API.
+
+In order to create your own Flink DataStream program, we encourage you to start with
+[anatomy of a Flink Program]({{ site.baseurl }}/apis/common/index.html#anatomy-of-a-flink-program)
+and gradually add your own
+[transformations](#datastream-transformations). The remaining sections act as references for additional
 operations and advanced features.
 
 
@@ -137,303 +141,8 @@ word count program. If you want to see counts greater than 1, type the same word
 
 {% top %}
 
-
-Linking with Flink
-------------------
-
-To write programs with Flink, you need to include the Flink DataStream library corresponding to
-your programming language in your project.
-
-The simplest way to do this is to use one of the quickstart scripts: either for
-[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
-create a blank project from a template (a Maven Archetype), which sets up everything for you. To
-manually create the project, you can use the archetype and create a project by calling:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-java \
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-scala \
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-</div>
-</div>
-
-The archetypes work for stable releases and preview versions (`-SNAPSHOT`).
-
-If you want to add Flink to an existing Maven project, add the following entry to your
-*dependencies* section in the *pom.xml* file of your project:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-scala{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-</div>
-</div>
-
-In order to create your own Flink program, we encourage you to start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations).
-
-{% top %}
-
-Program Skeleton
-----------------
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-As presented in the [example](#example-program), Flink DataStream programs look like regular Java
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtaining a `StreamExecutionEnvironment`,
-2. Connecting to data stream sources,
-3. Specifying transformations on the data streams,
-4. Specifying output for the processed data,
-5. Executing the program.
-
-We will now give an overview of each of those steps; please refer to the respective sections for
-more details.
-
-The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
-obtain one using these static methods on class `StreamExecutionEnvironment`:
-
-{% highlight java %}
-getExecutionEnvironment()
-
-createLocalEnvironment()
-createLocalEnvironment(int parallelism)
-createLocalEnvironment(int parallelism, Configuration customConfiguration)
-
-createRemoteEnvironment(String host, int port, String... jarFiles)
-createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment()`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Java program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command line]({{ site.baseurl }}/apis/cli.html),
-the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-For specifying data sources, the execution environment provides several methods
-to read from files, sockets, and external systems. To just read
-data from a socket (also useful for debugging), you can use:
-
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> lines = env.socketTextStream("localhost", 9999);
-{% endhighlight %}
-
-This will give you a DataStream on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataStream you can apply transformations to create a new
-DataStream which you can then write to a socket, transform again,
-combine with other DataStreams, or push to an external system (e.g., a message queue, or a file system).
-You apply transformations by calling
-methods on DataStream with your own custom transformation functions. For example,
-a map transformation looks like this:
-
-{% highlight java %}
-DataStream<String> input = ...;
-
-DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
-    @Override
-    public Integer map(String value) {
-        return Integer.parseInt(value);
-    }
-});
-{% endhighlight %}
-
-This will create a new DataStream by converting every String in the original
-stream to an Integer. For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataStream containing your final results, you can push the result
-to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
-or print it.
-
-{% highlight java %}
-writeAsText(String path, ...)
-writeAsCsv(String path, ...)
-writeToSocket(String hostname, int port, ...)
-
-print()
-
-addSink(...)
-{% endhighlight %}
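
For instance (an illustrative sketch added by the editor, not part of the original page; the path is a placeholder), the `intValues` stream from the map example above could be written out or printed like this:

{% highlight java %}
// write the results as text files under a placeholder path
intValues.writeAsText("/tmp/int-values");

// or print them to standard out for quick inspection
intValues.print();
{% endhighlight %}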
-
-Once you have specified the complete program, you need to **trigger the program execution** by
-calling `execute()` on `StreamExecutionEnvironment`. This will either execute on
-the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
-
-{% highlight java %}
-env.execute();
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-
-<br />
-
-As presented in the [example](#example-program), Flink DataStream programs look like regular Scala
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtaining a `StreamExecutionEnvironment`,
-2. Connecting to data stream sources,
-3. Specifying transformations on the data streams,
-4. Specifying output for the processed data,
-5. Executing the program.
-
-We will now give an overview of each of those steps; please refer to the respective sections for
-more details.
-
-The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
-obtain one using these static methods on class `StreamExecutionEnvironment`:
-
-{% highlight scala %}
-def getExecutionEnvironment
-
-def createLocalEnvironment(parallelism: Int =  Runtime.getRuntime.availableProcessors())
-
-def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
-def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Java program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command line](cli.html)
-or the [web interface](web_client.html),
-the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-For specifying data sources, the execution environment provides several methods
-to read from files, sockets, and external systems. To just read
-data from a socket (also useful for debugging), you can use:
-
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
-{% endhighlight %}
-
-This will give you a DataStream on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataStream you can apply transformations to create a new
-DataStream which you can then write to a file, transform again,
-combine with other DataStreams, or push to an external system.
-You apply transformations by calling
-methods on DataStream with your own custom transformation function. For example,
-a map transformation looks like this:
-
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-val mapped = input.map { x => x.toInt }
-{% endhighlight %}
-
-This will create a new DataStream by converting every String in the original
-stream to an Integer. For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataStream containing your final results, you can push the result
-to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
-or print it.
-
-{% highlight scala %}
-writeAsText(path: String, ...)
-writeAsCsv(path: String, ...)
-writeToSocket(hostname: String, port: Int, ...)
-
-print()
-
-addSink(...)
-{% endhighlight %}
-
-Once you have specified the complete program, you need to **trigger the program execution** by
-calling `execute` on `StreamExecutionEnvironment`. This will either execute on
-the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
-
-{% highlight scala %}
-env.execute()
-{% endhighlight %}
-
-</div>
-</div>
-
-{% top %}
-
-DataStream Abstraction
-----------------------
-
-A `DataStream` is a possibly unbounded, immutable collection of data items of the same type.
-
-Transformations may return different subtypes of `DataStream` allowing specialized transformations.
-For example, the `keyBy(…)` method returns a `KeyedDataStream`, which is a stream of data that
-is logically partitioned by a certain key, and can be further windowed.
-
-{% top %}
-
-Lazy Evaluation
----------------
-
-All Flink DataStream programs are executed lazily: When the program's main method is executed, the data loading
-and transformations do not happen directly. Rather, each operation is created and added to the
-program's plan. The operations are actually executed when the execution is explicitly triggered by
-an `execute()` call on the `StreamExecutionEnvironment` object. Whether the program is executed locally
-or on a cluster depends on the type of `StreamExecutionEnvironment`.
-
-The lazy evaluation lets you construct sophisticated programs that Flink executes as one
-holistically planned unit.
-
-{% top %}
-
-
-Transformations
----------------
+DataStream Transformations
+--------------------------
 
 Data transformations transform one or more DataStreams into a new DataStream. Programs can combine
 multiple transformations into sophisticated topologies.
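
For illustration (a sketch added by the editor, not part of the original page), a few transformations chained into a small topology, assuming an input stream named `text`:

{% highlight java %}
DataStream<String> text = ...; // e.g. from env.socketTextStream(...)

DataStream<Integer> lengths = text
    .filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) {
            return !value.isEmpty(); // drop empty lines
        }
    })
    .map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return value.length(); // map each line to its length
        }
    });
{% endhighlight %}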
@@ -836,7 +545,7 @@ DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
           <td>
             <p>
                 Extracts timestamps from records in order to work with windows
-                that use event time semantics. See <a href="#working-with-time">working with time</a>.
+                that use event time semantics. See <a href="{{ site.baseurl }}/apis/streaming/time.html">working with time</a>.
                 {% highlight java %}
 stream.assignTimestamps (new TimeStampExtractor() {...});
                 {% endhighlight %}
@@ -1158,7 +867,7 @@ iteration.closeWith(feedback);
             <p>
                 Extracts timestamps from records in order to work with windows
                 that use event time semantics.
-                See <a href="#working-with-time">working with time</a>.
+                See <a href="{{ site.baseurl }}/apis/streaming/time.html">working with time</a>.
                 {% highlight scala %}
 stream.assignTimestamps { timestampExtractor }
                 {% endhighlight %}
@@ -1619,53 +1328,6 @@ someStream.map(...).isolateResources()
 
 {% top %}
 
-Specifying Keys
-----------------
-
-The `keyBy` transformation requires that a key is defined on
-its argument DataStream.
-
-A DataStream is keyed as
-{% highlight java %}
-DataStream<...> input = // [...]
-DataStream<...> windowed = input
-	.keyBy(/*define key here*/)
-	.window(/*define window here*/);
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data stream types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#specifying-keys) on how to specify keys.
-Just replace `DataSet` with `DataStream`, and `groupBy` with `keyBy`.
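
As an illustrative sketch (not part of the original page; the tuple layout is assumed), keying a stream of `(word, count)` pairs by the first tuple field and summing per key over a time window:

{% highlight java %}
DataStream<Tuple2<String, Integer>> pairs = ...;

DataStream<Tuple2<String, Integer>> counts = pairs
    .keyBy(0)                        // key by the word field
    .timeWindow(Time.seconds(5))     // 5 second tumbling windows
    .sum(1);                         // sum the count field per key and window
{% endhighlight %}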
-
-
-
-Passing Functions to Flink
---------------------------
-
-Some transformations take user-defined functions as arguments.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#passing-functions-to-flink).
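
For example (an editor-added sketch, assuming an existing `DataStream<String> lines`), a user-defined function can be passed as an anonymous class:

{% highlight java %}
DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) {
        // split each line and emit the individual words
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
{% endhighlight %}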
-
-
-{% top %}
-
-
-Data Types
-----------
-
-Flink places some restrictions on the type of elements that are used in DataStreams and in results
-of transformations. The reason for this is that the system analyzes the types to determine
-efficient execution strategies.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#data-types).
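
As a brief illustration (a sketch, not from the original page), a simple POJO with a public no-argument constructor and public fields is one kind of element type the system can analyze:

{% highlight java %}
// a POJO element type: public no-argument constructor and public fields
public class WordCount {
    public String word;
    public int count;

    public WordCount() {}
}

DataStream<WordCount> counts = env.fromElements(new WordCount());
{% endhighlight %}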
-
-{% top %}
-
-
 Data Sources
 ------------
 
@@ -1767,24 +1429,6 @@ Custom:
 
 {% top %}
 
-
-Execution Configuration
-----------
-
-The `StreamExecutionEnvironment` also contains the `ExecutionConfig`, which allows setting job-specific configuration values for the runtime.
-
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#execution-configuration).
-
-Parameters in the `ExecutionConfig` that pertain specifically to the DataStream API are:
-
-- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each event emitted from a source.
-    `areTimestampsEnabled()` returns the current value.
-
-- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for automatic watermark emission. You can
-    get the current value with `long getAutoWatermarkInterval()`
-
-{% top %}
-
 Data Sinks
 ----------
 
@@ -1815,7 +1459,7 @@ greater than 1, the output will also be prepended with the identifier of the tas
 
 - `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
     Apache Kafka) that are implemented as sink functions.
-    
+
 </div>
 <div data-lang="scala" markdown="1">
 
@@ -1848,139 +1492,17 @@ greater than 1, the output will also be prepended with the identifier of the tas
 </div>
 
 Note that the `write*()` methods on `DataStream` are mainly intended for debugging purposes.
-They are not participating in Flink's checkpointing, this means these functions usually have
-at-least-once semantics. The data flushing to the target system depends on the implementation of the
-OutputFormat. This means that not all elements send to the OutputFormat are immediately showing up
+They do not participate in Flink's checkpointing, which means these functions usually have
+at-least-once semantics. How data is flushed to the target system depends on the implementation of the
+OutputFormat. This means that not all elements sent to the OutputFormat immediately show up
 in the target system. Also, in failure cases, those records might be lost.
 
 For reliable, exactly-once delivery of a stream into a file system, use the `flink-connector-filesystem`.
 Also, custom implementations through the `.addSink(...)` method can participate in Flink's checkpointing
 for exactly-once semantics.
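
As a hedged sketch (the sink body is illustrative only, assuming a `DataStream<String> stream`), a custom sink is attached with `addSink(...)`:

{% highlight java %}
stream.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        // hand each record to some external system here
        System.out.println(value);
    }
});
{% endhighlight %}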
 
-
-
 {% top %}
 
-Debugging
----------
-
-Before running a streaming program in a distributed cluster, it is a good
-idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
-programs is usually an incremental process of checking results, debugging, and improving.
-
-Flink provides features to significantly ease the development process of data analysis
-programs by supporting local debugging from within an IDE, injection of test data, and collection of
-result data. This section gives some hints on how to ease the development of Flink programs.
-
-### Local Execution Environment
-
-A `LocalStreamEnvironment` starts a Flink system within the same JVM process it was created in. If you
-start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
-program.
-
-A LocalEnvironment is created and used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
-DataStream<String> lines = env.addSource(/* some source */);
-// build your program
-
-env.execute();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-val env = StreamExecutionEnvironment.createLocalEnvironment()
-
-val lines = env.addSource(/* some source */)
-// build your program
-
-env.execute()
-{% endhighlight %}
-</div>
-</div>
-
-### Collection Data Sources
-
-Flink provides special data sources which are backed
-by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
-easily replaced by sources and sinks that read from / write to external systems.
-
-Collection data sources can be used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
-// Create a DataStream from a list of elements
-DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
-
-// Create a DataStream from any Java collection
-List<Tuple2<String, Integer>> data = ...
-DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
-
-// Create a DataStream from an Iterator
-Iterator<Long> longIt = ...
-DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.createLocalEnvironment()
-
-// Create a DataStream from a list of elements
-val myInts = env.fromElements(1, 2, 3, 4, 5)
-
-// Create a DataStream from any Collection
-val data: Seq[(String, Int)] = ...
-val myTuples = env.fromCollection(data)
-
-// Create a DataStream from an Iterator
-val longIt: Iterator[Long] = ...
-val myLongs = env.fromCollection(longIt)
-{% endhighlight %}
-</div>
-</div>
-
-**Note:** Currently, the collection data source requires that data types and iterators implement
-`Serializable`. Furthermore, collection data sources can not be executed in parallel (
-parallelism = 1).
-
-### Iterator Data Sink
-
-Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
-
-DataStream<Tuple2<String, Integer>> myResult = ...
-Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
-import scala.collection.JavaConverters.asScalaIteratorConverter
-
-val myResult: DataStream[(String, Int)] = ...
-val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala
-{% endhighlight %}
-</div>
-</div>
-
-
-{% top %}
-
-
 Windows
 -------
 
@@ -2014,7 +1536,7 @@ to defining your own windows.
           <p>
           Defines a window of 5 seconds, that "tumbles". This means that elements are
          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
-	  The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+	  The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/time.html">time</a>).
     {% highlight java %}
 keyedStream.timeWindow(Time.seconds(5));
     {% endhighlight %}
@@ -2028,7 +1550,7 @@ keyedStream.timeWindow(Time.seconds(5));
             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
             one window (since windows overlap by at most 4 seconds).
-             The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/time.html">time</a>).
       {% highlight java %}
 keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
       {% endhighlight %}
@@ -2084,7 +1606,7 @@ keyedStream.countWindow(1000, 100)
           <p>
           Defines a window of 5 seconds, that "tumbles". This means that elements are
          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
-          The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+          The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/time.html">time</a>).
     {% highlight scala %}
 keyedStream.timeWindow(Time.seconds(5))
     {% endhighlight %}
@@ -2098,7 +1620,7 @@ keyedStream.timeWindow(Time.seconds(5))
             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
             one window (since windows overlap by at most 4 seconds).
-             The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/time.html">time</a>).
       {% highlight scala %}
 keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
       {% endhighlight %}
@@ -2953,54 +2475,6 @@ nonKeyedStream.countWindowAll(1000, 100)
 
 {% top %}
 
-Execution Parameters
---------------------
-
-### Fault Tolerance
-
-The [Fault Tolerance Documentation](fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
-
-### Parallelism
-
-You can control the number of parallel instances created for each operator by
-calling the `operator.setParallelism(int)` method.
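
For example (an illustrative sketch, assuming an existing `StreamExecutionEnvironment env`):

{% highlight java %}
DataStream<String> upperCased = env
    .socketTextStream("localhost", 9999)
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    })
    .setParallelism(4); // run this map with 4 parallel instances
{% endhighlight %}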
-
-### Controlling Latency
-
-By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic)
-but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files.
-While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough.
-To control throughput and latency, you can use `env.setBufferTimeout(timeoutMillis)` on the execution environment
-(or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the
-buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.
-
-Usage:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-env.setBufferTimeout(timeoutMillis);
-
-env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.createLocalEnvironment
-env.setBufferTimeout(timeoutMillis)
-
-env.generateSequence(1, 10).map(myMap).setBufferTimeout(timeoutMillis)
-{% endhighlight %}
-</div>
-</div>
-
-To maximize throughput, set `setBufferTimeout(-1)` which will remove the timeout and buffers will only be
-flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms).
-A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.
-
-{% top %}
-
 Working with State
 ------------------
 
@@ -3290,23 +2764,176 @@ val iteratedStream = someIntegers.iterate(
 
 {% top %}
 
-Program Packaging & Distributed Execution
------------------------------------------
+Execution Parameters
+--------------------
+
+The `StreamExecutionEnvironment` contains the `ExecutionConfig`, which allows setting job-specific configuration values for the runtime.
+
+Please refer to [execution configuration]({{ site.baseurl }}/apis/common/index.html#execution-configuration)
+for an explanation of most parameters. These parameters pertain specifically to the DataStream API
+(a usage sketch follows the list below):
+
+- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each event emitted from a source.
+    `areTimestampsEnabled()` returns the current value.
 
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#program-packaging-and-distributed-execution).
+- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for automatic watermark emission. You can
+    get the current value with `long getAutoWatermarkInterval()`
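
A minimal usage sketch (the interval value is arbitrary; method names are the ones listed above):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.getConfig().enableTimestamps();
env.getConfig().setAutoWatermarkInterval(1000); // emit watermarks every second
{% endhighlight %}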
 
 {% top %}
 
-Parallel Execution
-------------------
+### Fault Tolerance
+
+The [Fault Tolerance Documentation](fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
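
For instance (a sketch; the interval is arbitrary and the full set of options is covered in the linked page):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint the streaming job every 5 seconds
env.enableCheckpointing(5000);
{% endhighlight %}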
+
+### Controlling Latency
 
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#parallel-execution).
+By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic)
+but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files.
+While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough.
+To control throughput and latency, you can use `env.setBufferTimeout(timeoutMillis)` on the execution environment
+(or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the
+buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.
+
+Usage:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+env.setBufferTimeout(timeoutMillis);
+
+env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment
+env.setBufferTimeout(timeoutMillis)
+
+env.generateSequence(1, 10).map(myMap).setBufferTimeout(timeoutMillis)
+{% endhighlight %}
+</div>
+</div>
+
+To maximize throughput, set `setBufferTimeout(-1)` which will remove the timeout and buffers will only be
+flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms).
+A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.
 
 {% top %}
 
-Execution Plans
----------------
+Debugging
+---------
+
+Before running a streaming program in a distributed cluster, it is a good
+idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
+programs is usually an incremental process of checking results, debugging, and improving.
+
+Flink provides features to significantly ease the development process of data analysis
+programs by supporting local debugging from within an IDE, injection of test data, and collection of
+result data. This section gives some hints on how to ease the development of Flink programs.
+
+### Local Execution Environment
+
+A `LocalStreamEnvironment` starts a Flink system within the same JVM process it was created in. If you
+start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
+program.
+
+A LocalEnvironment is created and used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+DataStream<String> lines = env.addSource(/* some source */);
+// build your program
+
+env.execute();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
 
-See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#execution-plans).
+val lines = env.addSource(/* some source */)
+// build your program
+
+env.execute()
+{% endhighlight %}
+</div>
+</div>
+
+### Collection Data Sources
+
+Flink provides special data sources which are backed
+by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
+easily replaced by sources and sinks that read from / write to external systems.
+
+Collection data sources can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+// Create a DataStream from a list of elements
+DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
+
+// Create a DataStream from any Java collection
+List<Tuple2<String, Integer>> data = ...
+DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
+
+// Create a DataStream from an Iterator
+Iterator<Long> longIt = ...
+DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+
+// Create a DataStream from a list of elements
+val myInts = env.fromElements(1, 2, 3, 4, 5)
+
+// Create a DataStream from any Collection
+val data: Seq[(String, Int)] = ...
+val myTuples = env.fromCollection(data)
+
+// Create a DataStream from an Iterator
+val longIt: Iterator[Long] = ...
+val myLongs = env.fromCollection(longIt)
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** Currently, the collection data source requires that data types and iterators implement
+`Serializable`. Furthermore, collection data sources can not be executed in parallel (
+parallelism = 1).
+
+### Iterator Data Sink
+
+Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.contrib.streaming.DataStreamUtils;
+
+DataStream<Tuple2<String, Integer>> myResult = ...;
+Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.flink.contrib.streaming.DataStreamUtils
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+val myResult: DataStream[(String, Int)] = ...
+val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala
+{% endhighlight %}
+</div>
+</div>
 
 {% top %}

