flink-commits mailing list archives

From aljos...@apache.org
Subject [5/7] flink git commit: [FLINK-3402] Refactor Common Parts of Stream/Batch Documentation
Date Wed, 17 Feb 2016 09:59:42 GMT
[FLINK-3402] Refactor Common Parts of Stream/Batch Documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be4601ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be4601ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be4601ea

Branch: refs/heads/master
Commit: be4601ea5746277e8baf895ad057946d6fc88138
Parents: 134b5c2
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Feb 16 14:19:51 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Feb 17 10:57:44 2016 +0100

----------------------------------------------------------------------
 docs/apis/batch/fig/plan_visualizer.png  |  Bin 145778 -> 0 bytes
 docs/apis/batch/index.md                 | 1869 ++++---------------------
 docs/apis/common/fig/plan_visualizer.png |  Bin 0 -> 145778 bytes
 docs/apis/common/index.md                | 1356 ++++++++++++++++++
 docs/apis/streaming/index.md             |  737 +++-------
 5 files changed, 1777 insertions(+), 2185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be4601ea/docs/apis/batch/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/plan_visualizer.png b/docs/apis/batch/fig/plan_visualizer.png
deleted file mode 100644
index 85b8c55..0000000
Binary files a/docs/apis/batch/fig/plan_visualizer.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/be4601ea/docs/apis/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/index.md b/docs/apis/batch/index.md
index 2818490..133996f 100644
--- a/docs/apis/batch/index.md
+++ b/docs/apis/batch/index.md
@@ -3,7 +3,7 @@ title: "Flink DataSet API Programming Guide"
 
 # Top-level navigation
 top-nav-group: apis
-top-nav-pos: 2
+top-nav-pos: 3
 top-nav-title: <strong>Batch Guide</strong> (DataSet API)
 
 # Sub-level navigation
@@ -39,9 +39,13 @@ example write the data to (distributed) files, or to standard output (for exampl
 terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
 The execution can happen in a local JVM, or on clusters of many machines.
 
+Please see [basic concepts]({{ site.baseurl }}/apis/common/index.html) for an introduction
+to the basic concepts of the Flink API.
+
 In order to create your own Flink DataSet program, we encourage you to start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations). The remaining sections act as references for additional
+[anatomy of a Flink Program]({{ site.baseurl }}/apis/common/index.html#anatomy-of-a-flink-program)
+and gradually add your own
+[transformations](#dataset-transformations). The remaining sections act as references for additional
 operations and advanced features.
 
 * This will be replaced by the TOC
@@ -115,377 +119,8 @@ object WordCount {
 
 {% top %}
 
-
-Linking with Flink
-------------------
-
-To write programs with Flink, you need to include the Flink library corresponding to
-your programming language in your project.
-
-The simplest way to do this is to use one of the quickstart scripts: either for
-[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
-create a blank project from a template (a Maven Archetype), which sets up everything for you. To
-manually create the project, you can use the archetype and create a project by calling:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-java \
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-quickstart-scala \
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-</div>
-</div>
-
-The archetypes work for stable releases and preview versions (`-SNAPSHOT`).
-
-If you want to add Flink to an existing Maven project, add the following entry to your
-*dependencies* section in the *pom.xml* file of your project:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-java</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-scala{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-**Important:** When working with the Scala API you must have one of these two imports:
-{% highlight scala %}
-import org.apache.flink.api.scala._
-{% endhighlight %}
-
-or
-
-{% highlight scala %}
-import org.apache.flink.api.scala.createTypeInformation
-{% endhighlight %}
-
-The reason is that Flink analyzes the types that are used in a program and generates serializers
-and comparators for them. By having either of those imports you enable an implicit conversion
-that creates the type information for Flink operations.
-</div>
-</div>
-
-#### Scala Dependency Versions
-
-Because Scala 2.10 and Scala 2.11 are not binary compatible, we provide multiple artifacts
-to support both Scala versions.
-
-Starting from the 0.10 line, we cross-build all Flink modules for both 2.10 and 2.11. If you want
-to run your program on Flink with Scala 2.11, you need to add a `_2.11` suffix to the `artifactId`
-values of the Flink modules in your dependencies section.
-
-If you want to build Flink with Scala 2.11, please check the
-[build guide]({{ site.baseurl }}/setup/building.html#scala-versions).
-
-#### Hadoop Dependency Versions
-
-If you are using Flink together with Hadoop, the version of the dependency may vary depending on the
-version of Hadoop (or more specifically, HDFS) that you want to use Flink with. Please refer to the
-[downloads page](http://flink.apache.org/downloads.html) for a list of available versions, and instructions
-on how to link with custom versions of Hadoop.
-
-In order to link against the latest SNAPSHOT versions of the code, please follow
-[this guide](http://flink.apache.org/how-to-contribute.html#snapshots-nightly-builds).
-
-The *flink-clients* dependency is only necessary to invoke the Flink program locally (for example to
-run it standalone for testing and debugging). If you only intend to export the program as a JAR
-file and [run it on a cluster]({{ site.baseurl }}/apis/cluster_execution.html), you can skip that dependency.
-
-{% top %}
-
-Program Skeleton
-----------------
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-As we already saw in the example, Flink DataSet programs look like regular Java
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtain an `ExecutionEnvironment`,
-2. Load/create the initial data,
-3. Specify transformations on this data,
-4. Specify where to put the results of your computations,
-5. Trigger the program execution
-
-We will now give an overview of each of those steps; please refer to the respective sections for
-more details. Note that all core classes of the Java API are found in the package {% gh_link /flink-java/src/main/java/org/apache/flink/api/java "org.apache.flink.api.java" %}.
-
-The `ExecutionEnvironment` is the basis for all Flink DataSet programs. You can
-obtain one using these static methods on class `ExecutionEnvironment`:
-
-{% highlight java %}
-getExecutionEnvironment()
-
-createCollectionsEnvironment()
-
-createLocalEnvironment()
-createLocalEnvironment(int parallelism)
-createLocalEnvironment(Configuration customConfiguration)
-
-createRemoteEnvironment(String host, int port, String... jarFiles)
-createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment()`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Java program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command line]({{ site.baseurl }}/apis/cli.html),
-the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-To specify data sources, the execution environment has several methods
-for reading files: you can read them line by line,
-as CSV files, or with completely custom data input formats. To simply read
-a text file as a sequence of lines, you can use:
-
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-DataSet<String> text = env.readTextFile("file:///path/to/file");
-{% endhighlight %}
-
-This will give you a DataSet on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataSet you can apply transformations to create a new
-DataSet which you can then write to a file, transform again, or
-combine with other DataSets. You apply transformations by calling
-methods on DataSet with your own custom transformation functions. For example,
-a map transformation looks like this:
-
-{% highlight java %}
-DataSet<String> input = ...;
-
-DataSet<Integer> tokenized = input.map(new MapFunction<String, Integer>() {
-    @Override
-    public Integer map(String value) {
-        return Integer.parseInt(value);
-    }
-});
-{% endhighlight %}
-
-This will create a new DataSet by converting every String in the original
-set to an Integer. For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataSet containing your final results, you can either write the result
-to a file system (HDFS or local) or print it.
-
-{% highlight java %}
-writeAsText(String path)
-writeAsCsv(String path)
-write(FileOutputFormat<T> outputFormat, String filePath)
-
-print()
-printOnTaskManager()
-
-collect()
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-
-As we already saw in the example, Flink programs look like regular Scala
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtain an `ExecutionEnvironment`,
-2. Load/create the initial data,
-3. Specify transformations on this data,
-4. Specify where to put the results of your computations,
-5. Trigger the program execution
-
-We will now give an overview of each of those steps; please refer to the respective sections for
-more details. Note that all core classes of the Scala API are found in the package
-{% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala "org.apache.flink.api.scala" %}.
-
-
-The `ExecutionEnvironment` is the basis for all Flink programs. You can
-obtain one using these static methods on class `ExecutionEnvironment`:
-
-{% highlight scala %}
-def getExecutionEnvironment
-
-def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())
-def createLocalEnvironment(customConfiguration: Configuration)
-
-def createCollectionsEnvironment
-
-def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
-def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment()`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Scala program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command line](cli.html)
-or the [web interface](web_client.html),
-the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-To specify data sources, the execution environment has several methods
-for reading files: you can read them line by line,
-as CSV files, or with completely custom data input formats. To simply read
-a text file as a sequence of lines, you can use:
-
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-val text = env.readTextFile("file:///path/to/file")
-{% endhighlight %}
-
-This will give you a DataSet on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataSet you can apply transformations to create a new
-DataSet which you can then write to a file, transform again, or
-combine with other DataSets. You apply transformations by calling
-methods on DataSet with your own custom transformation function. For example,
-a map transformation looks like this:
-
-{% highlight scala %}
-val input: DataSet[String] = ...
-
-val mapped = input.map { x => x.toInt }
-{% endhighlight %}
-
-This will create a new DataSet by converting every String in the original
-set to an Integer. For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataSet containing your final results, you can either write the result
-to a file system (HDFS or local) or print it.
-
-{% highlight scala %}
-def writeAsText(path: String, writeMode: WriteMode = WriteMode.NO_OVERWRITE)
-def writeAsCsv(
-    filePath: String,
-    rowDelimiter: String = "\n",
-    fieldDelimiter: String = ",",
-    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
-def write(outputFormat: FileOutputFormat[T],
-    path: String,
-    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
-
-def printOnTaskManager()
-
-def print()
-
-def collect()
-{% endhighlight %}
-
-</div>
-</div>
-
-
-The first two methods (`writeAsText()` and `writeAsCsv()`) do what their names suggest; the third one
-can be used to specify a custom data output format. Please refer to [Data Sinks](#data-sinks) for
-more information on writing to files and also about custom data output formats.
-
-The `print()` method is useful for developing/debugging. It will output the contents of the DataSet
-to standard output (on the JVM starting the Flink execution). **NOTE** The behavior of the `print()`
-method changed with Flink 0.9.x. Previously it printed to the log files of the workers; now it
-sends the DataSet results to the client and prints them there.
-
-`collect()` retrieves the DataSet from the cluster to the local JVM. The `collect()` method
-returns a `List` containing the elements.
-
-Both `print()` and `collect()` will trigger the execution of the program. You don't need to further call `execute()`.
-
-
-**NOTE** `print()` and `collect()` retrieve the data from the cluster to the client. Currently,
-the data sizes you can retrieve with `collect()` are limited by our RPC system. We advise against
-collecting DataSets larger than 10 MB.
-
-There is also a `printOnTaskManager()` method which will print the DataSet contents on the TaskManager
-(so you have to get them from the log file). The `printOnTaskManager()` method will not trigger a
-program execution.
-
-Once you have specified the complete program, you need to **trigger the program execution**. You can call
-`execute()` directly on the `ExecutionEnvironment`, or trigger the execution implicitly with
-`collect()` or `print()`.
-Depending on the type of the `ExecutionEnvironment`, the program is executed on your local
-machine or submitted for execution on a cluster.
-
-Note that you cannot call both `print()` (or `collect()`) and `execute()` at the end of a program.
-
-The `execute()` method returns a `JobExecutionResult`, which includes execution times and
-accumulator results. `print()` and `collect()` do not return this result, but it can be
-accessed through the `getLastJobExecutionResult()` method.
-
-
-{% top %}
-
-
-DataSet abstraction
----------------
-
-A `DataSet` is an abstract representation of a finite immutable collection of data of the same type that may contain duplicates.
-
-Note that Flink does not always physically create (materialize) each DataSet at runtime. This
-depends on the runtime used, the configuration, and optimizer decisions. DataSets may be "streamed through"
-operations during execution, as under the hood Flink uses a streaming data processing engine.
-
-Some DataSets are materialized automatically to avoid distributed deadlocks (at points where the data flow graph branches
-out and joins again later) or if the execution mode has explicitly been set to blocking execution.
-
-{% top %}
-
-
-Lazy Evaluation
----------------
-
-All Flink DataSet programs are executed lazily: When the program's main method is executed, the data loading
-and transformations do not happen directly. Rather, each operation is created and added to the
-program's plan. The operations are actually executed when the execution is explicitly triggered by
-an `execute()` call on the ExecutionEnvironment object. Also, `collect()` and `print()` will trigger
-the job execution. Whether the program is executed locally or on a cluster depends
-on the environment of the program.
-
-The lazy evaluation lets you construct sophisticated programs that Flink executes as one
-holistically planned unit.
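The deferred execution described above can be modeled in a few lines of plain Java. This is a minimal sketch under stated assumptions, not Flink's implementation: `LazySet` and `LazyDemo` are hypothetical names, and a chain of `Supplier`s stands in for the program plan. Calling `map` only records a step. Nothing runs until `collect()` is called, which mirrors how `collect()`, `print()`, or `execute()` trigger a Flink job.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical toy stand-in for a DataSet: it holds a deferred computation, not data.
final class LazySet<T> {
    private final Supplier<List<T>> plan; // evaluated only when collect() is called

    private LazySet(Supplier<List<T>> plan) {
        this.plan = plan;
    }

    // Analogous to a data source: wraps the input without reading it yet.
    static <T> LazySet<T> fromList(List<T> data) {
        return new LazySet<>(() -> data);
    }

    // Records a map step; f is not applied until execution is triggered.
    <R> LazySet<R> map(Function<T, R> f) {
        return new LazySet<>(() -> plan.get().stream().map(f).collect(Collectors.toList()));
    }

    // Analogous to collect(): triggers execution of the whole recorded chain.
    List<T> collect() {
        return plan.get();
    }
}

final class LazyDemo {
    // Returns {calls before collect(), calls after collect()} of the map function.
    static int[] countCalls() {
        int[] calls = {0};
        LazySet<Integer> parsed = LazySet.fromList(List.of("1", "2", "3"))
                .map(s -> {
                    calls[0]++;
                    return Integer.parseInt(s);
                });
        int before = calls[0]; // still 0: map() only recorded the step
        parsed.collect();      // runs the plan, calling the function once per element
        return new int[] {before, calls[0]};
    }
}
```

Unlike Flink, which plans and optimizes the whole program as one unit, this toy simply re-runs the recorded chain on every `collect()` call.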
-
-{% top %}
-
-
-Transformations
----------------
+DataSet Transformations
+-----------------------
 
 Data transformations transform one or more DataSets into a new DataSet. Programs can combine
 multiple transformations into sophisticated assemblies.
@@ -1105,732 +740,104 @@ possible for [Data Sources](#data-sources) and [Data Sinks](#data-sinks).
 
 {% top %}
 
-
-Specifying Keys
--------------
-
-
-
-Some transformations (join, coGroup) require that a key is defined on
-its argument DataSets, and other transformations (Reduce, GroupReduce,
-Aggregate) allow that the DataSet is grouped on a key before they are
-applied.
-
-A DataSet is grouped as
-{% highlight java %}
-DataSet<...> input = // [...]
-DataSet<...> reduced = input
-	.groupBy(/*define key here*/)
-	.reduceGroup(/*do something*/);
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data set types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-### Define keys for Tuples
-{:.no_toc}
-
-The simplest case is grouping a data set of Tuples on one or more
-fields of the Tuple:
+Data Sources
+------------
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-{% highlight java %}
-DataSet<Tuple3<Integer,String,Long>> input = // [...]
-DataSet<Tuple3<Integer,String,Long>> grouped = input
-	.groupBy(0)
-	.reduceGroup(/*do something*/);
-{% endhighlight %}
-
-The data set is grouped on the first field of the tuples (the one of
-Integer type). The GroupReduce function will thus receive groups of tuples with
-the same value in the first field.
-
-{% highlight java %}
-DataSet<Tuple3<Integer,String,Long>> input = // [...]
-DataSet<Tuple3<Integer,String,Long>> grouped = input
-	.groupBy(0,1)
-	.reduce(/*do something*/);
-{% endhighlight %}
-
-The data set is grouped on the composite key consisting of the first and the
-second field. Therefore, the Reduce function is applied within groups
-with the same value for both fields.
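To make the difference between a single-field key and a composite key concrete, here is a plain-Java analogue (Java 16+ for records) built on `Collectors.groupingBy`. The `Row` and `Key2` records are hypothetical stand-ins for `Tuple3<Integer, String, Long>` and the `(f0, f1)` key. The sketch only illustrates which elements end up in the same group; it is not how Flink's `groupBy` is implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of key-based grouping; not Flink's groupBy().
final class GroupingSketch {
    // Hypothetical stand-in for Tuple3<Integer, String, Long>.
    record Row(int f0, String f1, long f2) {}

    // Hypothetical composite key made of the first and second field.
    record Key2(int f0, String f1) {}

    // Like groupBy(0): rows with the same first field share a group.
    static Map<Integer, List<Row>> byFirstField(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(Row::f0));
    }

    // Like groupBy(0, 1): rows share a group only if both fields match.
    static Map<Key2, List<Row>> byFirstTwoFields(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(r -> new Key2(r.f0(), r.f1())));
    }
}
```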
-
-A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
 
-{% highlight java %}
-DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
-{% endhighlight %}
-
-Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
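Keying on the whole nested tuple and navigating into it can be compared in plain Java as well. `Inner` and `Outer` are hypothetical records (Java 16+) standing in for `Tuple2<Integer, Float>` and the enclosing `Tuple3`. The second method roughly corresponds to what a field expression such as `"f0.f0"` would select. This is an illustration only, not Flink's API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of nested keys; not Flink's API.
final class NestedKeySketch {
    // Hypothetical stand-in for Tuple2<Integer, Float>.
    record Inner(int i, float f) {}

    // Hypothetical stand-in for Tuple3<Tuple2<Integer, Float>, String, Long>.
    record Outer(Inner f0, String f1, long f2) {}

    // Like groupBy(0): the whole nested pair (i, f) is the key.
    static Map<Inner, List<Outer>> byWholeNestedTuple(List<Outer> rows) {
        return rows.stream().collect(Collectors.groupingBy(Outer::f0));
    }

    // Roughly what the field expression "f0.f0" selects: only the inner Integer.
    static Map<Integer, List<Outer>> byInnerFirstField(List<Outer> rows) {
        return rows.stream().collect(Collectors.groupingBy(o -> o.f0().i()));
    }
}
```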
+Data sources create the initial data sets, such as from files or from Java collections. The general
+mechanism of creating data sets is abstracted behind an
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}.
+Flink comes
+with several built-in formats to create data sets from common file formats. Many of them have
+shortcut methods on the *ExecutionEnvironment*.
 
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataSet[(Int, String, Long)] = // [...]
-val grouped = input
-  .groupBy(0)
-  .reduceGroup(/*do something*/)
-{% endhighlight %}
+File-based:
 
-The data set is grouped on the first field of the tuples (the one of
-Integer type). The GroupReduce function will thus receive groups of tuples with
-the same value in the first field.
+- `readTextFile(path)` / `TextInputFormat` - Reads files line by line and returns them as Strings.
 
-{% highlight scala %}
-val input: DataSet[(Int, String, Long)] = // [...]
-val grouped = input
-  .groupBy(0,1)
-  .reduce(/*do something*/)
-{% endhighlight %}
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line by line and returns them as
+  StringValues. StringValues are mutable strings.
 
-The data set is grouped on the composite key consisting of the first and the
-second field. Therefore, the Reduce function is applied within groups
-with the same value for both fields.
+- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma- (or other character-) delimited fields.
+  Returns a DataSet of tuples or POJOs. Supports the basic Java types and their Value counterparts as field
+  types.
 
-A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of primitive data types such as
+  `String` or `Integer`, delimited by newlines (or another character sequence).
 
-{% highlight scala %}
-val ds: DataSet[((Int, Float), String, Long)]
-{% endhighlight %}
+- `readFileOfPrimitives(path, delimiter, Class)` / `PrimitiveInputFormat` - Parses files of primitive data types such as
+   `String` or `Integer`, delimited by the given delimiter.
 
-Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Int and
-Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
+- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads files from the specified
+   path with the specified FileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.
 
-</div>
-</div>
+- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads files from the specified path with
+   type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.
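The `readCsvFile` shortcut listed above is typically combined with a field mask. The examples further down use `includeFields("10010")` to keep only the first and fourth columns. The sketch below shows, in plain Java, how such a mask maps to column indices. `CsvMaskSketch` is a hypothetical helper, not Flink's `CsvReader`, and its naive `split` ignores the quoting and escaping that a real CSV parser must handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not Flink's CsvReader: shows how a mask such as "10010" selects columns.
final class CsvMaskSketch {
    // Returns the 0-based indices of the columns marked '1' in the mask.
    static List<Integer> selectedIndices(String mask) {
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < mask.length(); i++) {
            char c = mask.charAt(i);
            if (c == '1') {
                indices.add(i);
            } else if (c != '0') {
                throw new IllegalArgumentException("mask may only contain '0' and '1': " + mask);
            }
        }
        return indices;
    }

    // Splits one line on the delimiter (no quote handling) and keeps the selected columns.
    static List<String> project(String line, char delimiter, String mask) {
        String[] columns = line.split(Pattern.quote(String.valueOf(delimiter)), -1);
        List<String> out = new ArrayList<>();
        for (int i : selectedIndices(mask)) {
            if (i >= columns.length) {
                throw new IllegalArgumentException("line has no column " + i + ": " + line);
            }
            out.add(columns[i]);
        }
        return out;
    }
}
```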
 
-### Define keys using Field Expressions
-{:.no_toc}
 
-Starting from release 0.7-incubating, you can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping. In addition, field expressions can be used to define [semantic function annotations](#semantic-annotations).
+Collection-based:
 
-Field expressions make it very easy to select fields in (nested) composite types such as [Tuple](#tuples-and-case-classes) and [POJO](#pojos) types.
+- `fromCollection(Collection)` - Creates a data set from a Java `java.util.Collection`. All elements
+  in the collection must be of the same type.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+- `fromCollection(Iterator, Class)` - Creates a data set from an iterator. The class specifies the
+  data type of the elements returned by the iterator.
 
-In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
-{% highlight java %}
-// some ordinary POJO (Plain old Java Object)
-public class WC {
-  public String word;
-  public int count;
-}
-DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words.groupBy("word").reduce(/*do something*/);
-{% endhighlight %}
+- `fromElements(T ...)` - Creates a data set from the given sequence of objects. All objects must be
+  of the same type.
 
-**Field Expression Syntax**:
+- `fromParallelCollection(SplittableIterator, Class)` - Creates a data set from an iterator, in
+  parallel. The class specifies the data type of the elements returned by the iterator.
 
-- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
 
-- Select Tuple fields by their field name or 0-offset field index. For example `"f0"` and `"5"` refer to the first and sixth field of a Java Tuple type, respectively.
+Generic:
 
-- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"f1.user.zip"` or `"user.f3.1.zip"`.
+- `readFile(inputFormat, path)` / `FileInputFormat` - Accepts a file input format.
 
-- You can select the full type using the `"*"` wildcard expression. This also works for types that are not Tuple or POJO types.
+- `createInput(inputFormat)` / `InputFormat` - Accepts a generic input format.
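The field expression syntax described above is essentially a dot-separated path. As an illustration only, the toy parser below splits an expression into its components and resolves Java tuple component names, where `"f5"` and `"5"` both denote index 5. `FieldExpressionSketch` is hypothetical. It is not Flink's own expression parser and does not validate components against any actual type.

```java
import java.util.Arrays;
import java.util.List;

// Toy parser for the field-expression syntax; not Flink's implementation.
final class FieldExpressionSketch {
    // Splits an expression such as "user.f3.1.zip" into its path components.
    static List<String> path(String expr) {
        if (expr.equals("*")) {
            return List.of("*"); // wildcard: the full type
        }
        return Arrays.asList(expr.split("\\."));
    }

    // For a Java tuple component, "f5" and "5" both denote index 5.
    // Throws NumberFormatException for anything else (e.g. a POJO field name).
    static int tupleIndex(String component) {
        String digits = component.startsWith("f") ? component.substring(1) : component;
        return Integer.parseInt(digits);
    }
}
```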
 
-**Field Expression Example**:
+**Examples**
 
 {% highlight java %}
-public static class WC {
-  public ComplexNestedClass complex; //nested POJO
-  private int count;
-  // getter / setter for private field (count)
-  public int getCount() {
-    return count;
-  }
-  public void setCount(int c) {
-    this.count = c;
-  }
-}
-public static class ComplexNestedClass {
-  public Integer someNumber;
-  public float someFloat;
-  public Tuple3<Long, Long, String> word;
-  public IntWritable hadoopCitizen;
-}
-{% endhighlight %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-These are valid field expressions for the example code above:
+// read text file from local file system
+DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
 
-- `"count"`: The count field in the `WC` class.
+// read text file from an HDFS running at nnHost:nnPort
+DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
 
-- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`.
+// read a CSV file with three fields
+DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
+	                       .types(Integer.class, String.class, Double.class);
 
-- `"complex.word.f2"`: Selects the last field of the nested `Tuple3`.
+// read a CSV file with five fields, taking only two of them
+DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
+                               .includeFields("10010")  // take the first and the fourth field
+	                       .types(String.class, Double.class);
 
-- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
+DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
+                         .pojoType(Person.class, "name", "age", "zipcode");
 
-</div>
-<div data-lang="scala" markdown="1">
 
-In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
-{% highlight scala %}
-// some ordinary POJO (Plain old Java Object)
-class WC(var word: String, var count: Int) {
-  def this() { this("", 0) }
-}
-val words: DataSet[WC] = // [...]
-val wordCounts = words.groupBy("word").reduce(/*do something*/)
+// read a file from the specified path of type TextInputFormat
+DataSet<Tuple2<LongWritable, Text>> tuples =
+ env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
 
-// or, as a case class, which is less typing
-case class WC(word: String, count: Int)
-val words: DataSet[WC] = // [...]
-val wordCounts = words.groupBy("word").reduce(/*do something*/)
-{% endhighlight %}
+// read a file from the specified path of type SequenceFileInputFormat
+DataSet<Tuple2<IntWritable, Text>> tuples =
+ env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
 
-**Field Expression Syntax**:
+// creates a set from some given elements
+DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
 
-- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
-
-- Select Tuple fields by their 1-offset field name or 0-offset field index. For example `"_1"` and `"5"` refer to the first and sixth field of a Scala Tuple type, respectively.
-
-- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"_2.user.zip"` or `"user._4.1.zip"`.
-
-- You can select the full type using the `"_"` wildcard expression. This also works for types that are not Tuple or POJO types.
-
-**Field Expression Example**:
-
-{% highlight scala %}
-class WC(var complex: ComplexNestedClass, var count: Int) {
-  def this() { this(null, 0) }
-}
-
-class ComplexNestedClass(
-    var someNumber: Int,
-    var someFloat: Float,
-    var word: (Long, Long, String),
-    var hadoopCitizen: IntWritable) {
-  def this() { this(0, 0, (0L, 0L, ""), new IntWritable(0)) }
-}
-{% endhighlight %}
-
-These are valid field expressions for the example code above:
-
-- `"count"`: The count field in the `WC` class.
-
-- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`.
-
-- `"complex.word._3"`: Selects the last field of the nested `Tuple3`.
-
-- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
-
-</div>
-</div>
-
-### Define keys using Key Selector Functions
-{:.no_toc}
-
-An additional way to define keys is to use "key selector" functions. A key selector function
-takes a single dataset element as input and returns the key for the element. The key can be of any type and can be derived from arbitrary computations.
-
-The following example shows a key selector function that simply returns the field of an object:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// some ordinary POJO
-public class WC {public String word; public int count;}
-DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words
-                         .groupBy(
-                           new KeySelector<WC, String>() {
-                             public String getKey(WC wc) { return wc.word; }
-                           })
-                         .reduce(/*do something*/);
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// some ordinary case class
-case class WC(word: String, count: Int)
-val words: DataSet[WC] = // [...]
-val wordCounts = words
-  .groupBy( _.word ).reduce(/*do something*/)
-{% endhighlight %}
-</div>
-</div>
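-
-A key selector is simply a function from an element to its key. The following plain-Java sketch illustrates the idea using only the JDK. Here, `java.util.function.Function` plays the role of the key selector, and a stream collector stands in for `groupBy(...).reduce(...)`. The `WC` class and the `countPerWord` helper are hypothetical and not part of the Flink API.
-
-{% highlight java %}
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-// Hypothetical element type, mirroring the WC POJO above
-class WC {
-    public String word;
-    public int count;
-
-    public WC(String word, int count) {
-        this.word = word;
-        this.count = count;
-    }
-}
-
-class KeySelectorSketch {
-    // The "key selector": derives the grouping key from a single element
-    static final Function<WC, String> BY_WORD = wc -> wc.word;
-
-    // Groups elements by the selected key and sums their counts
-    // (a local stand-in for groupBy(keySelector).reduce(...))
-    static Map<String, Integer> countPerWord(List<WC> words) {
-        return words.stream().collect(Collectors.groupingBy(
-                BY_WORD, TreeMap::new, Collectors.summingInt(wc -> wc.count)));
-    }
-}
-{% endhighlight %}
-
-Because the key is computed by ordinary code, it could just as well be a lowercased word, a prefix, or any other derived value.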
-
-
-{% top %}
-
-
-Passing Functions to Flink
---------------------------
-
-Operations require user-defined functions. This section lists different ways of specifying them.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-#### Implementing an interface
-
-The most basic way is to implement one of the provided interfaces:
-
-{% highlight java %}
-class MyMapFunction implements MapFunction<String, Integer> {
-  public Integer map(String value) { return Integer.parseInt(value); }
-}
-data.map(new MyMapFunction());
-{% endhighlight %}
-
-#### Anonymous classes
-
-You can pass a function as an anonymous class:
-{% highlight java %}
-data.map(new MapFunction<String, Integer>() {
-  public Integer map(String value) { return Integer.parseInt(value); }
-});
-{% endhighlight %}
-
-#### Java 8 Lambdas
-
-Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide]({{ site.baseurl }}/apis/java8.html).
-
-{% highlight java %}
-DataSet<String> data = // [...]
-data.filter(s -> s.startsWith("http://"));
-{% endhighlight %}
-
-{% highlight java %}
-DataSet<Integer> data = // [...]
-data.reduce((i1,i2) -> i1 + i2);
-{% endhighlight %}
-
-#### Rich functions
-
-All transformations that take a user-defined function as argument can
-instead take a *rich* function. For example, instead of
-
-{% highlight java %}
-class MyMapFunction implements MapFunction<String, Integer> {
-  public Integer map(String value) { return Integer.parseInt(value); }
-}
-{% endhighlight %}
-
-you can write
-
-{% highlight java %}
-class MyMapFunction extends RichMapFunction<String, Integer> {
-  public Integer map(String value) { return Integer.parseInt(value); }
-}
-{% endhighlight %}
-
-and pass the function as usual to a `map` transformation:
-
-{% highlight java %}
-data.map(new MyMapFunction());
-{% endhighlight %}
-
-Rich functions can also be defined as an anonymous class:
-{% highlight java %}
-data.map(new RichMapFunction<String, Integer>() {
-  public Integer map(String value) { return Integer.parseInt(value); }
-});
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-
-
-#### Lambda Functions
-
-As already seen in previous examples, all operations accept lambda functions for describing
-the operation:
-{% highlight scala %}
-val data: DataSet[String] = // [...]
-data.filter { _.startsWith("http://") }
-{% endhighlight %}
-
-{% highlight scala %}
-val data: DataSet[Int] = // [...]
-data.reduce { (i1,i2) => i1 + i2 }
-// or
-data.reduce { _ + _ }
-{% endhighlight %}
-
-#### Rich functions
-
-All transformations that take a lambda function as argument can
-instead take a *rich* function. For example, instead of
-
-{% highlight scala %}
-data.map { x => x.toInt }
-{% endhighlight %}
-
-you can write
-
-{% highlight scala %}
-class MyMapFunction extends RichMapFunction[String, Int] {
-  def map(in: String): Int = { in.toInt }
-}
-{% endhighlight %}
-
-and pass the function to a `map` transformation:
-
-{% highlight scala %}
-data.map(new MyMapFunction())
-{% endhighlight %}
-
-Rich functions can also be defined as an anonymous class:
-{% highlight scala %}
-data.map(new RichMapFunction[String, Int] {
-  def map(in: String): Int = { in.toInt }
-})
-{% endhighlight %}
-</div>
-
-</div>
-
-Rich functions provide, in addition to the user-defined function (map,
-reduce, etc.), four methods: `open`, `close`, `getRuntimeContext`, and
-`setRuntimeContext`. These are useful for parameterizing the function
-(see [Passing Parameters to Functions](#passing-parameters-to-functions)),
-creating and finalizing local state, accessing broadcast variables (see
-[Broadcast Variables](#broadcast-variables)), and accessing runtime
-information such as accumulators and counters (see
-[Accumulators and Counters](#accumulators--counters)) and information
-on iterations (see [Iterations](iterations.html)).
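-
-The call order of these methods can be illustrated with a small JDK-only sketch. `LifecycleMapper`, `LifecycleDriver` and `PrefixMapper` are hypothetical classes. They only mimic the open/map/close sequence of a rich function on a single parallel instance and are not Flink's `RichMapFunction` or its runtime. The point is that setup placed in `open()` runs once per instance rather than once per record.
-
-{% highlight java %}
-import java.util.ArrayList;
-import java.util.List;
-
-// Hypothetical stand-in for a rich function: a map method plus open/close hooks
-abstract class LifecycleMapper<IN, OUT> {
-    void open() {}                 // called once before the first record
-    abstract OUT map(IN value);    // called once per record
-    void close() {}                // called once after the last record
-}
-
-// Hypothetical driver that plays the role of the runtime for one parallel instance
-class LifecycleDriver {
-    static <IN, OUT> List<OUT> run(LifecycleMapper<IN, OUT> fn, List<IN> input) {
-        List<OUT> out = new ArrayList<>();
-        fn.open();
-        try {
-            for (IN value : input) {
-                out.add(fn.map(value));
-            }
-        } finally {
-            fn.close();            // cleanup runs even if map() throws
-        }
-        return out;
-    }
-}
-
-// Example: state is initialized once in open(), not for every record
-class PrefixMapper extends LifecycleMapper<String, String> {
-    String prefix;
-    int opens = 0;
-    int closes = 0;
-
-    @Override void open() { prefix = "id-"; opens++; }
-    @Override String map(String value) { return prefix + value; }
-    @Override void close() { closes++; }
-}
-{% endhighlight %}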
-
-In particular, for the `reduceGroup` transformation, using a rich
-function is the only way to define an optional `combine` function. See
-the [transformations documentation](dataset_transformations.html)
-for a complete example.
-
-{% top %}
-
-
-Data Types
-----------
-
-Flink places some restrictions on the type of elements that are used in DataSets and in results
-of transformations. The reason for this is that the system analyzes the types to determine
-efficient execution strategies.
-
-There are seven different categories of data types:
-
-1. **Java Tuples** and **Scala Case Classes**
-2. **Java POJOs**
-3. **Primitive Types**
-4. **General Class Types**
-5. **Values**
-6. **Hadoop Writables**
-7. **Special Types**
-
-#### Tuples and Case Classes
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-Tuples are composite types that contain a fixed number of fields with various types.
-The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
-can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
-tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method
-`tuple.getField(int position)`. The field indices start at 0. Note that this stands in contrast
-to the Scala tuples, but it is more consistent with Java's general indexing.
-
-{% highlight java %}
-DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
-    new Tuple2<String, Integer>("hello", 1),
-    new Tuple2<String, Integer>("world", 2));
-
-wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
-    @Override
-    public Integer map(Tuple2<String, Integer> value) throws Exception {
-        return value.f1;
-    }
-});
-{% endhighlight %}
-
-When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
-
-{% highlight java %}
-wordCounts
-    .groupBy(0) // also valid .groupBy("f0")
-    .reduce(new MyReduceFunction());
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-
-Scala case classes (and Scala tuples, which are a special case of case classes) are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.
-
-{% highlight scala %}
-case class WordCount(word: String, count: Int)
-val input = env.fromElements(
-    WordCount("hello", 1),
-    WordCount("world", 2)) // Case Class Data Set
-
-input.groupBy("word").reduce(...) // group by field expression "word"
-
-val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
-
-input2.groupBy(0, 1).reduce(...) // group by field positions 0 and 1
-{% endhighlight %}
-
-When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
-
-</div>
-</div>
-
-#### POJOs
-
-Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:
-
-- The class must be public.
-
-- It must have a public constructor without arguments (default constructor).
-
-- All fields are either public or must be accessible through getter and setter functions. For a field called `foo` the getter and setter methods must be named `getFoo()` and `setFoo()`.
-
-- The type of a field must be supported by Flink. At the moment, Flink uses [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
-
-Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
-
-The following example shows a simple POJO with two public fields.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class WordWithCount {
-
-    public String word;
-    public int count;
-
-    public WordWithCount() {}
-
-    public WordWithCount(String word, int count) {
-        this.word = word;
-        this.count = count;
-    }
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class WordWithCount(var word: String, var count: Int) {
-    def this() {
-      this(null, -1)
-    }
-}
-{% endhighlight %}
-</div>
-</div>
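-
-The POJO rules above can be checked mechanically with Java reflection. The following sketch is a hypothetical approximation: `PojoRules` and the example classes are not part of Flink, and the sketch assumes that static and transient fields are ignored. Flink's actual type analysis covers more cases, such as field types and class hierarchies.
-
-{% highlight java %}
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-// Hypothetical checker that approximates the POJO requirements listed above
-class PojoRules {
-    static boolean looksLikePojo(Class<?> clazz) {
-        // The class must be public
-        if (!Modifier.isPublic(clazz.getModifiers())) {
-            return false;
-        }
-        // There must be a public constructor without arguments
-        try {
-            clazz.getConstructor();
-        } catch (NoSuchMethodException e) {
-            return false;
-        }
-        // Every non-static, non-transient field is public or has getFoo()/setFoo(...)
-        for (Field f : clazz.getDeclaredFields()) {
-            int mod = f.getModifiers();
-            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
-                continue;
-            }
-            if (!hasGetterAndSetter(clazz, f)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
-        String name = f.getName();
-        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
-        try {
-            clazz.getMethod("get" + suffix);
-            clazz.getMethod("set" + suffix, f.getType());
-            return true;
-        } catch (NoSuchMethodException e) {
-            return false;
-        }
-    }
-}
-
-// Example types, nested so that they can be declared public
-class PojoExamples {
-    // Public fields and a default constructor: satisfies the rules
-    public static class WordWithCount {
-        public String word;
-        public int count;
-        public WordWithCount() {}
-    }
-
-    // Private field with a getter and setter: also satisfies the rules
-    public static class Person {
-        private String name;
-        public Person() {}
-        public String getName() { return name; }
-        public void setName(String name) { this.name = name; }
-    }
-
-    // No default constructor: violates the rules
-    public static class NoDefaultConstructor {
-        public String word;
-        public NoDefaultConstructor(String word) { this.word = word; }
-    }
-}
-{% endhighlight %}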
-
-When grouping, sorting, or joining a data set of POJO types, keys can be specified with field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-wordCounts
-    .groupBy("word")                    // group by field expression "word"
-    .reduce(new MyReduceFunction());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-wordCounts groupBy { _.word } reduce(new MyReduceFunction())
-{% endhighlight %}
-</div>
-</div>
-
-#### Primitive Types
-
-Flink supports all Java and Scala primitive types and their boxed forms, such as `Integer` and `Double`, as well as `String`.
-
-#### General Class Types
-
-Flink supports most Java and Scala classes (API and custom).
-Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
-resources. Classes that follow the Java Beans conventions work well in general.
-
-All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types.
-Flink treats these data types as black boxes and is not able to access their content (e.g., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
-
-When grouping, sorting, or joining a data set of generic types, keys must be specified with key selector functions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
-
-
-#### Values
-
-*Value* types describe their serialization and deserialization manually. Instead of going through a
-general purpose serialization framework, they provide custom code for those operations by means of
-implementing the `org.apache.flink.types.Value` interface with the methods `read` and `write`. Using
-a Value type is reasonable when general purpose serialization would be highly inefficient. An
-example would be a data type that implements a sparse vector of elements as an array. Knowing that
-the array is mostly zero, one can use a special encoding for the non-zero elements, while the
-general purpose serialization would simply write all array elements.
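-
-As a sketch of the sparse vector idea, the hypothetical `SparseVector` class below writes only the non-zero entries. Note one assumption. Flink's `Value` interface declares `write` and `read` over Flink's own data input/output views, but this sketch uses `java.io.DataOutput` and `java.io.DataInput` so that it runs with the JDK alone.
-
-{% highlight java %}
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-// Hypothetical sparse vector with a hand-written encoding, in the spirit of a Value type.
-// Layout: length, number of non-zeros, then (index, value) for each non-zero entry.
-class SparseVector {
-    double[] values = new double[0];
-
-    void write(DataOutput out) throws IOException {
-        int nonZero = 0;
-        for (double v : values) {
-            if (v != 0.0) nonZero++;
-        }
-        out.writeInt(values.length);
-        out.writeInt(nonZero);
-        for (int i = 0; i < values.length; i++) {
-            if (values[i] != 0.0) {
-                out.writeInt(i);
-                out.writeDouble(values[i]);
-            }
-        }
-    }
-
-    void read(DataInput in) throws IOException {
-        values = new double[in.readInt()];   // every entry starts as 0.0
-        int nonZero = in.readInt();
-        for (int k = 0; k < nonZero; k++) {
-            int index = in.readInt();
-            values[index] = in.readDouble();
-        }
-    }
-
-    // Helpers to round-trip through a byte array
-    byte[] toBytes() throws IOException {
-        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(bytes);
-        write(out);
-        out.flush();
-        return bytes.toByteArray();
-    }
-
-    static SparseVector fromBytes(byte[] data) throws IOException {
-        SparseVector v = new SparseVector();
-        v.read(new DataInputStream(new ByteArrayInputStream(data)));
-        return v;
-    }
-}
-{% endhighlight %}
-
-Consider a vector with 1,000 entries, two of which are non-zero. This encoding takes 32 bytes: two integers for the length and count, plus two index/value pairs. Writing every `double` would take 8,000 bytes.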
-
-The `org.apache.flink.types.CopyableValue` interface supports manual internal cloning logic in a
-similar way.
-
-Flink comes with pre-defined Value types that correspond to basic data types (`ByteValue`,
-`ShortValue`, `IntValue`, `LongValue`, `FloatValue`, `DoubleValue`, `StringValue`, `CharValue`,
-`BooleanValue`). These Value types act as mutable variants of the basic data types: their value can
-be altered, allowing programmers to reuse objects and take pressure off the garbage collector.
-
-
-#### Hadoop Writables
-
-You can use types that implement the `org.apache.hadoop.io.Writable` interface. The serialization logic
-defined in the `write()` and `readFields()` methods will be used for serialization.
-
-#### Special Types
-
-You can use special types, including Scala's `Either`, `Option`, and `Try`.
-The Java API has its own custom implementation of `Either`.
-Similarly to Scala's `Either`, it represents a value of one of two possible types, *Left* or *Right*.
-`Either` can be useful for error handling or operators that need to output two different types of records.
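-
-A minimal, hypothetical `Either` is enough to show the error-handling pattern. It is not Flink's class. Each input record becomes either an error message (*Left*) or a parsed value (*Right*), so a single operator can emit both kinds of results.
-
-{% highlight java %}
-import java.util.ArrayList;
-import java.util.List;
-
-// Hypothetical minimal Either; NOT Flink's own Either implementation
-abstract class Either<L, R> {
-    abstract boolean isLeft();
-    abstract L left();    // only valid for a Left
-    abstract R right();   // only valid for a Right
-
-    static <L, R> Either<L, R> ofLeft(L value) {
-        return new Either<L, R>() {
-            @Override boolean isLeft() { return true; }
-            @Override L left() { return value; }
-            @Override R right() { throw new IllegalStateException("Left has no right value"); }
-        };
-    }
-
-    static <L, R> Either<L, R> ofRight(R value) {
-        return new Either<L, R>() {
-            @Override boolean isLeft() { return false; }
-            @Override L left() { throw new IllegalStateException("Right has no left value"); }
-            @Override R right() { return value; }
-        };
-    }
-}
-
-// Error handling: each line yields either an error message (Left) or an Integer (Right)
-class ParseSketch {
-    static List<Either<String, Integer>> parseAll(List<String> lines) {
-        List<Either<String, Integer>> out = new ArrayList<>();
-        for (String line : lines) {
-            try {
-                out.add(Either.<String, Integer>ofRight(Integer.parseInt(line.trim())));
-            } catch (NumberFormatException e) {
-                out.add(Either.<String, Integer>ofLeft("bad record: " + line));
-            }
-        }
-        return out;
-    }
-}
-{% endhighlight %}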
-
-#### Type Erasure & Type Inference
-
-*Note: This section is only relevant for Java.*
-
-The Java compiler throws away much of the generic type information after compilation. This is
-known as *type erasure* in Java. It means that at runtime, an instance of an object does not know
-its generic type any more. For example, instances of `DataSet<String>` and `DataSet<Long>` look the
-same to the JVM.
-
-Flink requires type information at the time when it prepares the program for execution (when the
-main method of the program is called). The Flink Java API tries to reconstruct, in various ways, the type
-information that was thrown away, and stores it explicitly in the data sets and operators. You can
-retrieve the type via `DataSet.getType()`. The method returns an instance of `TypeInformation`,
-which is Flink's internal way of representing types.
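-
-Plain Java reflection shows the effect of erasure. It also shows the one place where generic type arguments survive. In the sketch below, `ErasureSketch`, `Fn` and `ParseInt` are hypothetical names, and `Fn` merely stands in for an interface like `MapFunction<I, O>`. Two differently parameterized lists share one runtime class. However, type arguments written in a class declaration, such as `implements Fn<String, Integer>`, remain readable through `getGenericInterfaces()`. This is the kind of information a type extractor can use; the sketch does not claim to reproduce Flink's actual implementation.
-
-{% highlight java %}
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-
-class ErasureSketch {
-    // Hypothetical function interface, standing in for something like MapFunction<I, O>
-    interface Fn<I, O> {
-        O apply(I in);
-    }
-
-    // At runtime both lists have exactly the same class: the type argument is erased
-    static boolean sameRuntimeClass() {
-        List<String> strings = new ArrayList<>();
-        List<Long> longs = new ArrayList<>();
-        return strings.getClass() == longs.getClass();
-    }
-
-    // Type arguments in a class declaration are kept in the class file
-    // and can be read back from a concrete implementation
-    static Type[] declaredTypeArguments(Object fn) {
-        for (Type t : fn.getClass().getGenericInterfaces()) {
-            if (t instanceof ParameterizedType) {
-                return ((ParameterizedType) t).getActualTypeArguments();
-            }
-        }
-        return new Type[0];
-    }
-}
-
-// A named implementation: its generic signature records <String, Integer>
-class ParseInt implements ErasureSketch.Fn<String, Integer> {
-    public Integer apply(String in) {
-        return Integer.parseInt(in);
-    }
-}
-{% endhighlight %}
-
-A class generated for a Java 8 lambda typically does not carry such a generic signature. This is one reason why lambdas may need extra type information.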
-
-The type inference has its limits and needs the "cooperation" of the programmer in some cases.
-Examples are methods that create data sets from collections, such as
-`ExecutionEnvironment.fromCollection()`, where you can pass an argument that describes the type.
-Generic functions like `MapFunction<I, O>` may also need extra type information.
-
-The
-{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java "ResultTypeQueryable" %}
-interface can be implemented by input formats and functions to tell the API
-explicitly about their return type. The *input types* that the functions are invoked with can
-usually be inferred by the result types of the previous operations.
-
-
-#### Object reuse behavior
-
-Apache Flink tries to reduce the number of object allocations for better performance.
-
-By default, user-defined functions (like `map()` or `groupReduce()`) receive new objects on each call (or through an iterator). So it is possible to keep references to the objects inside the function (for example, in a List).
-
-User-defined functions are often chained, for example when two mappers with the same parallelism are defined one after another. In the chaining case, the functions in the chain receive the same object instances: the second `map()` function receives the objects that the first `map()` returns.
-This behavior can lead to errors when the first `map()` function keeps a list of all objects and the second mapper modifies objects. In that case, the user has to manually create copies of the objects before putting them into the list.
-
-Also note that the system assumes that the user does not modify the incoming objects in the `filter()` function.
-
-There is a switch in the `ExecutionConfig` that allows users to enable the object reuse mode (`enableObjectReuse()`). For mutable types, Flink will then reuse object instances. In practice, this means that a `map()` function always receives the same object instance (with its fields set to new values). The object reuse mode leads to better performance because fewer objects are created, but users have to take care of what they do with the object references.
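-
-The pitfall can be reproduced without Flink. The hypothetical `ObjectReuseSketch` below simulates a runtime that hands the same mutable instance to a function for every record, as in object reuse mode. Keeping the reference directly leaves a list whose entries all show the last record. Copying first preserves every value.
-
-{% highlight java %}
-import java.util.ArrayList;
-import java.util.List;
-
-// Hypothetical mutable record type, similar in spirit to a mutable Value type
-class MutableInt {
-    int value;
-}
-
-class ObjectReuseSketch {
-    // Simulated object reuse: ONE instance is overwritten for every record
-    static List<MutableInt> collectWithoutCopy(int[] records) {
-        MutableInt reused = new MutableInt();
-        List<MutableInt> kept = new ArrayList<>();
-        for (int r : records) {
-            reused.value = r;     // the "runtime" sets new field values on the same instance
-            kept.add(reused);     // BUG: every list entry is the same shared instance
-        }
-        return kept;
-    }
-
-    // Same loop, but the function copies the record before keeping it
-    static List<MutableInt> collectWithCopy(int[] records) {
-        MutableInt reused = new MutableInt();
-        List<MutableInt> kept = new ArrayList<>();
-        for (int r : records) {
-            reused.value = r;
-            MutableInt copy = new MutableInt();
-            copy.value = reused.value;
-            kept.add(copy);
-        }
-        return kept;
-    }
-}
-{% endhighlight %}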
-
-
-
-{% top %}
-
-
-Data Sources
-------------
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-Data sources create the initial data sets, such as from files or from Java collections. The general
-mechanism of creating data sets is abstracted behind an
-{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}.
-Flink comes with several built-in formats to create data sets from common file formats. Many of them have
-shortcut methods on the *ExecutionEnvironment*.
-
-File-based:
-
-- `readTextFile(path)` / `TextInputFormat` - Reads files line-wise and returns them as Strings.
-
-- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line-wise and returns them as
-  StringValues. StringValues are mutable strings.
-
-- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
-  Returns a DataSet of tuples or POJOs. Supports the basic java types and their Value counterparts as field
-  types.
-
-- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
-  delimited primitive data types such as `String` or `Integer`.
-
-- `readFileOfPrimitives(path, delimiter, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
-   delimited primitive data types such as `String` or `Integer` using the given delimiter.
-
-- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads the file from the specified
-  path with the specified FileInputFormat, Key class and Value class, and returns the records as Tuple2<Key, Value>.
-
-- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads the file from the specified path with
-  type SequenceFileInputFormat, Key class and Value class, and returns the records as Tuple2<Key, Value>.
-
-
-Collection-based:
-
-- `fromCollection(Collection)` - Creates a data set from a `java.util.Collection`. All elements
-  in the collection must be of the same type.
-
-- `fromCollection(Iterator, Class)` - Creates a data set from an iterator. The class specifies the
-  data type of the elements returned by the iterator.
-
-- `fromElements(T ...)` - Creates a data set from the given sequence of objects. All objects must be
-  of the same type.
-
-- `fromParallelCollection(SplittableIterator, Class)` - Creates a data set from an iterator, in
-  parallel. The class specifies the data type of the elements returned by the iterator.
-
-- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
-  parallel.
-
-Generic:
-
-- `readFile(inputFormat, path)` / `FileInputFormat` - Accepts a file input format.
-
-- `createInput(inputFormat)` / `InputFormat` - Accepts a generic input format.
-
-**Examples**
-
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// read text file from local file system
-DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
-
-// read text file from an HDFS running at nnHost:nnPort
-DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
-
-// read a CSV file with three fields
-DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
-                               .types(Integer.class, String.class, Double.class);
-
-// read a CSV file with five fields, taking only two of them
-DataSet<Tuple2<String, Double>> csvInput2 = env.readCsvFile("hdfs:///the/CSV/file")
-                               .includeFields("10010")  // take the first and the fourth field
-                               .types(String.class, Double.class);
-
-// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
-DataSet<Person> csvInput3 = env.readCsvFile("hdfs:///the/CSV/file")
-                               .pojoType(Person.class, "name", "age", "zipcode");
-
-
-// read a file from the specified path of type TextInputFormat
-DataSet<Tuple2<LongWritable, Text>> tuples =
- env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
-
-// read a file from the specified path of type SequenceFileInputFormat
-DataSet<Tuple2<IntWritable, Text>> seqTuples =
- env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
-
-// creates a set from some given elements
-DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
-
-// generate a number sequence
-DataSet<Long> numbers = env.generateSequence(1, 10000000);
+// generate a number sequence
+DataSet<Long> numbers = env.generateSequence(1, 10000000);
 
 // Read data from a relational database using the JDBC input format
 DataSet<Tuple2<String, Integer>> dbData =
@@ -1919,7 +926,7 @@ File-based:
    path with the specified FileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.
 
 - `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads file from the specified path with
-   type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.  
+   type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.
 
 Collection-based:
 
@@ -2065,71 +1072,6 @@ The following table lists the currently supported compression methods.
 
 {% top %}
 
-
-Execution Configuration
-----------
-
-The `ExecutionEnvironment` also contains the `ExecutionConfig`, which allows setting job-specific configuration values for the runtime.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-ExecutionConfig executionConfig = env.getConfig();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-val executionConfig = env.getConfig
-{% endhighlight %}
-</div>
-</div>
-
-The following configuration options are available (defaults are in bold):
-
-- **`enableClosureCleaner()`** / `disableClosureCleaner()`. The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs.
-With the closure cleaner disabled, an anonymous user function may still reference the surrounding class, which is usually not `Serializable`. This leads to exceptions in the serializer.
-
-- `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job.
-
-- `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
-
-- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully stopped on the TaskManagers, and once the delay is past, the tasks are restarted. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution retries is one or more.
-
-- `getExecutionMode()` / `setExecutionMode()`. Sets the execution mode for the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner. The default execution mode is PIPELINED.
-
-- `enableForceKryo()` / **`disableForceKryo()`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOs even though Flink could analyze them as POJOs. In some cases this might be preferable, for example, when Flink's internal serializers fail to handle a POJO properly.
-
-- `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs.
-
-- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the [object reuse mode](#object-reuse-behavior) will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
-
-- **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows disabling this behavior.
-
-- `getGlobalJobParameters()` / `setGlobalJobParameters()` This method allows users to set custom objects as a global configuration for the job. Since the `ExecutionConfig` is accessible in all user defined functions, this is an easy method for making configuration globally available in a job.
-
-- `addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)` Register a Kryo serializer instance for the given `type`.
-
-- `addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)` Register a Kryo serializer class for the given `type`.
-
-- `registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)` Register the given type with Kryo and specify a serializer for it. By registering a type with Kryo, the serialization of the type will be much more efficient.
-
-- `registerKryoType(Class<?> type)` If the type ends up being serialized with Kryo, then it will be registered with Kryo to make sure that only tags (integer IDs) are written. If a type is not registered with Kryo, its entire class name will be serialized with every instance, leading to much higher I/O costs.
-
-- `registerPojoType(Class<?> type)` Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered with Kryo to make sure that only tags are written. If a type is not registered with Kryo, its entire class name will be serialized with every instance, leading to much higher I/O costs.
-
-Note that types registered with `registerKryoType()` are not available to Flink's Kryo serializer instance.
-
-- `disableAutoTypeRegistration()` Automatic type registration is enabled by default. Automatic type registration registers all types (including sub-types) used by user code with Kryo and the POJO serializer.
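-
-The serialization problem behind the closure cleaner can be shown with plain `java.io` serialization. The sketch does not invoke Flink or its closure cleaner, and `SerializableFn`, `Driver` and `ClosureSketch` are hypothetical names. Here the anonymous function reads a field of the non-serializable `Driver`, so its hidden reference to the enclosing instance is genuinely needed and serialization fails. The static nested class copies the value instead and carries no such reference.
-
-{% highlight java %}
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.NotSerializableException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-// Hypothetical serializable function type, standing in for a user function
-interface SerializableFn extends Serializable {
-    int apply(int x);
-}
-
-// A surrounding class that is NOT Serializable, like a typical main or test class
-class Driver {
-    int offset = 1;
-
-    // Anonymous class reading Driver.this.offset: it holds a reference to Driver
-    SerializableFn anonymousPlusOffset() {
-        return new SerializableFn() {
-            public int apply(int x) { return x + offset; }
-        };
-    }
-
-    // Copies the needed value into a static nested class: no reference to Driver
-    SerializableFn staticPlusOffset() {
-        return new PlusOffset(offset);
-    }
-
-    static class PlusOffset implements SerializableFn {
-        private final int offset;
-        PlusOffset(int offset) { this.offset = offset; }
-        public int apply(int x) { return x + offset; }
-    }
-}
-
-class ClosureSketch {
-    // Returns true if Java serialization of the object succeeds
-    static boolean serializes(Object o) {
-        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
-            out.writeObject(o);
-            return true;
-        } catch (NotSerializableException e) {
-            return false;   // some referenced object (here: Driver) is not Serializable
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
-{% endhighlight %}
-
-The closure cleaner can only remove references that a function does not use. When a function really needs state from the surrounding class, copying that state into the function, as `PlusOffset` does, avoids the problem.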
-
-
-
-The `RuntimeContext`, which is accessible in `Rich*` functions through the `getRuntimeContext()` method, also gives access to the `ExecutionConfig` in all user-defined functions.
-
-
-{% top %}
-
 Data Sinks
 ----------
 
@@ -2328,110 +1270,6 @@ Globally sorted output is not supported yet.
 
 {% top %}
 
-Debugging
----------
-
-Before running a data analysis program on a large data set in a distributed cluster, it is a good
-idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
-programs is usually an incremental process of checking results, debugging, and improving.
-
-Flink provides a few nice features to significantly ease the development process of data analysis
-programs by supporting local debugging from within an IDE, injection of test data, and collection of
-result data. This section gives some hints on how to ease the development of Flink programs.
-
-### Local Execution Environment
-
-A `LocalEnvironment` starts a Flink system within the same JVM process it was created in. If you
-start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
-program.
-
-A LocalEnvironment is created and used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-
-DataSet<String> lines = env.readTextFile(pathToTextFile);
-// build your program
-
-env.execute();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-val env = ExecutionEnvironment.createLocalEnvironment()
-
-val lines = env.readTextFile(pathToTextFile)
-// build your program
-
-env.execute()
-{% endhighlight %}
-</div>
-</div>
-
-### Collection Data Sources and Sinks
-
-Providing input for an analysis program and checking its output is cumbersome when done by creating
-input files and reading output files. Flink features special data sources and sinks which are backed
-by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
-easily replaced by sources and sinks that read from / write to external data stores such as HDFS.
-
-Collection data sources can be used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-
-// Create a DataSet from a list of elements
-DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
-
-// Create a DataSet from any Java collection
-List<Tuple2<String, Integer>> data = ...
-DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
-
-// Create a DataSet from an Iterator
-Iterator<Long> longIt = ...
-DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);
-{% endhighlight %}
-
-A collection data sink is specified as follows:
-
-{% highlight java %}
-DataSet<Tuple2<String, Integer>> myResult = ...
-
-List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
-myResult.output(new LocalCollectionOutputFormat<Tuple2<String, Integer>>(outData));
-{% endhighlight %}
-
-**Note:** Currently, the collection data sink is restricted to local execution, as a debugging tool.
-
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.createLocalEnvironment()
-
-// Create a DataSet from a list of elements
-val myInts = env.fromElements(1, 2, 3, 4, 5)
-
-// Create a DataSet from any Collection
-val data: Seq[(String, Int)] = ...
-val myTuples = env.fromCollection(data)
-
-// Create a DataSet from an Iterator
-val longIt: Iterator[Long] = ...
-val myLongs = env.fromCollection(longIt)
-{% endhighlight %}
-</div>
-</div>
-
-**Note:** Currently, the collection data source requires that data types and iterators implement
-`Serializable`. Furthermore, collection data sources cannot be executed in parallel
-(parallelism = 1).
-
-{% top %}
 
 Iteration Operators
 -------------------
@@ -2573,76 +1411,196 @@ val env = ExecutionEnvironment.getExecutionEnvironment()
 // Create initial DataSet
 val initial = env.fromElements(0)
 
-val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
-  val result = iterationInput.map { i =>
-    val x = Math.random()
-    val y = Math.random()
-    i + (if (x * x + y * y < 1) 1 else 0)
-  }
-  result
-}
+val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
+  val result = iterationInput.map { i =>
+    val x = Math.random()
+    val y = Math.random()
+    i + (if (x * x + y * y < 1) 1 else 0)
+  }
+  result
+}
+
+val result = count map { c => c / 10000.0 * 4 }
+
+// print() triggers the execution of the program, so no separate env.execute() call is needed
+result.print()
+{% endhighlight %}
+
+You can also check out the
+{% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala "K-Means example" %},
+which uses a BulkIteration to cluster a set of unlabeled points.
+
+#### Delta Iterations
+
+Delta iterations exploit the fact that certain algorithms do not change every data point of the
+solution in each iteration.
+
+In addition to the partial solution that is fed back in every iteration (called workset), delta
+iterations maintain state across iterations (called solution set), which can be updated through
+deltas. The result of the iterative computation is the state after the last iteration. Please refer
+to the [Introduction to Iterations](iterations.html) for an overview of the basic principle of delta
+iterations.
+
+Defining a DeltaIteration is similar to defining a BulkIteration. For delta iterations, two data
+sets form the input to each iteration (workset and solution set), and two data sets are produced as
+the result (new workset, solution set delta) in each iteration.
+
+To create a DeltaIteration, call `iterateDelta(initialWorkset, maxIterations, key)` on the
+initial solution set. The step function takes two parameters, (solutionSet, workset), and must
+return two values, (solutionSetDelta, newWorkset).
+
+Below is an example of the syntax of a delta iteration:
+
+{% highlight scala %}
+// read the initial data sets
+val initialSolutionSet: DataSet[(Long, Double)] = // [...]
+
+val initialWorkset: DataSet[(Long, Double)] = // [...]
+
+val maxIterations = 100
+val keyPosition = 0
+
+val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
+  (solution, workset) =>
+    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
+    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())
+
+    val nextWorkset = deltas.filter(new FilterByThreshold())
+
+    (deltas, nextWorkset)
+}
+
+result.writeAsCsv(outputPath)
+
+env.execute()
+{% endhighlight %}
+
+</div>
+</div>
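To make the roles of the workset, the solution set, and the solution set delta more concrete, the following plain-Java sketch reproduces the loop structure of a delta iteration without Flink. It swaps in a different algorithm than the example above: connected components by minimum-label propagation. The solution set holds the current label of each vertex, and the workset holds the vertices whose label changed in the previous iteration. The class and method names are hypothetical, and the sketch assumes an undirected graph in which every vertex appears in at least one edge.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DeltaIterationSketch {

    // Builds an undirected adjacency list; every vertex that appears in an edge becomes a key.
    static Map<Long, List<Long>> undirected(long[][] edges) {
        Map<Long, List<Long>> adjacency = new HashMap<>();
        for (long[] edge : edges) {
            adjacency.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
            adjacency.computeIfAbsent(edge[1], k -> new ArrayList<>()).add(edge[0]);
        }
        return adjacency;
    }

    // Connected components by minimum-label propagation, structured like a delta iteration.
    static Map<Long, Long> connectedComponents(Map<Long, List<Long>> adjacency, int maxIterations) {
        // Initial solution set: each vertex starts in its own component.
        Map<Long, Long> solutionSet = new HashMap<>();
        for (Long vertex : adjacency.keySet()) {
            solutionSet.put(vertex, vertex);
        }
        // Initial workset: every vertex, since no label has been propagated yet.
        Set<Long> workset = new HashSet<>(adjacency.keySet());

        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // Solution set delta: only the labels that improve in this iteration.
            Map<Long, Long> delta = new HashMap<>();
            for (Long vertex : workset) {
                long label = solutionSet.get(vertex);
                for (Long neighbor : adjacency.getOrDefault(vertex, Collections.emptyList())) {
                    long current = delta.getOrDefault(neighbor, solutionSet.get(neighbor));
                    if (label < current) {
                        delta.put(neighbor, label);
                    }
                }
            }
            // Merge the delta into the solution set by key; the changed vertices form the next workset.
            solutionSet.putAll(delta);
            workset = delta.keySet();
        }
        return solutionSet;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> graph = undirected(new long[][] {{1, 2}, {2, 3}, {4, 5}});
        System.out.println(connectedComponents(graph, 100));
    }
}
```

Because only vertices whose label changed enter the next workset, later iterations touch fewer and fewer vertices. This shrinking amount of work is the effect delta iterations are designed to exploit.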
+
+{% top %}
+
+Object reuse behavior
+---------------------
+
+Apache Flink tries to reduce the number of object allocations to improve performance.
+
+By default, user-defined functions (like `map()` or `reduceGroup()`) receive new objects on each call (or through the iterator). It is therefore safe to keep references to these objects inside the function (for example, in a List).
+
+User-defined functions are often chained, for example when two mappers with the same parallelism are defined one after another. In the chaining case, the functions in the chain receive the same object instances, so the second `map()` function receives the objects that the first `map()` returns.
+This behavior can lead to errors when the first `map()` function keeps a list of all objects and the second mapper modifies them. In that case, the user has to manually create copies of the objects before putting them into the list.
+
+Also note that the system assumes that `filter()` functions do not modify their incoming objects.
+
+There is a switch in the `ExecutionConfig` that allows users to enable the object reuse mode (`enableObjectReuse()`). For mutable types, Flink will then reuse object instances. In practice, this means that a `map()` function will always receive the same object instance (with its fields set to new values). The object reuse mode leads to better performance because fewer objects are created, but users must take care of what they do with the object references.
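The aliasing problem described above can be reproduced in plain Java, without Flink. In this sketch, the hypothetical `Record` class stands in for a mutable type such as a tuple, and a single instance is refilled for every element, similar to what a function may observe when object reuse is enabled. Storing the reference keeps only the last values; storing a copy keeps every element.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {

    // Hypothetical mutable record type, standing in for a reusable value such as a tuple.
    static final class Record {
        String word;
        int count;

        Record(String word, int count) {
            this.word = word;
            this.count = count;
        }

        Record copy() {
            return new Record(word, count);
        }
    }

    // Simulates an upstream function that hands over every element through ONE reused instance.
    static List<Record> collect(String[] words, boolean copyBeforeStoring) {
        Record reused = new Record(null, 0);
        List<Record> kept = new ArrayList<>();
        for (String w : words) {
            // The same object instance, with its fields set to new values.
            reused.word = w;
            reused.count = 1;
            // Keeping the reference is only safe if a copy is stored.
            kept.add(copyBeforeStoring ? reused.copy() : reused);
        }
        return kept;
    }

    public static void main(String[] args) {
        String[] words = {"a", "b", "c"};
        // Every stored entry is the same object, so this prints c c c
        for (Record r : collect(words, false)) {
            System.out.print(r.word + " ");
        }
        System.out.println();
        // Each stored entry is an independent copy, so this prints a b c
        for (Record r : collect(words, true)) {
            System.out.print(r.word + " ");
        }
        System.out.println();
    }
}
```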
+
+{% top %}
+
+Debugging
+---------
+
+Before running a data analysis program on a large data set in a distributed cluster, it is a good
+idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
+programs is usually an incremental process of checking results, debugging, and improving.
+
+Flink provides several features that significantly ease the development process of data analysis
+programs by supporting local debugging from within an IDE, injection of test data, and collection of
+result data. This section gives some hints on how to ease the development of Flink programs.
+
+### Local Execution Environment
+
+A `LocalEnvironment` starts a Flink system within the same JVM process it was created in. If you
+start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
+program.
+
+A LocalEnvironment is created and used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+DataSet<String> lines = env.readTextFile(pathToTextFile);
+// build your program
+
+env.execute();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
 
-val result = count map { c => c / 10000.0 * 4 }
+{% highlight scala %}
+val env = ExecutionEnvironment.createLocalEnvironment()
 
-result.print()
+val lines = env.readTextFile(pathToTextFile)
+// build your program
 
-env.execute("Iterative Pi Example");
+env.execute()
 {% endhighlight %}
+</div>
+</div>
 
-You can also check out the
-{% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala "K-Means example" %},
-which uses a BulkIteration to cluster a set of unlabeled points.
+### Collection Data Sources and Sinks
 
-#### Delta Iterations
+Providing input for an analysis program and checking its output is cumbersome when done by creating
+input files and reading output files. Flink features special data sources and sinks which are backed
+by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
+easily replaced by sources and sinks that read from / write to external data stores such as HDFS.
 
-Delta iterations exploit the fact that certain algorithms do not change every data point of the
-solution in each iteration.
+Collection data sources can be used as follows:
 
-In addition to the partial solution that is fed back (called workset) in every iteration, delta
-iterations maintain state across iterations (called solution set), which can be updated through
-deltas. The result of the iterative computation is the state after the last iteration. Please refer
-to the [Introduction to Iterations](iterations.html) for an overview of the basic principle of delta
-iterations.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 
-Defining a DeltaIteration is similar to defining a BulkIteration. For delta iterations, two data
-sets form the input to each iteration (workset and solution set), and two data sets are produced as
-the result (new workset, solution set delta) in each iteration.
+// Create a DataSet from a list of elements
+DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
 
-To create a DeltaIteration call the `iterateDelta(initialWorkset, maxIterations, key)` on the
-initial solution set. The step function takes two parameters: (solutionSet, workset), and must
-return two values: (solutionSetDelta, newWorkset).
+// Create a DataSet from any Java collection
+List<Tuple2<String, Integer>> data = ...
+DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
 
-Below is an example for the syntax of a delta iteration
+// Create a DataSet from an Iterator
+Iterator<Long> longIt = ...
+DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);
+{% endhighlight %}
 
-{% highlight scala %}
-// read the initial data sets
-val initialSolutionSet: DataSet[(Long, Double)] = // [...]
+A collection data sink is specified as follows:
 
-val initialWorkset: DataSet[(Long, Double)] = // [...]
+{% highlight java %}
+DataSet<Tuple2<String, Integer>> myResult = ...
 
-val maxIterations = 100
-val keyPosition = 0
+List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
+myResult.output(new LocalCollectionOutputFormat<Tuple2<String, Integer>>(outData));
+{% endhighlight %}
 
-val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
-  (solution, workset) =>
-    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
-    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())
+**Note:** Currently, the collection data sink is restricted to local execution, as a debugging tool.
 
-    val nextWorkset = deltas.filter(new FilterByThreshold())
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.createLocalEnvironment()
 
-    (deltas, nextWorkset)
-}
+// Create a DataSet from a list of elements
+val myInts = env.fromElements(1, 2, 3, 4, 5)
 
-result.writeAsCsv(outputPath)
+// Create a DataSet from any Collection
+val data: Seq[(String, Int)] = ...
+val myTuples = env.fromCollection(data)
 
-env.execute()
+// Create a DataSet from an Iterator
+val longIt: Iterator[Long] = ...
+val myLongs = env.fromCollection(longIt)
 {% endhighlight %}
-
 </div>
 </div>
 
-{% top %}
+**Note:** Currently, the collection data source requires that data types and iterators implement
+`Serializable`. Furthermore, collection data sources cannot be executed in parallel
+(parallelism = 1).
 
+{% top %}
 
 Semantic Annotations
 -----------
@@ -3059,352 +2017,3 @@ public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<S
 {% endhighlight %}
 
 {% top %}
-
-Program Packaging and Distributed Execution
------------------------------------------
-
-As described in the [program skeleton](#program-skeleton) section, Flink programs can be executed on
-clusters by using the `RemoteEnvironment`. Alternatively, programs can be packaged into JAR Files
-(Java Archives) for execution. Packaging the program is a prerequisite to executing it through the
-[command line interface]({{ site.baseurl }}/apis/cli.html) or the [web interface]({{ site.baseurl }}/apis/web_client.html).
-
-#### Packaging Programs
-
-To support execution from a packaged JAR file via the command line or web interface, a program must
-use the environment obtained by `ExecutionEnvironment.getExecutionEnvironment()`. This environment
-will act as the cluster's environment when the JAR is submitted to the command line or web
-interface. If the Flink program is invoked differently than through these interfaces, the
-environment will act like a local environment.
-
-To package the program, simply export all involved classes as a JAR file. The JAR file's manifest
-must point to the class that contains the program's *entry point* (the class with the public
-`main` method). The simplest way to do this is by putting the *main-class* entry into the
-manifest (such as `main-class: org.apache.flink.example.MyProgram`). The *main-class* attribute is
-the same one that is used by the Java Virtual Machine to find the main method when executing a JAR
-file through the command `java -jar pathToTheJarFile`. Most IDEs offer to include that attribute
-automatically when exporting JAR files.
-
-
-#### Packaging Programs through Plans
-
-Additionally, we support packaging programs as *Plans*. Instead of defining a program in the main
-method and calling
-`execute()` on the environment, plan packaging returns the *Program Plan*, which is a description of
-the program's data flow. To do that, the program must implement the
-`org.apache.flink.api.common.Program` interface, defining the `getPlan(String...)` method. The
-strings passed to that method are the command line arguments. The program's plan can be created from
-the environment via the `ExecutionEnvironment#createProgramPlan()` method. When packaging the
-program's plan, the JAR manifest must point to the class implementing the
-`org.apache.flink.api.common.Program` interface, instead of the class with the main method.
-
-
-#### Summary
-
-The overall procedure to invoke a packaged program is as follows:
-
-1. The JAR's manifest is searched for a *main-class* or *program-class* attribute. If both
-attributes are found, the *program-class* attribute takes precedence over the *main-class*
-attribute. Both the command line and the web interface support a parameter to pass the entry point
-class name manually for cases where the JAR manifest contains neither attribute.
-
-2. If the entry point class implements the `org.apache.flink.api.common.Program` interface, then the system
-calls the `getPlan(String...)` method to obtain the program plan to execute.
-
-3. If the entry point class does not implement the `org.apache.flink.api.common.Program` interface,
-the system will invoke the main method of the class.
-
-{% top %}
-
-Accumulators & Counters
----------------------------
-
-Accumulators are simple constructs with an **add operation** and a **final accumulated result**,
-which is available after the job has ended.
-
-The most straightforward accumulator is a **counter**: You can increment it using the
-```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial
-results and send the result to the client. Accumulators are useful during debugging or if you
-quickly want to find out more about your data.
-
-Flink currently has the following **built-in accumulators**. Each of them implements the
-{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
-interface.
-
-- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java "__IntCounter__" %},
-  {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java "__LongCounter__" %}
-  and {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}:
-  See below for an example using a counter.
-- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}:
-  A histogram implementation for a discrete number of bins. Internally it is just a map from Integer
-  to Integer. You can use this to compute distributions of values, e.g. the distribution of
-  words-per-line for a word count program.
-
-__How to use accumulators:__
-
-First you have to create an accumulator object (here a counter) in the operator function where you
-want to use it. Operator function here refers to the (anonymous inner) class implementing the user
-defined code for an operator.
-
-{% highlight java %}
-private IntCounter numLines = new IntCounter();
-{% endhighlight %}
-
-Second you have to register the accumulator object, typically in the ```open()``` method of the
-operator function. Here you also define the name.
-
-{% highlight java %}
-getRuntimeContext().addAccumulator("num-lines", this.numLines);
-{% endhighlight %}
-
-You can now use the accumulator anywhere in the operator function, including in the ```open()``` and
-```close()``` methods.
-
-{% highlight java %}
-this.numLines.add(1);
-{% endhighlight %}
-
-The overall result will be stored in the ```JobExecutionResult``` object which is returned when
-running a job using the Java API (currently this only works if the execution waits for the
-completion of the job).
-
-{% highlight java %}
-myJobExecutionResult.getAccumulatorResult("num-lines")
-{% endhighlight %}
-
-All accumulators share a single namespace per job. Thus you can use the same accumulator in
-different operator functions of your job. Flink will internally merge all accumulators with the same
-name.
-
-A note on accumulators and iterations: Currently the result of accumulators is only available after
-the overall job ended. We plan to also make the result of the previous iteration available in the
-next iteration. You can use
-{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98 "Aggregators" %}
-to compute per-iteration statistics and base the termination of iterations on such statistics.
-
-__Custom accumulators:__
-
-To implement your own accumulator you simply have to write your implementation of the Accumulator
-interface. Feel free to create a pull request if you think your custom accumulator should be shipped
-with Flink.
-
-You have the choice to implement either
-{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
-or {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java "SimpleAccumulator" %}.
-
-```Accumulator<V,R>``` is most flexible: It defines a type ```V``` for the value to add, and a
-result type ```R``` for the final result. E.g. for a histogram, ```V``` is a number and ```R``` is
-a histogram. ```SimpleAccumulator``` is for the cases where both types are the same, e.g. for counters.
-
-{% top %}
-
-Parallel Execution
-------------------
-
-This section describes how the parallel execution of programs can be configured in Flink. A Flink
-program consists of multiple tasks (operators, data sources, and sinks). A task is split into
-several parallel instances for execution and each parallel instance processes a subset of the task's
-input data. The number of parallel instances of a task is called its *parallelism*.
-
-
-The parallelism of a task can be specified in Flink on different levels.
-
-### Operator Level
-
-The parallelism of an individual operator, data source, or data sink can be defined by calling its
-`setParallelism()` method.  For example, the parallelism of the `Sum` operator in the
-[WordCount](examples.html#word-count) example program can be set to `5` as follows:
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-DataSet<String> text = [...]
-DataSet<Tuple2<String, Integer>> wordCounts = text
-    .flatMap(new LineSplitter())
-    .groupBy(0)
-    .sum(1).setParallelism(5);
-wordCounts.print();
-
-env.execute("Word Count Example");
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-val text = [...]
-val wordCounts = text
-    .flatMap{ _.split(" ") map { (_, 1) } }
-    .groupBy(0)
-    .sum(1).setParallelism(5)
-wordCounts.print()
-
-env.execute("Word Count Example")
-{% endhighlight %}
-</div>
-</div>
-
-### Execution Environment Level
-
-Flink programs are executed in the context of an [execution environment](#program-skeleton). An
-execution environment defines a default parallelism for all operators, data sources, and data sinks
-it executes. Execution environment parallelism can be overridden by explicitly configuring the
-parallelism of an operator.
-
-The default parallelism of an execution environment can be specified by calling the
-`setParallelism()` method. To execute all operators, data sources, and data sinks of the
-[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the
-execution environment as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setParallelism(3);
-
-DataSet<String> text = [...]
-DataSet<Tuple2<String, Integer>> wordCounts = [...]
-wordCounts.print();
-
-env.execute("Word Count Example");
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-env.setParallelism(3)
-
-val text = [...]
-val wordCounts = text
-    .flatMap{ _.split(" ") map { (_, 1) } }
-    .groupBy(0)
-    .sum(1)
-wordCounts.print()
-
-env.execute("Word Count Example")
-{% endhighlight %}
-</div>
-</div>
-
-### Client Level
-
-The parallelism can be set at the Client when submitting jobs to Flink. The
-Client can either be a Java or a Scala program. One example of such a Client is
-Flink's Command-line Interface (CLI).
-
-For the CLI client, the parallelism parameter can be specified with `-p`. For
-example:
-
-    ./bin/flink run -p 10 ../examples/*WordCount-java*.jar
-
-
-In a Java/Scala program, the parallelism is set as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-try {
-    PackagedProgram program = new PackagedProgram(file, args);
-    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
-    Configuration config = new Configuration();
-
-    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
-
-    // set the parallelism to 10 here
-    client.run(program, 10, true);
-
-} catch (ProgramInvocationException e) {
-    e.printStackTrace();
-}
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-try {
-    val program = new PackagedProgram(file, args)
-    val jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
-    val config = new Configuration()
-
-    val client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader())
-
-    // set the parallelism to 10 here
-    client.run(program, 10, true)
-
-} catch {
-    case e: Exception => e.printStackTrace
-}
-{% endhighlight %}
-</div>
-</div>
-
-
-### System Level
-
-A system-wide default parallelism for all execution environments can be defined by setting the
-`parallelism.default` property in `./conf/flink-conf.yaml`. See the
-[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details.
-
-{% top %}
-
-Execution Plans
----------------
-
-Depending on various parameters such as data size or number of machines in the cluster, Flink's
-optimizer automatically chooses an execution strategy for your program. In many cases, it can be
-useful to know how exactly Flink will execute your program.
-
-__Plan Visualization Tool__
-
-Flink comes packaged with a visualization tool for execution plans. The HTML document containing
-the visualizer is located under ```tools/planVisualizer.html```. It takes a JSON representation of
-the job execution plan and visualizes it as a graph with complete annotations of execution
-strategies.
-
-The following code shows how to print the execution plan JSON from your program:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-...
-
-System.out.println(env.getExecutionPlan());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-...
-
-println(env.getExecutionPlan())
-{% endhighlight %}
-</div>
-</div>
-
-
-To visualize the execution plan, do the following:
-
-1. **Open** ```planVisualizer.html``` with your web browser,
-2. **Paste** the JSON string into the text field, and
-3. **Press** the draw button.
-
-After these steps, a detailed execution plan will be visualized.
-
-<img alt="A flink job execution graph." src="fig/plan_visualizer.png" width="80%">
-
-
-__Web Interface__
-
-Flink offers a web interface for submitting and executing jobs. If you choose to use this interface to submit your packaged program, you have the option to also see the plan visualization.
-
-The script to start the web interface is located under ```bin/start-webclient.sh```. After starting the web client (by default on **port 8080**), your program can be uploaded and will be added to the list of available programs on the left side of the interface.
-
-You are able to specify program arguments in the textbox at the bottom of the page. Checking the plan visualization checkbox shows the execution plan before executing the actual program.
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/be4601ea/docs/apis/common/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/docs/apis/common/fig/plan_visualizer.png b/docs/apis/common/fig/plan_visualizer.png
new file mode 100644
index 0000000..85b8c55
Binary files /dev/null and b/docs/apis/common/fig/plan_visualizer.png differ

