flink-commits mailing list archives

From u..@apache.org
Subject [12/51] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs
Date Wed, 24 Aug 2016 09:26:32 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libs/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/storm_compatibility.md b/docs/dev/libs/storm_compatibility.md
new file mode 100644
index 0000000..89d7706
--- /dev/null
+++ b/docs/dev/libs/storm_compatibility.md
@@ -0,0 +1,287 @@
+---
+title: "Storm Compatibility"
+is_beta: true
+nav-parent_id: libs
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Flink streaming]({{ site.baseurl }}/dev/datastream_api.html) is compatible with Apache Storm interfaces and therefore allows
+reusing code that was implemented for Storm.
+
+You can:
+
+- execute a whole Storm `Topology` in Flink.
+- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
+
+This document shows how to use existing Storm code with Flink.
+
+* This will be replaced by the TOC
+{:toc}
+
+# Project Configuration
+
+Support for Storm is contained in the `flink-storm` Maven module.
+The code resides in the `org.apache.flink.storm` package.
+
+Add the following dependency to your `pom.xml` if you want to execute Storm code in Flink.
+
+~~~xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-storm{{ site.scala_version_suffix }}</artifactId>
+	<version>{{site.version}}</version>
+</dependency>
+~~~
+
+**Please note**: Do not add `storm-core` as a dependency. It is already included via `flink-storm`.
+
+**Please note**: `flink-storm` is not part of the provided binary Flink distribution.
+Thus, you need to include `flink-storm` classes (and their dependencies) in your program jar (also called ueber-jar or fat-jar) that is submitted to Flink's JobManager.
+See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example of how to package a jar correctly.
+
+If you want to avoid large ueber-jars, you can manually copy `storm-core-0.9.4.jar`, `json-simple-1.1.jar` and `flink-storm-{{site.version}}.jar` into Flink's `lib/` folder of each cluster node (*before* the cluster is started).
+For this case, it is sufficient to include only your own Spout and Bolt classes (and their internal dependencies) into the program jar.
+
+# Execute Storm Topologies
+
+Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:
+
+- `StormSubmitter` replaced by `FlinkSubmitter`
+- `NimbusClient` and `Client` replaced by `FlinkClient`
+- `LocalCluster` replaced by `FlinkLocalCluster`
+
+In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
+The actual runtime code, i.e., Spouts and Bolts, can be used *unmodified*.
+If a topology is executed in a remote cluster, the parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
+
+// actual topology assembling code and used Spouts/Bolts can be used as-is
+builder.setSpout("source", new FileSpout(inputFilePath));
+builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
+builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
+builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
+
+Config conf = new Config();
+if(runLocal) { // submit to test cluster
+	// replaces: LocalCluster cluster = new LocalCluster();
+	FlinkLocalCluster cluster = new FlinkLocalCluster();
+	cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
+} else { // submit to remote cluster
+	// optional
+	// conf.put(Config.NIMBUS_HOST, "remoteHost");
+	// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+	// replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+	FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
+}
+~~~
+</div>
+</div>
+
+# Embed Storm Operators in Flink Streaming Programs
+
+As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
+The Storm compatibility layer offers a wrapper class for each, namely `SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).
+
+By default, both wrappers convert Storm output tuples to Flink's [Tuple]({{site.baseurl}}/dev/api_concepts.html#tuples-and-case-classes) types (i.e., `Tuple0` to `Tuple25` according to the number of fields of the Storm tuples).
+For single-field output tuples, a conversion to the field's data type is also possible (e.g., `String` instead of `Tuple1<String>`).
+
+Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
+In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` can be used.
+
+## Embed Spouts
+
+In order to use a Spout as a Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
+The Spout object is handed to the constructor of `SpoutWrapper<OUT>`, which serves as the first argument to `addSource(...)`.
+The generic type declaration `OUT` specifies the type of the source output stream.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// stream has `raw` type (single field output streams only)
+DataStream<String> rawInput = env.addSource(
+	new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
+	TypeExtractor.getForClass(String.class)); // output type
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+
+If a Spout emits a finite number of tuples, `SpoutWrapper` can be configured to terminate automatically by setting the `numberOfInvocations` parameter in its constructor.
+This allows the Flink program to shut down automatically after all data is processed.
+By default, the program will run until it is [canceled]({{site.baseurl}}/setup/cli.html) manually.
+
+
+## Embed Bolts
+
+In order to use a Bolt as a Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
+The Bolt object is handed to the constructor of `BoltWrapper<IN,OUT>`, which serves as the last argument to `transform(...)`.
+The generic type declarations `IN` and `OUT` specify the type of the operator's input and output stream, respectively.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> text = env.readTextFile(localFilePath);
+
+DataStream<Tuple2<String, Integer>> counts = text.transform(
+	"tokenizer", // operator name
+	TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
+	new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
+
+// do further processing
+[...]
+~~~
+</div>
+</div>
+
+### Named Attribute Access for Embedded Bolts
+
+Bolts can access input tuple fields by name (in addition to access by index).
+To use this feature with embedded Bolts, you need to have either a
+
+ 1. [POJO]({{site.baseurl}}/dev/api_concepts.html#pojos) type input stream or
+ 2. [Tuple]({{site.baseurl}}/dev/api_concepts.html#tuples-and-case-classes) type input stream and specify the input schema (i.e. name-to-index-mapping)
+
+For POJO input types, Flink accesses the fields via reflection.
+For this case, Flink expects either a corresponding public member variable or public getter method.
+For example, if a Bolt accesses a field via the name `sentence` (e.g., `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or a method `public String getSentence() { ... };` (pay attention to camel-case naming).
+
+For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
+For this case, the constructor of `BoltWrapper` takes an additional argument: `new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))`.
+The input type is `Tuple1<String>` and `Fields("sentence")` specifies that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
+
+See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
+
+## Configuring Spouts and Bolts
+
+In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is passed to the `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
+This `Map` is provided by the user alongside the topology and gets forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.
+
+For embedded usage, Flink's configuration mechanism must be used.
+A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
+Flink's regular `Configuration` class can be used to configure Spouts and Bolts.
+However, `Configuration` does not support arbitrary key data types as Storm does (only `String` keys are allowed).
+Thus, Flink additionally provides the `StormConfig` class, which can be used like a raw `Map` to provide full compatibility with Storm.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+StormConfig config = new StormConfig();
+// set config values
+[...]
+
+// set global Storm configuration
+env.getConfig().setGlobalJobParameters(config);
+
+// assemble program with embedded Spouts and/or Bolts
+[...]
+~~~
+</div>
+</div>
+
+## Multiple Output Streams
+
+Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.
+
+For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
+Flink already provides the predefined output selector `StormStreamSelector<T>` for `.split(...)`.
+Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+[...]
+
+// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
+DataStream<SplitStreamType<SomeType>> multiStream = ...
+
+SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
+
+// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+
+// do further processing on s1 and s2
+[...]
+~~~
+</div>
+</div>
+
+See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java) for a full example.
+
+# Flink Extensions
+
+## Finite Spouts
+
+In Flink, streaming sources can be finite, i.e., emit a finite number of records and stop after emitting the last record. However, Spouts usually emit infinite streams.
+The bridge between the two approaches is the `FiniteSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can specify a stopping condition.
+The user can create a finite Spout by implementing this interface instead of (or in addition to) `IRichSpout` and implementing the `reachedEnd()` method.
+In contrast to a `SpoutWrapper` that is configured to emit a finite number of tuples, the `FiniteSpout` interface allows implementing more complex termination criteria.
+
+Although finite Spouts are not necessary to embed Spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy:
+
+ * to make a native Spout behave like a finite Flink source with minimal modifications
+ * to process a stream only for some time, after which the Spout stops automatically
+ * to read a file into a stream
+ * for testing purposes
+
+An example of a finite Spout that emits records for 10 seconds only:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
+	[...] // implement open(), nextTuple(), ...
+
+	private long starttime = System.currentTimeMillis();
+
+	public boolean reachedEnd() {
+		return System.currentTimeMillis() - starttime > 10000L;
+	}
+}
+~~~
+</div>
+</div>
+
+# Storm Compatibility Examples
+
+You can find more examples in the Maven module `flink-storm-examples`.
+For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/README.md).
+To run the examples, you need to assemble a correct jar file.
+`flink-storm-examples-{{ site.version }}.jar` is **not** a valid jar file for job execution (it is only a standard Maven artifact).
+
+There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
+Compare `pom.xml` to see how both jars are built.
+Furthermore, there is one example for whole Storm topologies (`WordCount-StormTopology.jar`).
+
+You can run each of those examples via `bin/flink run <jarname>.jar`. The correct entry point class is contained in each jar's manifest file.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/dev/local_execution.md b/docs/dev/local_execution.md
new file mode 100644
index 0000000..a348951
--- /dev/null
+++ b/docs/dev/local_execution.md
@@ -0,0 +1,125 @@
+---
+title:  "Local Execution"
+nav-parent_id: dev
+nav-pos: 11
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink can run on a single machine, even in a single Java Virtual Machine. This allows users to test and debug Flink programs locally. This section gives an overview of the local execution mechanisms.
+
+The local environments and executors allow you to run Flink programs in a local Java Virtual Machine, or within any JVM as part of existing programs. Most examples can be launched locally by simply hitting the "Run" button of your IDE.
+
+There are two different kinds of local execution supported in Flink. The `LocalExecutionEnvironment` starts the full Flink runtime, including a JobManager and a TaskManager. This includes memory management and all the internal algorithms that are executed in cluster mode.
+
+The `CollectionEnvironment` executes the Flink program on Java collections. This mode does not start the full Flink runtime, so the execution is very low-overhead and lightweight. For example, a `DataSet.map()` transformation will be executed by applying the `map()` function to all elements in a Java list.
+
+* TOC
+{:toc}
+
+
+## Debugging
+
+If you are running Flink programs locally, you can also debug your program like any other Java program. You can either use `System.out.println()` to write out some internal variables or you can use the debugger. It is possible to set breakpoints within `map()`, `reduce()` and all the other methods.
+Please also refer to the [debugging section]({{ site.baseurl }}/dev/batch/index.html#debugging) in the Java API documentation for a guide to testing and local debugging utilities in the Java API.
+
+## Maven Dependency
+
+If you are developing your program in a Maven project, you have to add the `flink-clients` module using this dependency:
+
+~~~xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+~~~
+
+## Local Environment
+
+The `LocalEnvironment` is a handle to local execution for Flink programs. Use it to run a program within a local JVM - standalone or embedded in other programs.
+
+The local environment is instantiated via the method `ExecutionEnvironment.createLocalEnvironment()`. By default, it will use as many local threads for execution as your machine has CPU cores (hardware contexts). You can alternatively specify the desired parallelism. The local environment can be configured to log to the console using `enableLogging()`/`disableLogging()`.
+
+In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is an even better approach. That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and it returns a pre-configured environment for cluster execution when the program is invoked by the [command line interface]({{ site.baseurl }}/setup/cli.html).
+
+~~~java
+public static void main(String[] args) throws Exception {
+    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+    DataSet<String> data = env.readTextFile("file:///path/to/file");
+
+    data
+        .filter(new FilterFunction<String>() {
+            public boolean filter(String value) {
+                return value.startsWith("http://");
+            }
+        })
+        .writeAsText("file:///path/to/result");
+
+    JobExecutionResult res = env.execute();
+}
+~~~
+
+The `JobExecutionResult` object, which is returned after the execution has finished, contains the program runtime and the accumulator results.
+
+The `LocalEnvironment` also allows passing custom configuration values to Flink.
+
+~~~java
+Configuration conf = new Configuration();
+conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
+final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
+~~~
+
+*Note:* The local execution environments do not start any web frontend to monitor the execution.
+
+## Collection Environment
+
+The execution on Java Collections using the `CollectionEnvironment` is a low-overhead approach for executing Flink programs. Typical use-cases for this mode are automated tests, debugging and code re-use.
+
+Users can use algorithms implemented for batch processing also for cases that are more interactive. A slightly changed variant of a Flink program could be used in a Java Application Server for processing incoming requests.
+
+**Skeleton for Collection-based execution**
+
+~~~java
+public static void main(String[] args) throws Exception {
+    // initialize a new Collection-based execution environment
+    final ExecutionEnvironment env = new CollectionEnvironment();
+
+    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);
+
+    /* Data Set transformations ... */
+
+    // retrieve the resulting Tuple2 elements into an ArrayList.
+    Collection<...> result = new ArrayList<...>();
+    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
+
+    // kick off execution.
+    env.execute();
+
+    // Do some work with the resulting ArrayList (=Collection).
+    for(... t : result) {
+        System.err.println("Result = "+t);
+    }
+}
+~~~
+
+The `flink-examples-batch` module contains a full example, called `CollectionExecutionExample`.
+
+Please note that the execution of collection-based Flink programs is only possible on small data that fits into the JVM heap. The execution on collections is not multi-threaded; only one thread is used.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/quickstarts.md
----------------------------------------------------------------------
diff --git a/docs/dev/quickstarts.md b/docs/dev/quickstarts.md
new file mode 100644
index 0000000..ef21ca6
--- /dev/null
+++ b/docs/dev/quickstarts.md
@@ -0,0 +1,24 @@
+---
+title: "Quickstarts"
+nav-id: quickstarts
+nav-parent_id: dev
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_api_extensions.md b/docs/dev/scala_api_extensions.md
new file mode 100644
index 0000000..ffa6145
--- /dev/null
+++ b/docs/dev/scala_api_extensions.md
@@ -0,0 +1,408 @@
+---
+title: "Scala API Extensions"
+nav-parent_id: apis
+nav-pos: 104
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+In order to keep a fair amount of consistency between the Scala and Java APIs, some
+of the features that allow a high level of expressiveness in Scala have been left
+out from the standard APIs for both batch and streaming.
+
+If you want to _enjoy the full Scala experience_ you can choose to opt-in to
+extensions that enhance the Scala API via implicit conversions.
+
+To use all the available extensions, you can just add a simple `import` for the
+DataSet API
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions._
+{% endhighlight %}
+
+or the DataStream API
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions._
+{% endhighlight %}
+
+Alternatively, you can import individual extensions _à la carte_ to only use those
+you prefer.
+
+## Accept partial functions
+
+Normally, both the DataSet and DataStream APIs don't accept anonymous pattern
+matching functions to deconstruct tuples, case classes or collections, like the
+following:
+
+{% highlight scala %}
+val data: DataSet[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+  // The previous line causes the following compilation error:
+  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
+}
+{% endhighlight %}
+
+This extension introduces new methods in both the DataSet and DataStream Scala APIs,
+each with a one-to-one correspondence to a method in the standard API. These delegating
+methods do support anonymous pattern matching functions.
+
+#### DataSet API
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Original</th>
+      <th class="text-center">Example</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith {
+  case (_, value) => value.toString
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapPartitionWith</strong></td>
+      <td><strong>mapPartition (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapPartitionWith {
+  case head #:: _ => head
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith {
+  case (_, name, visitTimes) => visitTimes.map(name -> _)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>filterWith</strong></td>
+      <td><strong>filter (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.filterWith {
+  case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceWith</strong></td>
+      <td><strong>reduce (DataSet, GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((_, amount1), (_, amount2)) => amount1 + amount2
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceGroupWith</strong></td>
+      <td><strong>reduceGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceGroupWith {
+  case id #:: value #:: _ => id -> value
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>groupingBy</strong></td>
+      <td><strong>groupBy (DataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data.groupingBy {
+  case (id, _, _) => id
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>sortGroupWith</strong></td>
+      <td><strong>sortGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+grouped.sortGroupWith(Order.ASCENDING) {
+  case House(_, value) => value
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>combineGroupWith</strong></td>
+      <td><strong>combineGroup (GroupedDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+grouped.combineGroupWith {
+  case header #:: amounts => amounts.sum
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data1.join(data2).
+  whereClause(case (pk, _) => pk).
+  isEqualTo(case (_, fk) => fk).
+  projecting {
+    case ((pk, tx), (products, fk)) => tx -> products
+  }
+
+data1.cross(data2).projecting {
+  case ((a, _), (_, b)) => a -> b
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (CoGroupDataSet)</strong></td>
+      <td>
+{% highlight scala %}
+data1.coGroup(data2).
+  whereClause(case (pk, _) => pk).
+  isEqualTo(case (_, fk) => fk).
+  projecting {
+    case (head1 #:: _, head2 #:: _) => head1 -> head2
+  }
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+#### DataStream API
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Method</th>
+      <th class="text-left" style="width: 20%">Original</th>
+      <th class="text-center">Example</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith {
+  case (_, value) => value.toString
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapPartitionWith</strong></td>
+      <td><strong>mapPartition (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapPartitionWith {
+  case head #:: _ => head
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith {
+  case (_, name, visits) => visits.map(name -> _)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>filterWith</strong></td>
+      <td><strong>filter (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.filterWith {
+  case Train(_, isOnTime) => isOnTime
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>keyingBy</strong></td>
+      <td><strong>keyBy (DataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.keyingBy {
+  case (id, _, _) => id
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>mapWith</strong></td>
+      <td><strong>map (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.mapWith(
+  map1 = case (_, value) => value.toString,
+  map2 = case (_, _, value, _) => value + 1
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>flatMapWith</strong></td>
+      <td><strong>flatMap (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.flatMapWith(
+  flatMap1 = case (_, json) => parse(json),
+  flatMap2 = case (_, _, json, _) => parse(json)
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>keyingBy</strong></td>
+      <td><strong>keyBy (ConnectedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.keyingBy(
+  key1 = case (_, timestamp) => timestamp,
+  key2 = case (id, _, _) => id
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>reduceWith</strong></td>
+      <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.reduceWith {
+  case ((_, sum1), (_, sum2)) => sum1 + sum2
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>foldWith</strong></td>
+      <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.foldWith(User(bought = 0)) {
+  case (User(b), (_, items)) => User(b + items.size)
+}
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>applyWith</strong></td>
+      <td><strong>apply (WindowedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data.applyWith(0)(
+  foldFunction = case (sum, amount) => sum + amount,
+  windowFunction = case (k, w, sum) => // [...]
+)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>projecting</strong></td>
+      <td><strong>apply (JoinedDataStream)</strong></td>
+      <td>
+{% highlight scala %}
+data1.join(data2).
+  whereClause(case (pk, _) => pk).
+  isEqualTo(case (_, fk) => fk).
+  projecting {
+    case ((pk, tx), (products, fk)) => tx -> products
+  }
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+
+
+For more information on the semantics of each method, please refer to the
+[DataSet]({{ site.baseurl }}/dev/batch/index.html) and [DataStream]({{ site.baseurl }}/dev/datastream_api.html) API documentation.
+
+To use this extension exclusively, you can add the following `import`:
+
+{% highlight scala %}
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+for the DataSet extensions, and the following for the DataStream extensions:
+
+{% highlight scala %}
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+{% endhighlight %}
+
+The following snippet shows a minimal example of how to use these extension
+methods together (with the DataSet API):
+
+{% highlight scala %}
+object Main {
+  import org.apache.flink.api.scala._
+  import org.apache.flink.api.scala.extensions._
+  case class Point(x: Double, y: Double)
+  def main(args: Array[String]): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+    ds.filterWith {
+      case Point(x, _) => x > 1
+    }.reduceWith {
+      case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
+    }.mapWith {
+      case Point(x, y) => (x, y)
+    }.flatMapWith {
+      case (x, y) => Seq("x" -> x, "y" -> y)
+    }.groupingBy {
+      case (id, value) => id
+    }
+  }
+}
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_shell.md b/docs/dev/scala_shell.md
new file mode 100644
index 0000000..0728812
--- /dev/null
+++ b/docs/dev/scala_shell.md
@@ -0,0 +1,193 @@
+---
+title: "Scala Shell"
+nav-parent_id: dev
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink comes with an integrated interactive Scala Shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use the shell with an integrated Flink cluster just execute:
+
+~~~bash
+bin/start-scala-shell.sh local
+~~~
+
+in the root directory of your binary Flink directory. To run the Shell on a
+cluster, please see the Setup section below.
+
+## Usage
+
+The shell supports Batch and Streaming.
+Two different ExecutionEnvironments are automatically prebound after startup.
+Use "benv" and "senv" to access the Batch and Streaming environment respectively.
+
+### DataSet API
+
+The following example will execute the wordcount program in the Scala shell:
+
+~~~scala
+Scala-Flink> val text = benv.fromElements(
+  "To be, or not to be,--that is the question:--",
+  "Whether 'tis nobler in the mind to suffer",
+  "The slings and arrows of outrageous fortune",
+  "Or to take arms against a sea of troubles,")
+Scala-Flink> val counts = text
+    .flatMap { _.toLowerCase.split("\\W+") }
+    .map { (_, 1) }.groupBy(0).sum(1)
+Scala-Flink> counts.print()
+~~~
+
+The print() command will automatically send the specified tasks to the JobManager for execution and will show the result of the computation in the terminal.
+
+It is possible to write results to a file. However, in this case you need to call `execute` to run your program:
+
+~~~scala
+Scala-Flink> benv.execute("MyProgram")
+~~~
+
+### DataStream API
+
+Similar to the batch program above, we can execute a streaming program through the DataStream API:
+
+~~~scala
+Scala-Flink> val textStreaming = senv.fromElements(
+  "To be, or not to be,--that is the question:--",
+  "Whether 'tis nobler in the mind to suffer",
+  "The slings and arrows of outrageous fortune",
+  "Or to take arms against a sea of troubles,")
+Scala-Flink> val countsStreaming = textStreaming
+    .flatMap { _.toLowerCase.split("\\W+") }
+    .map { (_, 1) }.keyBy(0).sum(1)
+Scala-Flink> countsStreaming.print()
+Scala-Flink> senv.execute("Streaming Wordcount")
+~~~
+
+Note that in the streaming case, the print operation does not trigger execution directly.
+
+The Flink Shell comes with command history and auto-completion.
+
+
+## Adding external dependencies
+
+It is possible to add external classpaths to the Scala shell. These will be sent to the JobManager automatically alongside your shell program when calling execute.
+
+Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
+
+~~~bash
+bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
+~~~
+
+
+## Setup
+
+To get an overview of what options the Scala Shell provides, please use
+
+~~~bash
+bin/start-scala-shell.sh --help
+~~~
+
+### Local
+
+To use the shell with an integrated Flink cluster just execute:
+
+~~~bash
+bin/start-scala-shell.sh local
+~~~
+
+
+### Remote
+
+To use it with a running cluster, start the Scala shell with the keyword `remote`
+and supply the host and port of the JobManager with:
+
+~~~bash
+bin/start-scala-shell.sh remote <hostname> <portnumber>
+~~~
+
+### Yarn Scala Shell cluster
+
+The shell can deploy a Flink cluster to YARN, which is used exclusively by the
+shell. The number of YARN containers can be controlled by the parameter `-n <arg>`.
+The shell deploys a new Flink cluster on YARN and connects to that
+cluster. You can also specify options for the YARN cluster, such as the memory for
+the JobManager, the name of the YARN application, etc.
+
+For example, to start a Yarn cluster for the Scala Shell with two TaskManagers
+use the following:
+
+~~~bash
+ bin/start-scala-shell.sh yarn -n 2
+~~~
+
+For all other options, see the full reference at the bottom.
+
+
+### Yarn Session
+
+If you have previously deployed a Flink cluster using the Flink Yarn Session,
+the Scala shell can connect to it using the following command:
+
+~~~bash
+ bin/start-scala-shell.sh yarn
+~~~
+
+
+## Full Reference
+
+~~~bash
+Flink Scala Shell
+Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
+
+Command: local [options]
+Starts Flink scala shell with a local Flink cluster
+  -a <path/to/jar> | --addclasspath <path/to/jar>
+        Specifies additional jars to be used in Flink
+Command: remote [options] <host> <port>
+Starts Flink scala shell connecting to a remote cluster
+  <host>
+        Remote host name as string
+  <port>
+        Remote port as integer
+
+  -a <path/to/jar> | --addclasspath <path/to/jar>
+        Specifies additional jars to be used in Flink
+Command: yarn [options]
+Starts Flink scala shell connecting to a yarn cluster
+  -n arg | --container arg
+        Number of YARN container to allocate (= Number of TaskManagers)
+  -jm arg | --jobManagerMemory arg
+        Memory for JobManager container [in MB]
+  -nm <value> | --name <value>
+        Set a custom name for the application on YARN
+  -qu <arg> | --queue <arg>
+        Specifies YARN queue
+  -s <arg> | --slots <arg>
+        Number of slots per TaskManager
+  -tm <arg> | --taskManagerMemory <arg>
+        Memory per TaskManager container [in MB]
+  -a <path/to/jar> | --addclasspath <path/to/jar>
+        Specifies additional jars to be used in Flink
+  --configDir <value>
+        The configuration directory.
+  -h | --help
+        Prints this usage text
+~~~
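
Editorial sketch, not part of the commit: the word count typed into the shell in docs/dev/scala_shell.md above lowercases each line, splits it on runs of non-word characters, and sums a count of 1 per word. The plain-Java class below reproduces that computation without Flink so the expected counts can be checked by hand. The class name `WordCountSketch` and the empty-token filter are assumptions made for this illustration and do not appear in the shell example.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java stand-in for the shell's flatMap/map/groupBy/sum pipeline; not Flink code. */
final class WordCountSketch {

    /** Counts lowercase words, splitting on the same regex as the shell example. */
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable printing
        for (String line : lines) {
            for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
                // Assumption: skip the empty token produced when a line starts
                // with punctuation; the shell example does not filter these.
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,");
        System.out.println(counts);
    }
}
```

For the four input lines used in the shell session, this yields a count of 4 for `to`, 3 for `the`, and 2 for `be`, which is what `counts.print()` should report for those words.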

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
new file mode 100644
index 0000000..ec8c5eb
--- /dev/null
+++ b/docs/dev/state.md
@@ -0,0 +1,293 @@
+---
+title: "Working with State"
+nav-parent_id: dev
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+All transformations in Flink may look like functions (in the functional processing terminology), but
+are in fact stateful operators. You can make *every* transformation (`map`, `filter`, etc.) stateful
+by using Flink's state interface or by checkpointing instance fields of your function. You can register
+any instance field as ***managed*** state by implementing an interface. In this case, and also in the case of using
+Flink's native state interface, Flink will automatically take consistent snapshots of your state
+periodically, and restore its value in the case of a failure.
+
+The end effect is that updates to any form of state are the same under failure-free execution and
+execution under failures.
+
+First, we look at how to make instance fields consistent under failures, and then we look at
+Flink's state interface.
+
+By default state checkpoints will be stored in-memory at the JobManager. For proper persistence of large
+state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system),
+which can be configured in the `flink-conf.yaml` or via `StreamExecutionEnvironment.setStateBackend(…)`.
+See [state backends]({{ site.baseurl }}/dev/state_backends.html) for information
+about the available state backends and how to configure them.
+
+* ToC
+{:toc}
+
+## Using the Key/Value State Interface
+
+The Key/Value state interface provides access to different types of state that are all scoped to
+the key of the current input element. This means that this type of state can only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+
+Now, we will first look at the different types of state available and then we will see
+how they can be used in a program. The available state primitives are:
+
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to key of the input element, mentioned above, so there will possibly be one value
+for each key that the operation sees). The value can be set using `update(T)` and retrieved using
+`T value()`.
+
+* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
+over all currently stored elements. Elements are added using `add(T)`; the `Iterable` can
+be retrieved using `Iterable<T> get()`.
+
+* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
+added to the state. The interface is the same as for `ListState` but elements added using
+`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+
+All types of state also have a method `clear()` that clears the state for the currently
+active key (i.e. the key of the input element).
+
+It is important to keep in mind that these state objects are only used for interfacing
+with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one invocation of your
+user function can be different from the one you get in another invocation if the key of
+the element is different.
+
+To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state
+(as we will see later, you can create several states, and they must have unique names so
+that you can reference them), the type of the values that the state holds, and possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
+want to retrieve, you create one of `ValueStateDescriptor`, `ListStateDescriptor` or
+`ReducingStateDescriptor`.
+
+State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
+Please see [here]({{ site.baseurl }}/apis/common/#specifying-transformation-functions) for
+information about that; we will also see an example shortly. The `RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+
+This is an example `FlatMapFunction` that shows how all of the parts fit together:
+
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+    /**
+     * The ValueState handle. The first field is the count, the second field a running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+
+        // access the state value
+        Tuple2<Long, Long> currentSum = sum.value();
+
+        // update the count
+        currentSum.f0 += 1;
+
+        // add the second field of the input value
+        currentSum.f1 += input.f1;
+
+        // update the state
+        sum.update(currentSum);
+
+        // if the count reaches 2, emit the average and clear the state
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+
+// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
+env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
+        .keyBy(0)
+        .flatMap(new CountWindowAverage())
+        .print();
+
+// the printed output will be (1,4) and (1,5)
+{% endhighlight %}
+
+This example implements a poor man's counting window. We key the tuples by the first field
+(in the example all have the same key `1`). The function stores the count and a running sum in
+a `ValueState`. Once the count reaches 2, it emits the average and clears the state so that
+we start over from `0`. Note that this would keep a different state value for each different input
+key if we had tuples with different values in the first field.
+
+### State in the Scala DataStream API
+
+In addition to the interface described above, the Scala API has shortcuts for stateful
+`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function
+gets the current value of the `ValueState` in an `Option` and must return an updated value that
+will be used to update the state.
+
+{% highlight scala %}
+val stream: DataStream[(String, Int)] = ...
+
+val counts: DataStream[(String, Int)] = stream
+  .keyBy(_._1)
+  .mapWithState((in: (String, Int), count: Option[Int]) =>
+    count match {
+      case Some(c) => ( (in._1, c), Some(c + in._2) )
+      case None => ( (in._1, 0), Some(in._2) )
+    })
+{% endhighlight %}
+
+## Checkpointing Instance Fields
+
+Instance fields can be checkpointed by using the `Checkpointed` interface.
+
+When the user-defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)`
+methods will be executed to draw and restore function state.
+
+In addition to that, user functions can also implement the `CheckpointNotifier` interface to receive notifications on
+completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
+Note that there is no guarantee for the user function to receive a notification if a failure happens between
+checkpoint completion and notification. The notifications should hence be treated in a way that notifications from
+later checkpoints can subsume missing notifications.
+
+The above example for `ValueState` can be implemented using instance fields like this:
+
+{% highlight java %}
+
+public class CountWindowAverage
+        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+        implements Checkpointed<Tuple2<Long, Long>> {
+
+    private Tuple2<Long, Long> sum = null;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+
+        // update the count
+        sum.f0 += 1;
+
+        // add the second field of the input value
+        sum.f1 += input.f1;
+
+
+        // if the count reaches 2, emit the average and clear the state
+        if (sum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, sum.f1 / sum.f0));
+            sum = Tuple2.of(0L, 0L);
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        if (sum == null) {
+            // only recreate if null
+            // restoreState will be called before open()
+            // so this will already set the sum to the restored value
+            sum = Tuple2.of(0L, 0L);
+        }
+    }
+
+    // regularly persists state during normal operation
+    @Override
+    public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+        return sum;
+    }
+
+    // restores state on recovery from failure
+    @Override
+    public void restoreState(Tuple2<Long, Long> state) {
+        sum = state;
+    }
+}
+{% endhighlight %}
+
+## Stateful Source Functions
+
+Stateful sources require a bit more care than other operators.
+In order to make the updates to the state and output collection atomic (required for exactly-once semantics
+on failure/recovery), the user is required to get a lock from the source's context.
+
+{% highlight java %}
+public static class CounterSource
+        extends RichParallelSourceFunction<Long>
+        implements Checkpointed<Long> {
+
+    /**  current offset for exactly once semantics */
+    private long offset;
+
+    /** flag for job cancellation */
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run(SourceContext<Long> ctx) {
+        final Object lock = ctx.getCheckpointLock();
+
+        while (isRunning) {
+            // output and state update are atomic
+            synchronized (lock) {
+                ctx.collect(offset);
+                offset += 1;
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+
+    @Override
+    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+        return offset;
+    }
+
+    @Override
+    public void restoreState(Long state) {
+        offset = state;
+    }
+}
+{% endhighlight %}
+
+Some operators might need to know when a checkpoint is fully acknowledged by Flink in order to communicate that to the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
+
+## State Checkpoints in Iterative Jobs
+
+Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`.
+
+Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
+
+{% top %}
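
Editorial sketch, not part of the commit: the `CountWindowAverage` example in docs/dev/state.md above keeps a count and a running sum per key, emits the average once the count reaches 2, and then clears the state. The plain-Java class below simulates that behaviour with a `HashMap` standing in for Flink's keyed `ValueState`, so the stated output `(1,4)` and `(1,5)` can be traced without a cluster. The names `KeyedAverageSketch`, `flatMap` and `run` are hypothetical and chosen for this illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of the keyed count/sum state; not Flink code. */
final class KeyedAverageSketch {

    /** Per-key state: index 0 holds the count, index 1 the running sum. */
    private final Map<Long, long[]> stateByKey = new HashMap<>();

    /** Processes one (key, value) element and returns the emitted (key, average) pairs. */
    List<long[]> flatMap(long key, long value) {
        List<long[]> out = new ArrayList<>();
        // A missing entry plays the role of the descriptor's default value (0, 0).
        long[] current = stateByKey.computeIfAbsent(key, k -> new long[] {0L, 0L});
        current[0] += 1;      // update the count
        current[1] += value;  // add the value to the running sum
        if (current[0] >= 2) {
            // Integer division, as with the Long fields in the original example.
            out.add(new long[] {key, current[1] / current[0]});
            stateByKey.remove(key); // corresponds to sum.clear()
        }
        return out;
    }

    /** Feeds all elements through one function instance and formats the emitted pairs. */
    static List<String> run(long[][] input) {
        KeyedAverageSketch fn = new KeyedAverageSketch();
        List<String> printed = new ArrayList<>();
        for (long[] element : input) {
            for (long[] result : fn.flatMap(element[0], element[1])) {
                printed.add("(" + result[0] + "," + result[1] + ")");
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 5}, {1, 7}, {1, 4}, {1, 2}};
        System.out.println(run(input)); // prints [(1,4), (1,5)]
    }
}
```

The last element `(1, 2)` produces no output because the count for key `1` is back at 1 after the second emission. With elements for several keys, each key reaches its own threshold independently, which is the scoping that the key/value state interface provides.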

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/dev/state_backends.md b/docs/dev/state_backends.md
new file mode 100644
index 0000000..e5b9c2a
--- /dev/null
+++ b/docs/dev/state_backends.md
@@ -0,0 +1,162 @@
+---
+title: "State Backends"
+nav-parent_id: dev
+nav-pos: 5
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Programs written in the [Data Stream API]({{ site.baseurl }}/dev/datastream_api.html) often hold state in various forms:
+
+- Windows gather elements or aggregates until they are triggered
+- Transformation functions may use the key/value state interface to store values
+- Transformation functions may implement the `Checkpointed` interface to make their local variables fault tolerant
+
+See also [Working with State]({{ site.baseurl }}/dev/state.html) in the streaming API guide.
+
+When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and recover consistently.
+How the state is represented internally, and how and where it is persisted upon checkpoints depends on the
+chosen **State Backend**.
+
+* ToC
+{:toc}
+
+## Available State Backends
+
+Out of the box, Flink bundles these state backends:
+
+ - *MemoryStateBackend*
+ - *FsStateBackend*
+ - *RocksDBStateBackend*
+
+If nothing else is configured, the system will use the MemoryStateBackend.
+
+
+### The MemoryStateBackend
+
+The *MemoryStateBackend* holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the
+JobManager (master), which stores it on its heap as well.
+
+Limitations of the MemoryStateBackend:
+
+  - The size of each individual state is by default limited to 5 MB. This value can be increased in the constructor of the MemoryStateBackend.
+  - Irrespective of the configured maximal state size, the state cannot be larger than the akka frame size (see [Configuration]({{ site.baseurl }}/setup/config.html)).
+  - The aggregate state must fit into the JobManager memory.
+
+The MemoryStateBackend is encouraged for:
+
+  - Local development and debugging
+  - Jobs that hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer requires very little state.
+
+
+### The FsStateBackend
+
+The *FsStateBackend* is configured with a file system URL (type, address, path), such as "hdfs://namenode:40010/flink/checkpoints" or "file:///data/flink/checkpoints".
+
+The FsStateBackend holds in-flight data in the TaskManager's memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).
+
+The FsStateBackend is encouraged for:
+
+  - Jobs with large state, long windows, large key/value states.
+  - All high-availability setups.
+
+### The RocksDBStateBackend
+
+The *RocksDBStateBackend* is configured with a file system URL (type, address, path), such as "hdfs://namenode:40010/flink/checkpoints" or "file:///data/flink/checkpoints".
+
+The RocksDBStateBackend holds in-flight data in a [RocksDB](http://rocksdb.org) database
+that is (per default) stored in the TaskManager data directories. Upon checkpointing, the whole
+RocksDB database will be checkpointed into the configured file system and directory. Minimal
+metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).
+
+The RocksDBStateBackend is encouraged for:
+
+  - Jobs with very large state, long windows, large key/value states.
+  - All high-availability setups.
+
+Note that the amount of state that you can keep is only limited by the amount of disk space available.
+This allows keeping very large state, compared to the FsStateBackend that keeps state in memory.
+This also means, however, that the maximum throughput that can be achieved will be lower with
+this state backend.
+
+**NOTE:** To use the RocksDBStateBackend you also have to add the correct Maven dependency to your
+project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-statebackend-rocksdb{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+The backend is currently not part of the binary distribution. See
+[here]({{ site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for an explanation of how to include it for cluster execution.
+
+## Configuring a State Backend
+
+State backends can be configured per job. In addition, you can define a default state backend to be used when the
+job does not explicitly define a state backend.
+
+
+### Setting the Per-job State Backend
+
+The per-job state backend is set on the `StreamExecutionEnvironment` of the job, as shown in the example below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
+{% endhighlight %}
+</div>
+</div>
+
+
+### Setting Default State Backend
+
+A default state backend can be configured in the `flink-conf.yaml`, using the configuration key `state.backend`.
+
+Possible values for the config entry are *jobmanager* (MemoryStateBackend), *filesystem* (FsStateBackend), or the fully qualified class
+name of the class that implements the state backend factory [FsStateBackendFactory](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java).
+
+In the case where the default state backend is set to *filesystem*, the entry `state.backend.fs.checkpointdir` defines the directory where the checkpoint data will be stored.
+
+A sample section in the configuration file could look as follows:
+
+~~~
+# The backend that will be used to store operator state checkpoints
+
+state.backend: filesystem
+
+
+# Directory for storing checkpoints
+
+state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
+~~~
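
Editorial sketch, not part of the commit: docs/dev/state_backends.md above says that `state.backend` accepts *jobmanager*, *filesystem*, or a fully qualified factory class name, that `state.backend.fs.checkpointdir` supplies the directory for the file system backend, and that the MemoryStateBackend is used when nothing is configured. The plain-Java class below only illustrates that mapping on the sample configuration. `StateBackendConfigSketch`, `parse` and `describeBackend` are hypothetical names, and the code is not how Flink itself loads its configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrates the documented meaning of the state.backend entries; not Flink code. */
final class StateBackendConfigSketch {

    /** Parses "key: value" lines, ignoring blank lines and '#' comments. */
    static Map<String, String> parse(String conf) {
        Map<String, String> entries = new HashMap<>();
        for (String raw : conf.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Split on the first ": " so that URLs such as hdfs://... stay intact.
            int sep = line.indexOf(": ");
            if (sep > 0) {
                entries.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
            }
        }
        return entries;
    }

    /** Describes which backend the entries select, following the text of the section. */
    static String describeBackend(Map<String, String> entries) {
        // Per the section, the MemoryStateBackend applies when nothing is configured.
        String backend = entries.getOrDefault("state.backend", "jobmanager");
        switch (backend) {
            case "jobmanager":
                return "MemoryStateBackend";
            case "filesystem":
                return "FsStateBackend at " + entries.get("state.backend.fs.checkpointdir");
            default:
                return "factory class " + backend;
        }
    }

    public static void main(String[] args) {
        String conf = "# The backend that will be used to store operator state checkpoints\n"
            + "state.backend: filesystem\n"
            + "# Directory for storing checkpoints\n"
            + "state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints\n";
        System.out.println(describeBackend(parse(conf)));
    }
}
```

A per-job call to `env.setStateBackend(...)` takes precedence over this default, as described in the section on the per-job state backend.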

