flink-commits mailing list archives

From u..@apache.org
Subject [07/15] flink git commit: [FLINK-3132] [docs] Initial docs restructure
Date Fri, 15 Jan 2016 15:49:58 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/python.md b/docs/apis/python.md
deleted file mode 100644
index d57e117..0000000
--- a/docs/apis/python.md
+++ /dev/null
@@ -1,606 +0,0 @@
----
-title: "Python Programming Guide"
-is_beta: true
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<a href="#top"></a>
-
-Analysis programs in Flink are regular programs that implement transformations on data sets
-(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
-sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for
-example write the data to (distributed) files, or to standard output (for example the command line
-terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
-The execution can happen in a local JVM, or on clusters of many machines.
-
-In order to create your own Flink program, we encourage you to start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations). The remaining sections act as references for additional
-operations and advanced features.
-
-* This will be replaced by the TOC
-{:toc}
-
-Example Program
----------------
-
-The following program is a complete, working example of WordCount. You can copy &amp; paste the code
-to run it locally.
-
-{% highlight python %}
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT, STRING
-from flink.functions.GroupReduceFunction import GroupReduceFunction
-
-class Adder(GroupReduceFunction):
-  def reduce(self, iterator, collector):
-    count, word = iterator.next()
-    count += sum([x[0] for x in iterator])
-    collector.collect((count, word))
-
-if __name__ == "__main__":
-  env = get_environment()
-  data = env.from_elements("Who's there?",
-   "I think I hear them. Stand, ho! Who's there?")
-  
-  data \
-    .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
-    .group_by(1) \
-    .reduce_group(Adder(), (INT, STRING), combinable=True) \
-    .output()
-  
-  env.execute(local=True)
-{% endhighlight %}
-
-[Back to top](#top)
-
-Program Skeleton
-----------------
-
-As we already saw in the example, Flink programs look like regular Python
-programs with an `if __name__ == "__main__":` block. Each program consists of the same basic parts:
-
-1. Obtain an `Environment`,
-2. Load/create the initial data,
-3. Specify transformations on this data,
-4. Specify where to put the results of your computations, and
-5. Execute your program.
-
-We will now give an overview of each of those steps, but please refer to the respective sections for
-more details.
-
-
-The `Environment` is the basis for all Flink programs. You can
-obtain one via the `get_environment()` function from `flink.plan.Environment`:
-
-{% highlight python %}
-get_environment()
-{% endhighlight %}
-
-For specifying data sources the execution environment has several methods
-to read from files. To just read a text file as a sequence of lines, you can use:
-
-{% highlight python %}
-env = get_environment()
-text = env.read_text("file:///path/to/file")
-{% endhighlight %}
-
-This will give you a DataSet on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataSet you can apply transformations to create a new
-DataSet which you can then write to a file, transform again, or
-combine with other DataSets. You apply transformations by calling
-methods on DataSet with your own custom transformation function. For example,
-a map transformation looks like this:
-
-{% highlight python %}
-data.map(lambda x: x*2, INT)
-{% endhighlight %}
-
-This will create a new DataSet by doubling every value in the original DataSet. 
-For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataSet that needs to be written to disk, you can call one
-of these methods on the DataSet:
-
-{% highlight python %}
-data.write_text("<file-path>", write_mode=Constants.NO_OVERWRITE)
-data.write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
-data.output()
-{% endhighlight %}
-
-The last method is only useful for developing/debugging on a local machine;
-it prints the contents of the DataSet to standard output. (Note that on
-a cluster, the result goes to the standard out stream of the cluster nodes and ends
-up in the *.out* files of the workers.)
-The first two do as their names suggest.
-Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
-
-Once you have specified the complete program, you need to call `execute` on
-the `Environment`. This will either execute on your local machine or submit your program
-for execution on a cluster, depending on how Flink was started. You can force
-local execution by using `execute(local=True)`.
-
-[Back to top](#top)
-
-Project setup
----------------
-
-Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages, is automatically distributed across the cluster via HDFS when running a job.
-
-The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed.
-
-[Back to top](#top)
-
-Lazy Evaluation
----------------
-
-All Flink programs are executed lazily: When the program's main method is executed, the data loading
-and transformations do not happen directly. Rather, each operation is created and added to the
-program's plan. The operations are actually executed when one of the `execute()` methods is invoked
-on the Environment object. Whether the program is executed locally or on a cluster depends
-on the environment of the program.
-
-The lazy evaluation lets you construct sophisticated programs that Flink executes as one
-holistically planned unit.
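-
-For example, the following sketch (reusing only calls shown elsewhere in this guide) builds the plan
-first; nothing is read or transformed until `execute()` is called:
-
-{% highlight python %}
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT
-
-env = get_environment()
-data = env.from_elements(1, 2, 3)
-
-# Nothing is executed yet: this only adds a map operation and a sink to the plan.
-doubled = data.map(lambda x: x * 2, INT)
-doubled.output()
-
-# Only now is the plan translated and executed (locally in this case).
-env.execute(local=True)
-{% endhighlight %}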
-
-[Back to top](#top)
-
-
-Transformations
----------------
-
-Data transformations transform one or more DataSets into a new DataSet. Programs can combine
-multiple transformations into sophisticated assemblies.
-
-This section gives a brief overview of the available transformations. The [transformations
-documentation](dataset_transformations.html) has a full description of all transformations with
-examples.
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-
-  <tbody>
-    <tr>
-      <td><strong>Map</strong></td>
-      <td>
-        <p>Takes one element and produces one element.</p>
-{% highlight python %}
-data.map(lambda x: x * 2, INT)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>FlatMap</strong></td>
-      <td>
-        <p>Takes one element and produces zero, one, or more elements. </p>
-{% highlight python %}
-data.flat_map(
-  lambda x,c: [(1, word) for word in x.lower().split()],
-  (INT, STRING))
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>MapPartition</strong></td>
-      <td>
-        <p>Transforms a parallel partition in a single function call. The function gets the partition
-        as an `Iterator` and can produce an arbitrary number of result values. The number of
-        elements in each partition depends on the degree-of-parallelism and previous operations.</p>
-{% highlight python %}
-data.map_partition(lambda x,c: [value * 2 for value in x], INT)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Filter</strong></td>
-      <td>
-        <p>Evaluates a boolean function for each element and retains those for which the function
-        returns true.</p>
-{% highlight python %}
-data.filter(lambda x: x > 1000)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Reduce</strong></td>
-      <td>
-        <p>Combines a group of elements into a single element by repeatedly combining two elements
-        into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
-{% highlight python %}
-data.reduce(lambda x,y : x + y)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>ReduceGroup</strong></td>
-      <td>
-        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
-        full data set, or on a grouped data set.</p>
-{% highlight python %}
-class Adder(GroupReduceFunction):
-  def reduce(self, iterator, collector):
-    count, word = iterator.next()
-    count += sum([x[0] for x in iterator])
-    collector.collect((count, word))
-
-data.reduce_group(Adder(), (INT, STRING))
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Join</strong></td>
-      <td>
-        <p>Joins two data sets by creating all pairs of elements that are equal on their keys.
-        Optionally uses a JoinFunction to turn the pair of elements into a single element.
-        See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
-{% highlight python %}
-# In this case tuple fields are used as keys. 
-# "0" is the join field on the first tuple
-# "1" is the join field on the second tuple.
-result = input1.join(input2).where(0).equal_to(1)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>CoGroup</strong></td>
-      <td>
-        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
-        fields and then joins the groups. The transformation function is called per pair of groups.
-        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
-{% highlight python %}
-data1.co_group(data2).where(0).equal_to(1)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Cross</strong></td>
-      <td>
-        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
-        elements. Optionally uses a CrossFunction to turn the pair of elements into a single
-        element.</p>
-{% highlight python %}
-result = data1.cross(data2)
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>Union</strong></td>
-      <td>
-        <p>Produces the union of two data sets.</p>
-{% highlight python %}
-data.union(data2)
-{% endhighlight %}
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-[Back to Top](#top)
-
-
-Specifying Keys
--------------
-
-Some transformations (like Join or CoGroup) require that a key be defined on
-their argument DataSets, while others (Reduce, GroupReduce) allow the DataSet to be grouped on a key before they are
-applied.
-
-A DataSet is grouped as
-{% highlight python %}
-reduced = data \
-  .group_by(<define key here>) \
-  .reduce_group(<do something>)
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data set types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-### Define keys for Tuples
-{:.no_toc}
-
-The simplest case is grouping a data set of Tuples on one or more
-fields of the Tuple:
-{% highlight python %}
-reduced = data \
-  .group_by(0) \
-  .reduce_group(<do something>)
-{% endhighlight %}
-
-The data set is grouped on the first field of the tuples. 
-The group-reduce function will thus receive groups of tuples with
-the same value in the first field.
-
-{% highlight python %}
-grouped = data \
-  .group_by(0,1) \
-  .reduce(<do something>)
-{% endhighlight %}
-
-The data set is grouped on the composite key consisting of the first and the
-second fields, therefore the reduce function will receive groups
-with the same value for both fields.
-
-A note on nested Tuples: If you have a DataSet with a nested tuple,
-specifying `group_by(<index of tuple>)` will cause the system to use the full tuple as a key.
-
-[Back to top](#top)
-
-
-Passing Functions to Flink
---------------------------
-
-Certain operations require user-defined functions; all of them accept both lambda functions and rich functions as arguments.
-
-{% highlight python %}
-data.filter(lambda x: x > 5)
-{% endhighlight %}
-
-{% highlight python %}
-class Filter(FilterFunction):
-    def filter(self, value):
-        return value > 5
-
-data.filter(Filter())
-{% endhighlight %}
-
-Rich functions allow the use of imported functions, provide access to broadcast variables,
-can be parameterized using `__init__()`, and are the go-to option for complex functions.
-They are also the only way to define an optional `combine` function for a reduce operation.
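-
-For example, a rich function can be parameterized through its constructor. A minimal sketch (it assumes
-`FilterFunction` is importable the same way as `GroupReduceFunction` above, and that the base class
-constructor takes no arguments):
-
-{% highlight python %}
-from flink.functions.FilterFunction import FilterFunction
-
-class Threshold(FilterFunction):
-    def __init__(self, limit):
-        super(Threshold, self).__init__()
-        # parameter captured at plan-construction time
-        self.limit = limit
-
-    def filter(self, value):
-        return value > self.limit
-
-data.filter(Threshold(5))
-{% endhighlight %}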
-
-Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
-an iterable if the operation can return multiple values (i.e., all functions that receive a collector argument).
-
-Flink requires type information at the time when it prepares the program for execution
-(when the main method of the program is called). This is done by passing an exemplary
-object that has the desired type. This also holds for tuples.
-
-{% highlight python %}
-(INT, STRING)
-{% endhighlight %}
-
-This would denote a tuple containing an int and a string. Note that for operations that work strictly on tuples (like cross), no braces are required.
-
-A few constants defined in flink.plan.Constants (such as INT or STRING) allow specifying types in a more readable fashion.
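-
-For example, assuming `data` is a DataSet of strings, the following declares that a map operation
-produces an `(INT, STRING)` tuple:
-
-{% highlight python %}
-from flink.plan.Constants import INT, STRING
-
-# the output type of the map operation is a tuple of an int and a string
-data.map(lambda x: (1, x), (INT, STRING))
-{% endhighlight %}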
-
-[Back to top](#top)
-
-Data Types
-----------
-
-Flink's Python API currently only supports primitive python types (int, float, bool, string) and byte arrays.
-
-#### Tuples/Lists
-
-You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains
-a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type - including further tuples, resulting in nested tuples.
-
-{% highlight python %}
-word_counts = env.from_elements(("hello", 1), ("world",2))
-
-counts = word_counts.map(lambda x: x[1], INT)
-{% endhighlight %}
-
-When working with operators that require a Key for grouping or matching records,
-Tuples let you simply specify the positions of the fields to be used as key. You can specify more
-than one position to use composite keys (see [Section Data Transformations](#transformations)).
-
-{% highlight python %}
-wordCounts \
-    .group_by(0) \
-    .reduce(MyReduceFunction())
-{% endhighlight %}
-
-[Back to top](#top)
-
-Data Sources
-------------
-
-Data sources create the initial data sets, such as from files or from collections.
-
-File-based:
-
-- `read_text(path)` - Reads files line-wise and returns them as Strings.
-- `read_csv(path, type)` - Parses files of comma (or another char) delimited fields.
-  Returns a DataSet of tuples. Supports the basic Java types and their Value counterparts as field
-  types.
-
-Collection-based:
-
-- `from_elements(*args)` - Creates a data set from the given sequence of elements. All elements must be of the same type.
-
-**Examples**
-
-{% highlight python %}
-env = get_environment()
-
-# read text file from local file system
-localLines = env.read_text("file:///path/to/my/textfile")
-
-# read text file from an HDFS running at nnHost:nnPort
-hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
-
-# read a CSV file with three fields
-csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
-
-# create a set from some given elements
-values = env.from_elements("Foo", "bar", "foobar", "fubar")
-{% endhighlight %}
-
-[Back to top](#top)
-
-Data Sinks
-----------
-
-Data sinks consume DataSets and are used to store or return them:
-
-- `write_text()` - Writes elements line-wise as Strings. The Strings are
-  obtained by calling the *str()* method of each element.
-- `write_csv(...)` - Writes tuples as comma-separated value files. Row and field
-  delimiters are configurable. The value for each field comes from the *str()* method of the objects.
-- `output()` - Prints the *str()* value of each element on the
-  standard out.
-
-A DataSet can be input to multiple operations. Programs can write or print a data set and at the
-same time run additional transformations on it (see the sketch after the examples below).
-
-**Examples**
-
-Standard data sink methods:
-
-{% highlight python %}
-# write DataSet to a file on the local file system
-textData.write_text("file:///my/result/on/localFS")
-
-# write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
-textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
-
-# write DataSet to a file and overwrite the file if it exists
-textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
-
-# tuples as lines with pipe as the separator "a|b|c"
-values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
-
-# this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
-values.write_text("file:///path/to/the/result/file")
-{% endhighlight %}
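-
-As noted above, a DataSet can feed a sink and further transformations at the same time. A minimal
-sketch, assuming `word_counts` is a DataSet of `(word, count)` tuples as in the Data Types section:
-
-{% highlight python %}
-# write the word counts to a file ...
-word_counts.write_csv("file:///path/to/counts.csv")
-
-# ... and, at the same time, keep transforming the same DataSet
-totals = word_counts \
-    .map(lambda x: x[1], INT) \
-    .reduce(lambda a, b: a + b)
-totals.output()
-{% endhighlight %}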
-
-[Back to top](#top)
-
-Broadcast Variables
--------------------
-
-Broadcast variables allow you to make a data set available to all parallel instances of an
-operation, in addition to the regular input of the operation. This is useful for auxiliary data
-sets, or data-dependent parameterization. The data set will then be accessible at the operator as a
-Collection.
-
-- **Broadcast**: broadcast sets are registered by name via `with_broadcast_set(DataSet, String)`
-- **Access**: accessible via `self.context.get_broadcast_variable(String)` at the target operator
-
-{% highlight python %}
-class MapperBcv(MapFunction):
-    def map(self, value):
-        factor = self.context.get_broadcast_variable("bcv")[0][0]
-        return value * factor
-
-# 1. The DataSet to be broadcasted
-toBroadcast = env.from_elements(1, 2, 3) 
-data = env.from_elements("a", "b")
-
-# 2. Broadcast the DataSet
-data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast) 
-{% endhighlight %}
-
-Make sure that the names (`bcv` in the previous example) match when registering and
-accessing broadcasted data sets.
-
-**Note**: As the content of broadcast variables is kept in-memory on each node, it should not become
-too large. For simpler things like scalar values you can simply parameterize the rich function.
-
-[Back to top](#top)
-
-Parallel Execution
-------------------
-
-This section describes how the parallel execution of programs can be configured in Flink. A Flink
-program consists of multiple tasks (operators, data sources, and sinks). A task is split into
-several parallel instances for execution and each parallel instance processes a subset of the task's
-input data. The number of parallel instances of a task is called its *parallelism* or *degree of
-parallelism (DOP)*.
-
-The degree of parallelism of a task can be specified in Flink on different levels.
-
-### Execution Environment Level
-
-Flink programs are executed in the context of an [execution environment](#program-skeleton). An
-execution environment defines a default parallelism for all operators, data sources, and data sinks
-it executes. Execution environment parallelism can be overridden by explicitly configuring the
-parallelism of an operator.
-
-The default parallelism of an execution environment can be specified by calling the
-`set_degree_of_parallelism()` method. To execute all operators, data sources, and data sinks of the
-[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the
-execution environment as follows:
-
-{% highlight python %}
-env = get_environment()
-env.set_degree_of_parallelism(3)
-
-text.flat_map(lambda x,c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
-    .group_by(1) \
-    .reduce_group(Adder(), (INT, STRING), combinable=True) \
-    .output()
-
-env.execute()
-{% endhighlight %}
-
-### System Level
-
-A system-wide default parallelism for all execution environments can be defined by setting the
-`parallelism.default` property in `./conf/flink-conf.yaml`. See the
-[Configuration](config.html) documentation for details.
-
-[Back to top](#top)
-
-Executing Plans
----------------
-
-To run a plan with Flink, go to your Flink distribution and run the pyflink.sh script from the /bin folder.
-Use pyflink2.sh for Python 2.7 and pyflink3.sh for Python 3.4. The script containing the plan has to be passed
-as the first argument, followed by a number of additional Python packages, and finally, separated by `-`, additional
-arguments that will be fed to the script.
-
-{% highlight bash %}
-./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
-{% endhighlight %}
-
-[Back to top](#top)
-
-Debugging
----------------
-
-If you are running Flink programs locally, you can debug your program following this guide.
-First you have to enable debugging by setting the debug switch in the `env.execute(debug=True)` call. After
-submitting your program, open the jobmanager log file and look for a line that says
-`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>`. Then open `/tmp/flink` in your Python
-IDE and run `executor.py <port>`.
-
-[Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/apis/savepoints.md b/docs/apis/savepoints.md
deleted file mode 100644
index 80bd83d..0000000
--- a/docs/apis/savepoints.md
+++ /dev/null
@@ -1,108 +0,0 @@
----
-title: "Savepoints"
-is_beta: false
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
-
-* toc
-{:toc}
-
-## Overview
-
-Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
-
-Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
-
-<img src="fig/savepoints-overview.png" class="center" />
-
-In the above example the workers produce checkpoints **c<sub>1</sub>**, **c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been *discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> is special**. It is the state associated with the savepoint **s<sub>1</sub>**: it was triggered by the user and does not expire automatically (as c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
-
-Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c<sub>2</sub>**. This means that the actual state is *not copied* for the savepoint; only the referenced periodic checkpoint data is kept around.
-
-## Configuration
-
-Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints, but points to the configured checkpoint state backend.
-
-### JobManager
-
-This is the **default backend** for savepoints.
-
-Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part of the [job manager's highly available]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state.
-
-<pre>
-savepoints.state.backend: jobmanager
-</pre>
-
-**Note**: If you don't configure a specific state backend for the savepoints, the jobmanager backend will be used.
-
-### File system
-
-Savepoints are stored in the configured **file system directory**. They are available between cluster instances and allow you to move your program to another cluster.
-
-<pre>
-savepoints.state.backend: filesystem
-savepoints.state.backend.fs.dir: hdfs:///flink/savepoints
-</pre>
-
-**Note**: If you don't configure a specific directory, the job manager backend will be used.
-
-**Important**: A savepoint is a pointer to a completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files). Therefore, using the *filesystem* backend for savepoints and the *jobmanager* backend for checkpoints does not work, because the required checkpoint data won't be available after a job manager restart.
-
-## Changes to your program
-
-Savepoints **work out of the box**, but it is **highly recommended** that you slightly adjust your programs in order to be able to work with savepoints in future versions of your program.
-
-<img src="fig/savepoints-program_ids.png" class="center" />
-
-For savepoints **only stateful tasks matter**. In the above example, the source and map tasks are stateful whereas the sink is not stateful. Therefore, only the state of the source and map tasks are part of the savepoint.
-
-Each task is identified by its **generated task IDs** and **subtask index**. In the above example the state of the source (**s<sub>1</sub>**, **s<sub>2</sub>**) and map tasks (**m<sub>1</sub>**, **m<sub>2</sub>**) is identified by their respective task ID (*0xC322EC* for the source tasks and *0x27B3EF* for the map tasks) and subtask index. There is no state for the sinks (**t<sub>1</sub>**, **t<sub>2</sub>**). Their IDs therefore do not matter.
-
-<span class="label label-danger">Important</span> The IDs are generated **deterministically** from your program structure. This means that as long as your program does not change, the IDs do not change. **The only allowed changes are within the user function, e.g. you can change the implemented `MapFunction` without changing the typology**. In this case, it is straight forward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but gets problematic as soon as you make changes to the topology, because they result in changed IDs and the savepoint state cannot be mapped to your program any more.
-
-<span class="label label-info">Recommended</span> In order to be able to change your program and **have fixed IDs**, the *DataStream* API provides a method to manually specify the task IDs. Each operator provides a **`uid(String)`** method to override the generated ID. The ID is a String, which will be deterministically hashed to a 16-byte hash value. It is **important** that the specified IDs are **unique per transformation and job**. If this is not the case, job submission will fail.
-
-{% highlight java %}
-DataStream<String> stream = env
-  // Stateful source (e.g. Kafka) with ID
-  .addSource(new StatefulSource())
-  .uid("source-id")
-  .shuffle()
-  // The stateful mapper with ID
-  .map(new StatefulMapper())
-  .uid("mapper-id");
-
-// Stateless sink (no specific ID required)
-stream.print();
-{% endhighlight %}
-
-## Command-line client
-
-You trigger, restore, and dispose savepoints via the [command line client]({{site.baseurl}}/apis/cli.html#savepoints).
-
-## Current limitations
-
-- **Parallelism**: When restoring a savepoint, the parallelism of the program has to match the parallelism of the original program from which the savepoint was drawn. There is no mechanism to re-partition the savepoint's state yet.
-
-- **Chaining**: Chained operators are identified by the ID of the first task. It's not possible to manually assign an ID to an intermediate chained task, e.g. in the chain `[  a -> b -> c ]` only **a** can have its ID assigned manually, but not **b** or **c**. To work around this, you can [manually define the task chains](streaming_guide.html#task-chaining-and-resource-groups). If you rely on the automatic ID assignment, a change in the chaining behaviour will also change the IDs.
-
-- **Disposing custom state handles**: Disposing an old savepoint does not work with custom state handles (if you are using a custom state backend), because the user code class loader is not available during disposal.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md
index a0c10c1..2296dca 100644
--- a/docs/apis/scala_shell.md
+++ b/docs/apis/scala_shell.md
@@ -1,5 +1,8 @@
 ---
-title: "Interactive Scala Shell"
+title: "Scala Shell"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 10
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -25,7 +28,7 @@ Flink comes with an integrated interactive Scala Shell.
 It can be used in a local setup as well as in a cluster setup. To get started with downloading
 Flink and setting up a cluster please refer to
 [local setup]({{ site.baseurl }}/setup/local_setup.html) or
-[cluster setup]({{ site.baseurl }}/setup/cluster.html) 
+[cluster setup]({{ site.baseurl }}/setup/cluster.html)
 
 To use the shell with an integrated Flink cluster just execute:
 
@@ -78,4 +81,3 @@ Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` t
 ~~~bash
 bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
 ~~~
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/apis/state_backends.md b/docs/apis/state_backends.md
deleted file mode 100644
index ad191f9..0000000
--- a/docs/apis/state_backends.md
+++ /dev/null
@@ -1,121 +0,0 @@
----
-title:  "State Backends"
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) often hold state in various forms:
-
-- Windows gather elements or aggregates until they are triggered
-- Transformation functions may use the key/value state interface to store values
-- Transformation functions may implement the `Checkpointed` interface to make their local variables fault tolerant
-
-See also [Working with State]({{ site.baseurl }}/apis/streaming_guide.html#working_with_state) in the streaming API guide.
-
-When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and recover consistently.
-How the state is represented internally, and how and where it is persisted upon checkpoints depends on the
-chosen **State Backend**.
-
-
-## Available State Backends
-
-Out of the box, Flink bundles two state backends: *MemoryStateBackend* and *FsStateBackend*. If nothing else is configured,
-the system will use the MemoryStateBackend.
-
-
-### The MemoryStateBackend
-
-The *MemoryStateBackend* holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables
-that store the values, triggers, etc.
-
-Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the
-JobManager (master), which stores it on its heap as well.
-
-Limitations of the MemoryStateBackend:
-
-  - The size of each individual state is by default limited to 5 MB. This value can be increased in the constructor of the MemoryStateBackend.
-  - Irrespective of the configured maximal state size, the state cannot be larger than the akka frame size (see [Configuration]({{ site.baseurl }}/setup/config.html)).
-  - The aggregate state must fit into the JobManager memory.
-
-The MemoryStateBackend is encouraged for:
-
-  - Local development and debugging
-  - Jobs that hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer requires very little state.
-
-
-### The FsStateBackend
-
-The *FsStateBackend* (FileSystemStateBackend) is configured with a file system URL (type, address, path), for example "hdfs://namenode:40010/flink/checkpoints" or "file:///data/flink/checkpoints".
-
-The FsStateBackend holds in-flight data in the TaskManager's memory. Upon checkpoints, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).
-
-The FsStateBackend is encouraged for:
-
-  - Jobs with large state, long windows, large key/value states.
-  - All high-availability setups.
-
-
-## Configuring a State Backend
-
-State backends can be configured per job. In addition, you can define a default state backend to be used when the
-job does not explicitly define a state backend.
-
-
-### Setting the Per-job State Backend
-
-The per-job state backend is set on the `StreamExecutionEnvironment` of the job, as shown in the example below:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
-{% endhighlight %}
-</div>
-</div>
-
-
-### Setting Default State Backend
-
-A default state backend can be configured in the `flink-conf.yaml`, using the configuration key `state.backend`.
-
-Possible values for the config entry are *jobmanager* (MemoryStateBackend), *filesystem* (FsStateBackend), or the fully qualified class
-name of the class that implements the state backend factory [FsStateBackendFactory](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java).
-
-In the case where the default state backend is set to *filesystem*, the entry `state.backend.fs.checkpointdir` defines the directory where the checkpoint data will be stored.
-
-A sample section in the configuration file could look as follows:
-
-~~~
-# The backend that will be used to store operator state checkpoints
-
-state.backend: filesystem
-
-
-# Directory for storing checkpoints
-
-state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
-~~~
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
deleted file mode 100644
index 852bbef..0000000
--- a/docs/apis/storm_compatibility.md
+++ /dev/null
@@ -1,285 +0,0 @@
----
-title: "Storm Compatibility"
-is_beta: true
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-[Flink streaming](streaming_guide.html) is compatible with Apache Storm interfaces and therefore allows
-reusing code that was implemented for Storm.
-
-You can:
-
-- execute a whole Storm `Topology` in Flink.
-- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
-
-This document shows how to use existing Storm code with Flink.
-
-* This will be replaced by the TOC
-{:toc}
-
-# Project Configuration
-
-Support for Storm is contained in the `flink-storm` Maven module.
-The code resides in the `org.apache.flink.storm` package.
-
-Add the following dependency to your `pom.xml` if you want to execute Storm code in Flink.
-
-~~~xml
-<dependency>
-	<groupId>org.apache.flink</groupId>
-	<artifactId>flink-storm</artifactId>
-	<version>{{site.version}}</version>
-</dependency>
-~~~
-
-**Please note**: Do not add `storm-core` as a dependency. It is already included via `flink-storm`.
-
-**Please note**: `flink-storm` is not part of the provided binary Flink distribution.
-Thus, you need to include `flink-storm` classes (and their dependencies) in your program jar (also called ueber-jar or fat-jar) that is submitted to Flink's JobManager.
-See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example of how to package a jar correctly.
-
-If you want to avoid large ueber-jars, you can manually copy `storm-core-0.9.4.jar`, `json-simple-1.1.jar` and `flink-storm-{{site.version}}.jar` into Flink's `lib/` folder of each cluster node (*before* the cluster is started).
-For this case, it is sufficient to include only your own Spout and Bolt classes (and their internal dependencies) into the program jar.
-
-# Execute Storm Topologies
-
-Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:
-
-- `StormSubmitter` replaced by `FlinkSubmitter`
-- `NimbusClient` and `Client` replaced by `FlinkClient`
-- `LocalCluster` replaced by `FlinkLocalCluster`
-
-In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
-The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*.
-If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
-
-// actual topology assembling code and used Spouts/Bolts can be used as-is
-builder.setSpout("source", new FileSpout(inputFilePath));
-builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
-builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
-builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
-
-Config conf = new Config();
-if(runLocal) { // submit to test cluster
-	// replaces: LocalCluster cluster = new LocalCluster();
-	FlinkLocalCluster cluster = new FlinkLocalCluster();
-	cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
-} else { // submit to remote cluster
-	// optional
-	// conf.put(Config.NIMBUS_HOST, "remoteHost");
-	// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-	// replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
-	FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
-}
-~~~
-</div>
-</div>
-
-# Embed Storm Operators in Flink Streaming Programs 
-
-As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
-The Storm compatibility layer offers a wrapper class for each, namely `SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).
-
-By default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple0` to `Tuple25` according to the number of fields of the Storm tuples).
-For single field output tuples a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).
-
-Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
-In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` can be used.
-
-## Embed Spouts
-
-In order to use a Spout as Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
-The Spout object is handed to the constructor of `SpoutWrapper<OUT>` that serves as first argument to `addSource(...)`.
-The generic type declaration `OUT` specifies the type of the source output stream.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// stream has `raw` type (single field output streams only)
-DataStream<String> rawInput = env.addSource(
-	new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
-	TypeExtractor.getForClass(String.class)); // output type
-
-// process data stream
-[...]
-~~~
-</div>
-</div>
-
-If a Spout emits a finite number of tuples, `SpoutWrapper` can be configured to terminate automatically by setting the `numberOfInvocations` parameter in its constructor.
-This allows the Flink program to shut down automatically after all data is processed.
-By default the program will run until it is [canceled](cli.html) manually.
-
-
-## Embed Bolts
-
-In order to use a Bolt as Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
-The Bolt object is handed to the constructor of `BoltWrapper<IN,OUT>` that serves as last argument to `transform(...)`.
-The generic type declarations `IN` and `OUT` specify the type of the operator's input and output stream, respectively.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<String> text = env.readTextFile(localFilePath);
-
-DataStream<Tuple2<String, Integer>> counts = text.transform(
-	"tokenizer", // operator name
-	TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
-	new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
-
-// do further processing
-[...]
-~~~
-</div>
-</div>
-
-### Named Attribute Access for Embedded Bolts
-
-Bolts can access input tuple fields by name (in addition to access by index).
-To use this feature with embedded Bolts, you need to have either a
-
- 1. [POJO](programming_guide.html#pojos) type input stream or
- 2. [Tuple](programming_guide.html#tuples-and-case-classes) type input stream and specify the input schema (ie, name-to-index-mapping)
-
-For POJO input types, Flink accesses the fields via reflection.
-For this case, Flink expects either a corresponding public member variable or public getter method.
-For example, if a Bolt accesses a field via name `sentence` (eg, `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or method `public String getSentence() { ... };` (pay attention to camel-case naming).
-
-For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
-For this case, the constructor of `BoltWrapper` takes an additional argument: `new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))`.
-The input type is `Tuple1<String>` and `Fields("sentence")` specifies that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
-
-See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
-
-## Configuring Spouts and Bolts
-
-In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is given to `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
-This `Map` is provided by the user next to the topology and gets forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
-If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.
-
-For embedded usage, Flink's configuration mechanism must be used.
-A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
-Flink's regular `Configuration` class can be used to configure Spouts and Bolts.
-However, `Configuration` does not support arbitrary key data types as Storm does (only `String` keys are allowed).
-Thus, Flink additionally provides the `StormConfig` class that can be used like a raw `Map` to provide full compatibility with Storm.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-StormConfig config = new StormConfig();
-// set config values
-[...]
-
-// set global Storm configuration
-env.getConfig().setGlobalJobParameters(config);
-
-// assemble program with embedded Spouts and/or Bolts
-[...]
-~~~
-</div>
-</div>
-
-## Multiple Output Streams
-
-Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
-If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.
-
-For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
-Flink provides the predefined output selector `StormStreamSelector<T>` for `.split(...)` already.
-Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-[...]
-
-// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
-DataStream<SplitStreamType<SomeType>> multiStream = ...
-
-SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
-
-// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
-DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
-DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
-
-// do further processing on s1 and s2
-[...]
-~~~
-</div>
-</div>
-
-See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java) for a full example.
-
-# Flink Extensions
-
-## Finite Spouts
-
-In Flink, streaming sources can be finite, ie, emit a finite number of records and stop after emitting the last record. However, Spouts usually emit infinite streams.
-The bridge between the two approaches is the `FiniteSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can specify a stopping-condition.
-The user can create a finite Spout by implementing this interface instead of (or in addition to) `IRichSpout`, and implementing the `reachedEnd()` method.
-In contrast to a `SpoutWrapper` that is configured to emit a finite number of tuples, the `FiniteSpout` interface allows implementing more complex termination criteria.
-
-Although finite Spouts are not necessary to embed Spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy:
-
- * to make a native Spout behave the same way as a finite Flink source with minimal modifications
- * the user wants to process a stream only for some time; after that, the Spout can stop automatically
- * reading a file into a stream
- * for testing purposes
-
-An example of a finite Spout that emits records for 10 seconds only:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
-	[...] // implement open(), nextTuple(), ...
-
-	private long starttime = System.currentTimeMillis();
-
-	public boolean reachedEnd() {
-		return System.currentTimeMillis() - starttime > 10000L;
-	}
-}
-~~~
-</div>
-</div>
-
-# Storm Compatibility Examples
-
-You can find more examples in Maven module `flink-storm-examples`.
-For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/README.md).
-To run the examples, you need to assemble a correct jar file.
-`flink-storm-examples-{{ site.version }}.jar` is **not** a valid jar file for job execution (it is only a standard Maven artifact).
-
-There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
-Compare `pom.xml` to see how both jars are built.
-Furthermore, there is one example for whole Storm topologies (`WordCount-StormTopology.jar`).
-
-You can run each of those examples via `bin/flink run <jarname>.jar`. The correct entry point class is contained in each jar's manifest file.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/elasticsearch.md b/docs/apis/streaming/connectors/elasticsearch.md
new file mode 100644
index 0000000..5c07838
--- /dev/null
+++ b/docs/apis/streaming/connectors/elasticsearch.md
@@ -0,0 +1,165 @@
+---
+title: "Elasticsearch Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 2
+sub-nav-title: Elasticsearch
+---
+
+This connector provides a Sink that can write to an
+[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-elasticsearch</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Elasticsearch
+
+Instructions for setting up an Elasticsearch cluster can be found
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
+Make sure to set and remember a cluster name. This must be set when
+creating a Sink for writing to your cluster.
+
+#### Elasticsearch Sink
+The connector provides a Sink that can send data to an Elasticsearch Index.
+
+The sink can use two different methods for communicating with Elasticsearch:
+
+1. An embedded Node
+2. The TransportClient
+
+See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
+for information about the differences between the two modes.
+
+This code shows how to create a sink that uses an embedded Node for
+communication:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = Maps.newHashMap();
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+config.put("cluster.name", "my-cluster-name");
+
+input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
+    @Override
+    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new util.HashMap[String, String]
+config.put("bulk.flush.max.actions", "1")
+config.put("cluster.name", "my-cluster-name")
+
+text.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
+  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    println("SENDING: " + element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+Note how a Map of Strings is used to configure the Sink. The configuration keys
+are documented in the Elasticsearch documentation
+[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
+Especially important is the `cluster.name` parameter that must correspond to
+the name of your cluster.
+
+Internally, the sink uses a `BulkProcessor` to send index requests to the cluster.
+This will buffer elements before sending a request to the cluster. The behaviour of the
+`BulkProcessor` can be configured using these config keys:
+ * **bulk.flush.max.actions**: Maximum number of elements to buffer
+ * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
+ * **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush data regardless of the other two settings
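+
+For example, a configuration map that tunes all three settings (as in the examples above; the values here are illustrative) might look like this:
+
+{% highlight java %}
+Map<String, String> config = new HashMap<>();
+config.put("cluster.name", "my-cluster-name");
+// flush after 5000 buffered actions, 5 MB of data, or 60 seconds, whichever comes first
+config.put("bulk.flush.max.actions", "5000");
+config.put("bulk.flush.max.size.mb", "5");
+config.put("bulk.flush.interval.ms", "60000");
+{% endhighlight %}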
+
+The following example does the same as the embedded Node example above, but uses a `TransportClient`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+Map<String, String> config = Maps.newHashMap();
+// This instructs the sink to emit after every element, otherwise they would be buffered
+config.put("bulk.flush.max.actions", "1");
+config.put("cluster.name", "my-cluster-name");
+
+List<TransportAddress> transports = new ArrayList<>();
+transports.add(new InetSocketTransportAddress("node-1", 9300));
+transports.add(new InetSocketTransportAddress("node-2", 9300));
+
+input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
+    @Override
+    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
+        Map<String, Object> json = new HashMap<>();
+        json.put("data", element);
+
+        return Requests.indexRequest()
+                .index("my-index")
+                .type("my-type")
+                .source(json);
+    }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val config = new util.HashMap[String, String]
+config.put("bulk.flush.max.actions", "1")
+config.put("cluster.name", "my-cluster-name")
+
+val transports = new util.ArrayList[TransportAddress]
+transports.add(new InetSocketTransportAddress("node-1", 9300))
+transports.add(new InetSocketTransportAddress("node-2", 9300))
+
+input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
+  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
+    val json = new util.HashMap[String, AnyRef]
+    json.put("data", element)
+    println("SENDING: " + element)
+    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
+  }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The difference is that we now need to provide a list of Elasticsearch Nodes
+to which the sink should connect using a `TransportClient`.
+
+More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/connectors/hdfs.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/hdfs.md b/docs/apis/streaming/connectors/hdfs.md
new file mode 100644
index 0000000..a9df354
--- /dev/null
+++ b/docs/apis/streaming/connectors/hdfs.md
@@ -0,0 +1,115 @@
+---
+title: "HDFS Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 3
+sub-nav-title: HDFS
+---
+
+This connector provides a Sink that writes rolling files to any filesystem supported by
+Hadoop FileSystem. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-filesystem</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Rolling File Sink
+
+Both the rolling behaviour and the writing can be configured, but we will get to that later.
+This is how you can create a default rolling sink:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new RollingSink<String>("/base/path"));
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new RollingSink("/base/path"))
+
+{% endhighlight %}
+</div>
+</div>
+
+The only required parameter is the base path where the rolling files (buckets) will be
+stored. The sink can be configured by specifying a custom bucketer, writer and batch size.
+
+By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets.
+This pattern is passed to `SimpleDateFormat` with the current system time to form a bucket path. A
+new bucket will be created whenever the bucket path changes. For example, if you have a pattern
+that contains minutes as the finest granularity you will get a new bucket every minute.
+Each bucket is itself a directory that contains several part files: Each parallel instance
+of the sink will create its own part file and when part files get too big the sink will also
+create a new part file next to the others. To specify a custom bucketer use `setBucketer()`
+on a `RollingSink`.
+
+The default writer is `StringWriter`. This will call `toString()` on the incoming elements
+and write them to part files, separated by newline. To specify a custom writer use `setWriter()`
+on a `RollingSink`. If you want to write Hadoop SequenceFiles you can use the provided
+`SequenceFileWriter` which can also be configured to use compression.
+
+The last configuration option is the batch size. This specifies when a part file should be closed
+and a new one started. (The default batch size is 384 MB.)
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<IntWritable,Text>> input = ...;
+
+RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<>("/base/path");
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
+sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
+sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[Tuple2[IntWritable, Text]] = ...
+
+val sink = new RollingSink[Tuple2[IntWritable, Text]]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+This will create a sink that writes to bucket files that follow this schema:
+
+```
+/base/path/{date-time}/part-{parallel-task}-{count}
+```
+
+Where `date-time` is the string that we get from the date/time format, `parallel-task` is the index
+of the parallel sink instance and `count` is the running number of part files that were created
+because of the batch size.
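+
+For example, with the default `"yyyy-MM-dd--HH"` bucketer pattern, a part file written by parallel task 3 on the afternoon of January 15, 2016 might end up at a path like this (the values are illustrative):
+
+```
+/base/path/2016-01-15--14/part-3-17
+```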
+
+For in-depth information, please refer to the JavaDoc for
+[RollingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md
new file mode 100644
index 0000000..6b9f2c4
--- /dev/null
+++ b/docs/apis/streaming/connectors/index.md
@@ -0,0 +1,24 @@
+---
+title: "Streaming Connectors"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-id: connectors
+sub-nav-pos: 2
+sub-nav-title: Connectors
+---
+
+Connectors provide code for interfacing with various third-party systems.
+
+Currently these systems are supported:
+
+ * [Apache Kafka](https://kafka.apache.org/) (sink/source)
+ * [Elasticsearch](https://elastic.co/) (sink)
+ * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
+ * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
+ * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
+
+To run an application using one of these connectors, additional third-party
+components usually have to be installed and launched, e.g. the servers
+for the message queues. Further instructions for these can be found in the
+corresponding subsections.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
new file mode 100644
index 0000000..15fe79e
--- /dev/null
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -0,0 +1,160 @@
+---
+title: "Apache Kafka Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Kafka
+---
+
+This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
+
+Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
+The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide
+exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka's consumer group
+offset tracking, but tracks and checkpoints these offsets internally as well.
+
+Please pick a package (maven artifact id) and class name for your use-case and environment.
+For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is appropriate.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Maven Dependency</th>
+      <th class="text-left">Supported since</th>
+      <th class="text-left">Class name</th>
+      <th class="text-left">Kafka version</th>
+      <th class="text-left">Notes</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>flink-connector-kafka</td>
+        <td>0.9.1, 0.10</td>
+        <td>FlinkKafkaConsumer081</td>
+        <td>0.8.1</td>
+        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
+    </tr>
+    <tr>
+        <td>flink-connector-kafka</td>
+        <td>0.9.1, 0.10</td>
+        <td>FlinkKafkaConsumer082</td>
+        <td>0.8.2</td>
+        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
+    </tr>
+  </tbody>
+</table>
+
+Then, import the connector in your maven project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Apache Kafka
+
+* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
+* On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
+* If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
+
+#### Kafka Consumer
+
+The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic. Its constructor takes the following parameters:
+
+1. The topic name
+2. A DeserializationSchema
+3. Properties for the Kafka consumer.
+  The following properties are required:
+  - "bootstrap.servers" (comma separated list of Kafka brokers)
+  - "zookeeper.connect" (comma separated list of Zookeeper servers)
+  - "group.id" the id of the consumer group
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+DataStream<String> stream = env
+	.addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
+stream.print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
+properties.setProperty("zookeeper.connect", "localhost:2181")
+properties.setProperty("group.id", "test")
+val stream = env
+    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
+stream.print()
+{% endhighlight %}
+</div>
+</div>
+
+#### Kafka Consumers and Fault Tolerance
+
+With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
+its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
+the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
+stored in the checkpoint.
+
+The checkpoint interval therefore defines how far the program may have to go back, at most, in case of a failure.
+
+To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+</div>
+
+Also note that Flink can only restart the topology if enough processing slots are available.
+So if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards.
+Flink on YARN supports automatic restart of lost YARN containers.
+
+If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
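+
+The commit interval can be tuned through the regular Kafka consumer properties. A minimal sketch, assuming the standard Kafka 0.8 consumer keys `auto.commit.enable` and `auto.commit.interval.ms` are honored (the values are illustrative):
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+// without checkpointing, commit offsets to Zookeeper every 10 seconds (assumed keys)
+properties.setProperty("auto.commit.enable", "true");
+properties.setProperty("auto.commit.interval.ms", "10000");
+{% endhighlight %}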
+
+#### Kafka Producer
+
+The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
+records to partitions.
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
+{% endhighlight %}
+</div>
+</div>
+
+You can also pass a custom Kafka producer configuration to the `FlinkKafkaProducer` constructor. Please refer to
+the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
+Kafka Producers.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/rabbitmq.md b/docs/apis/streaming/connectors/rabbitmq.md
new file mode 100644
index 0000000..3d36a35
--- /dev/null
+++ b/docs/apis/streaming/connectors/rabbitmq.md
@@ -0,0 +1,102 @@
+---
+title: "RabbitMQ Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 4
+sub-nav-title: RabbitMQ
+---
+
+This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-rabbitmq</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing RabbitMQ
+Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After installation, the server starts automatically and the application connecting to RabbitMQ can be launched.
+
+#### RabbitMQ Source
+
+`RMQSource` is a class that provides an interface for receiving data from RabbitMQ.
+
+The following have to be provided to the `RMQSource(…)` constructor, in order:
+
+- hostName: The RabbitMQ broker hostname.
+- queueName: The RabbitMQ queue name.
+- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
+- deserializationSchema: Deserialization schema to turn messages into Java objects.
+
+This source can be operated in three different modes:
+
+1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
+    unique correlation IDs.
+2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
+    (correlation id is not set).
+3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+
+Correlation ids are a RabbitMQ application feature. You have to set them in the message properties
+when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
+unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
+messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
+have to supply correlation ids.
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> streamWithoutCorrelationIds = env
+	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()));
+streamWithoutCorrelationIds.print();
+
+DataStream<String> streamWithCorrelationIds = env
+	.addSource(new RMQSource<String>("localhost", "hello", true, new SimpleStringSchema()));
+streamWithCorrelationIds.print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val streamWithoutCorrelationIds = env
+    .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
+streamWithoutCorrelationIds.print()
+
+val streamWithCorrelationIds = env
+    .addSource(new RMQSource[String]("localhost", "hello", true, new SimpleStringSchema))
+streamWithCorrelationIds.print()
+{% endhighlight %}
+</div>
+</div>
+
+#### RabbitMQ Sink
+`RMQSink` is a class that provides an interface for sending data to RabbitMQ.
+
+The following have to be provided to the `RMQSink(…)` constructor, in order:
+
+1. The hostname
+2. The queue name
+3. Serialization schema
+
+Example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
+{% endhighlight %}
+</div>
+</div>
+
+More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/twitter.md b/docs/apis/streaming/connectors/twitter.md
new file mode 100644
index 0000000..4ab85f7
--- /dev/null
+++ b/docs/apis/streaming/connectors/twitter.md
@@ -0,0 +1,89 @@
+---
+title: "Twitter Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 5
+sub-nav-title: Twitter
+---
+
+The Twitter Streaming API provides access to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream. To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-twitter</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Authentication
+In order to connect to the Twitter stream, the user has to register their program and acquire the necessary authentication information. The process is described below.
+
+#### Acquiring the authentication information
+First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
+After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource` respectively) are located on the "API Keys" tab. The necessary OAuth Access Token data (`token` and `secret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab.
+Remember to keep these pieces of information secret and do not push them to public repositories.
+
+#### Accessing the authentication information
+Create a properties file, and pass its path in the constructor of `TwitterSource`. The content of the file should be similar to this:
+
+~~~bash
+#properties file for my app
+secret=***
+consumerSecret=***
+token=***-***
+consumerKey=***
+~~~
+
+#### Constructors
+The `TwitterSource` class has two constructors.
+
+1. `public TwitterSource(String authPath, int numberOfTweets);`
+to emit a finite number of tweets
+2. `public TwitterSource(String authPath);`
+for streaming
+
+Both constructors expect a `String authPath` argument determining the location of the properties file containing the authentication information. In the first case, `numberOfTweets` determines how many tweets the source emits.
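+
+For example, a bounded source that emits at most 100 tweets and then finishes could be created like this (the path and the count are illustrative):
+
+{% highlight java %}
+// uses the finite constructor described above
+DataStream<String> boundedSource = env.addSource(
+    new TwitterSource("/PATH/TO/myFile.properties", 100));
+{% endhighlight %}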
+
+#### Usage
+In contrast to other connectors, the `TwitterSource` does not depend on any additional services. For example, the following code should run gracefully:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))
+{% endhighlight %}
+</div>
+</div>
+
+The `TwitterSource` emits strings containing a JSON object.
+To retrieve information from the JSON you can add a FlatMap or a Map function that handles it. For example, the examples contain a `JSONParseFlatMap` abstract class. `JSONParseFlatMap` is an extension of the `FlatMapFunction` and has a
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+String getField(String jsonText, String field);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+getField(jsonText : String, field : String) : String
+{% endhighlight %}
+</div>
+</div>
+
+function which can be used to acquire the value of a given field.
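+
+As a sketch, a flat map that extracts the language of each tweet could look like this (this assumes `JSONParseFlatMap` is parameterized like `FlatMapFunction` and exposes `getField` as an instance method; the `lang` field name is illustrative):
+
+{% highlight java %}
+public class LanguageExtractor extends JSONParseFlatMap<String, String> {
+    @Override
+    public void flatMap(String value, Collector<String> out) throws Exception {
+        // pull a single field out of the raw JSON string (assumed field name)
+        out.collect(getField(value, "lang"));
+    }
+}
+{% endhighlight %}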
+
+There are two basic types of tweets. The usual tweets contain information such as the date and time of creation, id, user, language and many more details. The other type carries delete notifications.
+
+#### Example
+`TwitterStream` is an example of how to use `TwitterSource`. It implements a language frequency counter program.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
new file mode 100644
index 0000000..6b54bd9
--- /dev/null
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -0,0 +1,196 @@
+---
+title: "Fault Tolerance"
+is_beta: false
+
+sub-nav-group: streaming
+sub-nav-id: fault_tolerance
+sub-nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink's fault tolerance mechanism recovers programs in the presence of failures and
+continues to execute them. Such failures include machine hardware failures, network failures,
+transient program failures, etc.
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Streaming Fault Tolerance (DataStream API)
+------------------------------------------
+
+Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a *persistent* (or *durable*) source that
+can be asked for prior records again (Apache Kafka is a good example of such a source).
+
+The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see [Working with State]({{ site.baseurl }}/apis/streaming_guide.html#working-with-state)) consistently to provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured [state backend]({{ site.baseurl }}/apis/state_backends.html).
+
+The [docs on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
+
+To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
+
+Other parameters for checkpointing include:
+
+- *Number of retries*: The `setNumberOfExecutionRetries()` method defines how many times the job is restarted after a failure.
+  When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often (a sketch of setting this follows the example below).
+
+- *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
+  Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
+
+- *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints and fail to make progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.
+
+- *checkpoint timeout*: The time after which a checkpoint-in-progress is aborted if it has not completed by then.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000);
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig().setCheckpointTimeout(60000);
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000)
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig.setCheckpointTimeout(60000)
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+{% endhighlight %}
+</div>
+</div>
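+
+The number of restart attempts mentioned above is set directly on the environment. A minimal sketch (the value 3 is illustrative):
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// restart the job at most 3 times after a failure
+env.setNumberOfExecutionRetries(3);
+{% endhighlight %}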
+
+
+### Fault Tolerance Guarantees of Data Sources and Sinks
+
+Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
+snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
+not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Source</th>
+      <th class="text-left" style="width: 25%">Guarantees</th>
+      <th class="text-left">Notes</th>
+    </tr>
+   </thead>
+   <tbody>
+        <tr>
+            <td>Apache Kafka</td>
+            <td>exactly once</td>
+            <td>Use the appropriate Kafka connector for your version</td>
+        </tr>
+        <tr>
+            <td>RabbitMQ</td>
+            <td>at most once (v 0.10) / exactly once (v 1.0) </td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>Twitter Streaming API</td>
+            <td>at most once</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>Collections</td>
+            <td>exactly once</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>Files</td>
+            <td>at least once</td>
+            <td>On failure, the file will be read from the beginning</td>
+        </tr>
+        <tr>
+            <td>Sockets</td>
+            <td>at most once</td>
+            <td></td>
+        </tr>
+  </tbody>
+</table>
+
+To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs
+to take part in the checkpointing mechanism. The following table lists the delivery guarantees (assuming exactly-once
+state updates) of Flink coupled with bundled sinks:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Sink</th>
+      <th class="text-left" style="width: 25%">Guarantees</th>
+      <th class="text-left">Notes</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>HDFS rolling sink</td>
+        <td>exactly once</td>
+        <td>Implementation depends on Hadoop version</td>
+    </tr>
+    <tr>
+        <td>Elasticsearch</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>Kafka producer</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>File sinks</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>Socket sinks</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+    <tr>
+        <td>Standard output</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
+  </tbody>
+</table>
+
+{% top %}

