flink-commits mailing list archives

From se...@apache.org
Subject [44/89] [abbrv] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs
Date Thu, 25 Aug 2016 18:48:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/libs/ml/svm.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/ml/svm.md b/docs/apis/batch/libs/ml/svm.md
deleted file mode 100644
index 6d09482..0000000
--- a/docs/apis/batch/libs/ml/svm.md
+++ /dev/null
@@ -1,223 +0,0 @@
----
-mathjax: include
-title: SVM using CoCoA
-# Sub navigation
-sub-nav-group: batch
-sub-nav-parent: flinkml
-sub-nav-title: SVM (CoCoA)
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-## Description
-
-Implements a soft-margin SVM using the communication-efficient distributed dual coordinate
-ascent (CoCoA) algorithm with a hinge-loss function.
-The algorithm solves the following minimization problem:
-
-$$\min_{\mathbf{w} \in \mathbb{R}^d} \frac{\lambda}{2} \left\lVert \mathbf{w} \right\rVert^2 + \frac{1}{n} \sum_{i=1}^n l_{i}\left(\mathbf{w}^T\mathbf{x}_i\right)$$
-
-with $\mathbf{w}$ being the weight vector, $\lambda$ being the regularization constant,
-$$\mathbf{x}_i \in \mathbb{R}^d$$ being the data points and $$l_{i}$$ being the convex loss
-functions, which can also depend on the labels $$y_{i} \in \mathbb{R}$$.
-In the current implementation the regularizer is the $\ell_2$-norm and the loss functions are the hinge-loss functions:
-
-  $$l_{i} = \max\left(0, 1 - y_{i} \mathbf{w}^T\mathbf{x}_i \right)$$
-
-With these choices, the problem definition is equivalent to an SVM with soft margin,
-so the algorithm effectively trains a soft-margin SVM.
-
-The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
-In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm calculates
-several iterations of SDCA locally on a data block before merging the local updates into a
-valid global state.
-This state is redistributed to the different data partitions where the next round of local SDCA
-iterations is then executed.
-The number of outer iterations and local SDCA iterations control the overall network costs, because
-there is only network communication required for each outer iteration.
-The local SDCA iterations are embarrassingly parallel once the individual data partitions have been
-distributed across the cluster.
-
-The implementation of this algorithm is based on the work of
-[Jaggi et al.](http://arxiv.org/abs/1409.1458)
-
-## Operations
-
-`SVM` is a `Predictor`.
-As such, it supports the `fit` and `predict` operations.
-
-### Fit
-
-SVM is trained given a set of `LabeledVector`:
-
-* `fit: DataSet[LabeledVector] => Unit`
-
-### Predict
-
-SVM predicts the corresponding class label for all subtypes of FlinkML's `Vector`:
-
-* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Double)]`, where the `(T, Double)` tuple
-  corresponds to (original_features, label)
-
-If we call predict with a `DataSet[(Vector, Double)]`, we make a prediction on the class label
-for each example and return a `DataSet[(Double, Double)]`. In each tuple the first element
-is the true value, as provided by the input `DataSet[(Vector, Double)]`, and the second element
-is the predicted value. You can then use these `(truth, prediction)` tuples to evaluate
-the algorithm's performance.
-
-* `predict: DataSet[(Vector, Double)] => DataSet[(Double, Double)]`
-
-## Parameters
-
-The SVM implementation can be controlled by the following parameters:
-
-<table class="table table-bordered">
-<thead>
-  <tr>
-    <th class="text-left" style="width: 20%">Parameters</th>
-    <th class="text-center">Description</th>
-  </tr>
-</thead>
-
-<tbody>
-  <tr>
-    <td><strong>Blocks</strong></td>
-    <td>
-      <p>
-        Sets the number of blocks into which the input data will be split.
-        On each block the local stochastic dual coordinate ascent method is executed.
-        This number should be set to at least the degree of parallelism.
-        If no value is specified, then the parallelism of the input DataSet is used as the number of blocks.
-        (Default value: <strong>None</strong>)
-      </p>
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Iterations</strong></td>
-    <td>
-      <p>
-        Defines the maximum number of iterations of the outer loop method.
-        In other words, it defines how often the SDCA method is applied to the blocked data.
-        After each iteration, the locally computed weight vector updates have to be reduced to update the global weight vector value.
-        The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
-        (Default value: <strong>10</strong>)
-      </p>
-    </td>
-  </tr>
-  <tr>
-    <td><strong>LocalIterations</strong></td>
-    <td>
-      <p>
-        Defines the maximum number of SDCA iterations.
-        In other words, it defines how many data points are drawn from each local data block to calculate the stochastic dual coordinate ascent.
-        (Default value: <strong>10</strong>)
-      </p>
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Regularization</strong></td>
-    <td>
-      <p>
-        Defines the regularization constant of the SVM algorithm.
-        The higher the value, the smaller the 2-norm of the weight vector will be.
-        For an SVM with hinge loss this means that the margin will be wider, even though it might contain some misclassified examples.
-        (Default value: <strong>1.0</strong>)
-      </p>
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Stepsize</strong></td>
-    <td>
-      <p>
-        Defines the initial step size for the updates of the weight vector.
-        The larger the step size, the larger the contribution of the weight vector updates to the next weight vector value.
-        The effective scaling of the updates is $\frac{stepsize}{blocks}$.
-        This value has to be tuned if the algorithm becomes unstable.
-        (Default value: <strong>1.0</strong>)
-      </p>
-    </td>
-  </tr>
-  <tr>
-    <td><strong>ThresholdValue</strong></td>
-    <td>
-      <p>
-        Defines the threshold for the decision function above which examples are labeled as
-        positive (+1.0). Examples with a decision function value below this threshold are classified
-        as negative (-1.0). To obtain the raw decision function values instead, set the
-        OutputDecisionFunction parameter.  (Default value: <strong>0.0</strong>)
-      </p>
-    </td>
-  </tr>
-  <tr>
-    <td><strong>OutputDecisionFunction</strong></td>
-    <td>
-      <p>
-        Determines whether the predict and evaluate functions of the SVM should return the distance
-        to the separating hyperplane or binary class labels. Setting this to true will
-        return the raw distance to the hyperplane for each example. Setting it to false will
-        return the binary class label (+1.0, -1.0). (Default value: <strong>false</strong>)
-      </p>
-    </td>
-  </tr>
-  <tr>
-  <td><strong>Seed</strong></td>
-  <td>
-    <p>
-      Defines the seed to initialize the random number generator.
-      The seed directly controls which data points are chosen for the SDCA method.
-      (Default value: <strong>Random Long Integer</strong>)
-    </p>
-  </td>
-</tr>
-</tbody>
-</table>
-
-## Examples
-
-{% highlight scala %}
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.math.Vector
-import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.classification.SVM
-import org.apache.flink.ml.RichExecutionEnvironment
-
-val pathToTrainingFile: String = ???
-val pathToTestingFile: String = ???
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-// Read the training data set, from a LibSVM formatted file
-val trainingDS: DataSet[LabeledVector] = env.readLibSVM(pathToTrainingFile)
-
-// Create the SVM learner
-val svm = SVM()
-  .setBlocks(10)
-
-// Learn the SVM model
-svm.fit(trainingDS)
-
-// Read the testing data set
-val testingDS: DataSet[Vector] = env.readLibSVM(pathToTestingFile).map(_.vector)
-
-// Calculate the predictions for the testing data set
-val predictionDS: DataSet[(Vector, Double)] = svm.predict(testingDS)
-
-{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md
deleted file mode 100644
index c37b952..0000000
--- a/docs/apis/batch/libs/table.md
+++ /dev/null
@@ -1,26 +0,0 @@
----
-title: "Table API and SQL"
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-
-<meta http-equiv="refresh" content="1; url={{ site.baseurl }}/apis/table.html" />
-
-The *Table API guide* has been moved. Redirecting to [{{ site.baseurl }}/apis/table.html]({{ site.baseurl }}/apis/table.html) in 1 second.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
deleted file mode 100644
index b5e81c5..0000000
--- a/docs/apis/batch/python.md
+++ /dev/null
@@ -1,638 +0,0 @@
----
-title: "Python Programming Guide"
-is_beta: true
-
-# Sub-level navigation
-sub-nav-group: batch
-sub-nav-id: python_api
-sub-nav-pos: 4
-sub-nav-title: Python API
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Analysis programs in Flink are regular programs that implement transformations on data sets
-(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
-sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for
-example write the data to (distributed) files, or to standard output (for example the command line
-terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
-The execution can happen in a local JVM, or on clusters of many machines.
-
-In order to create your own Flink program, we encourage you to start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations). The remaining sections act as references for additional
-operations and advanced features.
-
-* This will be replaced by the TOC
-{:toc}
-
-Example Program
----------------
-
-The following program is a complete, working example of WordCount. You can copy &amp; paste the code
-to run it locally.
-
-{% highlight python %}
-from flink.plan.Environment import get_environment
-from flink.functions.GroupReduceFunction import GroupReduceFunction
-
-class Adder(GroupReduceFunction):
-  def reduce(self, iterator, collector):
-    count, word = iterator.next()
-    count += sum([x[0] for x in iterator])
-    collector.collect((count, word))
-
-env = get_environment()
-data = env.from_elements("Who's there?",
- "I think I hear them. Stand, ho! Who's there?")
-
-data \
-  .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
-  .group_by(1) \
-  .reduce_group(Adder(), combinable=True) \
-  .output()
-
-env.execute(local=True)
-{% endhighlight %}
-
-{% top %}
-
-Program Skeleton
-----------------
-
-As we already saw in the example, Flink programs look like regular Python programs.
-Each program consists of the same basic parts:
-
-1. Obtain an `Environment`,
-2. Load/create the initial data,
-3. Specify transformations on this data,
-4. Specify where to put the results of your computations, and
-5. Execute your program.
-
-We will now give an overview of each of those steps but please refer to the respective sections for
-more details.
-
-
-The `Environment` is the basis for all Flink programs. You can
-obtain one using these static methods on class `Environment`:
-
-{% highlight python %}
-get_environment()
-{% endhighlight %}
-
-For specifying data sources the execution environment has several methods
-to read from files. To just read a text file as a sequence of lines, you can use:
-
-{% highlight python %}
-env = get_environment()
-text = env.read_text("file:///path/to/file")
-{% endhighlight %}
-
-This will give you a DataSet on which you can then apply transformations. For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataSet you can apply transformations to create a new
-DataSet which you can then write to a file, transform again, or
-combine with other DataSets. You apply transformations by calling
-methods on DataSet with your own custom transformation function. For example,
-a map transformation looks like this:
-
-{% highlight python %}
-data.map(lambda x: x*2)
-{% endhighlight %}
-
-This will create a new DataSet by doubling every value in the original DataSet.
-For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataSet that needs to be written to disk you can call one
-of these methods on DataSet:
-
-{% highlight python %}
-data.write_text("<file-path>", write_mode=Constants.NO_OVERWRITE)
-data.write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
-data.output()
-{% endhighlight %}
-
-The last method is only useful for developing/debugging on a local machine;
-it prints the contents of the DataSet to standard output. (Note that on
-a cluster, the result goes to the standard out stream of the cluster nodes and ends
-up in the *.out* files of the workers.)
-The first two do what their names suggest.
-Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
-
-Once you specified the complete program you need to call `execute` on
-the `Environment`. This will either execute on your local machine or submit your program
-for execution on a cluster, depending on how Flink was started. You can force
-a local execution by using `execute(local=True)`.
-
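-As a minimal sketch that ties the five steps together (the input path below is a placeholder
-you would replace with your own data):
-
-{% highlight python %}
-from flink.plan.Environment import get_environment
-
-env = get_environment()                         # 1. obtain an Environment
-text = env.read_text("file:///path/to/file")    # 2. load the initial data
-lengths = text.map(lambda line: len(line))      # 3. specify transformations on this data
-lengths.output()                                # 4. specify where to put the results
-env.execute(local=True)                         # 5. execute the program
-{% endhighlight %}
-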
-{% top %}
-
-Project setup
----------------
-
-Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages, is automatically distributed across the cluster via HDFS when running a job.
-
-The Python API was tested on Linux/Windows systems that have Python 2.7 or 3.4 installed.
-
-By default Flink will start python processes by calling "python" or "python3", depending on which start-script
-was used. By setting the "python.binary.python[2/3]" key in the flink-conf.yaml you can modify this behaviour to use a binary of your choice.
-
-{% top %}
-
-Lazy Evaluation
----------------
-
-All Flink programs are executed lazily: When the program's main method is executed, the data loading
-and transformations do not happen directly. Rather, each operation is created and added to the
-program's plan. The operations are actually executed when one of the `execute()` methods is invoked
-on the Environment object. Whether the program is executed locally or on a cluster depends
-on the environment of the program.
-
-The lazy evaluation lets you construct sophisticated programs that Flink executes as one
-holistically planned unit.
-
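-As a small sketch of this behavior, none of the lines below read or transform any data on their
-own; they only build up the plan, which runs as a whole when `execute()` is called:
-
-{% highlight python %}
-env = get_environment()
-data = env.from_elements(1, 2, 3)
-doubled = data.map(lambda x: x * 2)  # only added to the plan, nothing runs yet
-doubled.output()                     # still just part of the plan
-env.execute()                        # the whole plan is executed here
-{% endhighlight %}
-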
-{% top %}
-
-
-Transformations
----------------
-
-Data transformations transform one or more DataSets into a new DataSet. Programs can combine
-multiple transformations into sophisticated assemblies.
-
-This section gives a brief overview of the available transformations. The [transformations
-documentation](dataset_transformations.html) has a full description of all transformations with
-examples.
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-
-  <tbody>
-    <tr>
-      <td><strong>Map</strong></td>
-      <td>
-        <p>Takes one element and produces one element.</p>
-{% highlight python %}
-data.map(lambda x: x * 2)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>FlatMap</strong></td>
-      <td>
-        <p>Takes one element and produces zero, one, or more elements. </p>
-{% highlight python %}
-data.flat_map(
-  lambda x, c: [(1, word) for word in x.lower().split()])
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>MapPartition</strong></td>
-      <td>
-        <p>Transforms a parallel partition in a single function call. The function gets the partition
-        as an `Iterator` and can produce an arbitrary number of result values. The number of
-        elements in each partition depends on the parallelism and previous operations.</p>
-{% highlight python %}
-data.map_partition(lambda x,c: [value * 2 for value in x])
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Filter</strong></td>
-      <td>
-        <p>Evaluates a boolean function for each element and retains those for which the function
-        returns true.</p>
-{% highlight python %}
-data.filter(lambda x: x > 1000)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Reduce</strong></td>
-      <td>
-        <p>Combines a group of elements into a single element by repeatedly combining two elements
-        into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
-{% highlight python %}
-data.reduce(lambda x,y : x + y)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>ReduceGroup</strong></td>
-      <td>
-        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
-        full data set, or on a grouped data set.</p>
-{% highlight python %}
-class Adder(GroupReduceFunction):
-  def reduce(self, iterator, collector):
-    count, word = iterator.next()
-    count += sum([x[0] for x in iterator])
-    collector.collect((count, word))
-
-data.reduce_group(Adder())
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Aggregate</strong></td>
-      <td>
-        <p>Performs a built-in operation (sum, min, max) on one field of all the Tuples in a
-        data set or in each group of a data set. Aggregation can be applied on a full dataset
-        or on a grouped data set.</p>
-{% highlight python %}
-# This code finds the sum of all of the values in the first field and the maximum of all of the values in the second field
-data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)
-
-# min(), max(), and sum() syntactic sugar functions are also available
-data.sum(0).and_agg(Aggregation.Max, 1)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Join</strong></td>
-      <td>
-        <p>Joins two data sets by creating all pairs of elements that are equal on their keys.
-        Optionally uses a JoinFunction to turn the pair of elements into a single element.
-        See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
-{% highlight python %}
-# In this case tuple fields are used as keys.
-# "0" is the join field on the first tuple
-# "1" is the join field on the second tuple.
-result = input1.join(input2).where(0).equal_to(1)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>CoGroup</strong></td>
-      <td>
-        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
-        fields and then joins the groups. The transformation function is called per pair of groups.
-        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
-{% highlight python %}
-data1.co_group(data2).where(0).equal_to(1)
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Cross</strong></td>
-      <td>
-        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
-        elements. Optionally uses a CrossFunction to turn the pair of elements into a single
-        element.</p>
-{% highlight python %}
-result = data1.cross(data2)
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>Union</strong></td>
-      <td>
-        <p>Produces the union of two data sets.</p>
-{% highlight python %}
-data.union(data2)
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>ZipWithIndex</strong></td>
-      <td>
-        <p>Assigns consecutive indexes to each element. For more information, please refer to
-        the <a href="zip_elements_guide.html#zip-with-a-dense-index">Zip Elements Guide</a>.</p>
-{% highlight python %}
-data.zip_with_index()
-{% endhighlight %}
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-{% top %}
-
-
-Specifying Keys
--------------
-
-Some transformations (like Join or CoGroup) require that a key is defined on
-their argument DataSets, while other transformations (Reduce, GroupReduce) allow the DataSet to be grouped on a key before they are
-applied.
-
-A DataSet is grouped as
-{% highlight python %}
-reduced = data \
-  .group_by(<define key here>) \
-  .reduce_group(<do something>)
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data set types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-### Define keys for Tuples
-{:.no_toc}
-
-The simplest case is grouping a data set of Tuples on one or more
-fields of the Tuple:
-{% highlight python %}
-reduced = data \
-  .group_by(0) \
-  .reduce_group(<do something>)
-{% endhighlight %}
-
-The data set is grouped on the first field of the tuples.
-The group-reduce function will thus receive groups of tuples with
-the same value in the first field.
-
-{% highlight python %}
-grouped = data \
-  .group_by(0,1) \
-  .reduce(<do something>)
-{% endhighlight %}
-
-The data set is grouped on the composite key consisting of the first and the
-second fields, therefore the reduce function will receive groups
-with the same value for both fields.
-
-A note on nested Tuples: If you have a DataSet with a nested tuple,
-specifying `group_by(<index of tuple>)` will cause the system to use the full tuple as the key.
-
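-For example, with elements shaped like `((id, name), value)`, grouping on position 0 uses the
-whole nested tuple, e.g. `(1, "a")`, as the key. A small sketch (the element values are made up
-for illustration):
-
-{% highlight python %}
-nested = env.from_elements(((1, "a"), 0.5), ((1, "a"), 1.5), ((2, "b"), 2.0))
-
-# position 0 is the nested tuple; it is used as the key in its entirety
-sums = nested.group_by(0).reduce(lambda x, y: (x[0], x[1] + y[1]))
-{% endhighlight %}
-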
-{% top %}
-
-
-Passing Functions to Flink
---------------------------
-
-Certain operations require user-defined functions; all of them accept both lambda functions and rich functions as arguments.
-
-{% highlight python %}
-data.filter(lambda x: x > 5)
-{% endhighlight %}
-
-{% highlight python %}
-class Filter(FilterFunction):
-    def filter(self, value):
-        return value > 5
-
-data.filter(Filter())
-{% endhighlight %}
-
-Rich functions allow the use of imported functions, provide access to broadcast variables,
-can be parameterized using __init__(), and are the go-to option for complex functions.
-They are also the only way to define an optional `combine` function for a reduce operation.
-
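-As a sketch of such a rich function (this assumes the optional `combine` hook takes the same
-`(iterator, collector)` arguments as `reduce`, which is not spelled out here):
-
-{% highlight python %}
-from flink.functions.GroupReduceFunction import GroupReduceFunction
-
-class CombinableAdder(GroupReduceFunction):
-  def reduce(self, iterator, collector):
-    count, word = iterator.next()
-    count += sum([x[0] for x in iterator])
-    collector.collect((count, word))
-
-  # optional pre-aggregation step; here it simply reuses the reduce logic
-  def combine(self, iterator, collector):
-    self.reduce(iterator, collector)
-
-data.group_by(1).reduce_group(CombinableAdder(), combinable=True)
-{% endhighlight %}
-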
-Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
-an iterable if the operation can return multiple values (that is, for all functions that receive a collector argument).
-
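-For example (a sketch):
-
-{% highlight python %}
-# functions that receive a collector argument may emit several values,
-# so a lambda used there must return an iterable
-data.flat_map(lambda x, c: [x, x * 2])            # emits two values per element
-data.flat_map(lambda x, c: [x] if x > 0 else [])  # emits zero or one value
-{% endhighlight %}
-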
-{% top %}
-
-Data Types
-----------
-
-Flink's Python API currently only offers native support for primitive Python types (int, float, bool, string) and byte arrays.
-
-The type support can be extended by passing a serializer, deserializer and type class to the environment.
-{% highlight python %}
-import struct
-
-
-class MyObj(object):
-    def __init__(self, i):
-        self.value = i
-
-
-class MySerializer(object):
-    def serialize(self, value):
-        return struct.pack(">i", value.value)
-
-
-class MyDeserializer(object):
-    def _deserialize(self, read):
-        i = struct.unpack(">i", read(4))[0]
-        return MyObj(i)
-
-
-env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
-{% endhighlight %}
-
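-Once registered, the custom type can be used like any other element type. A usage sketch
-(assuming `from_elements` accepts the registered type like any other element):
-
-{% highlight python %}
-objects = env.from_elements(MyObj(1), MyObj(2), MyObj(3))
-doubled = objects.map(lambda o: MyObj(o.value * 2))
-doubled.map(lambda o: o.value).output()
-{% endhighlight %}
-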
-#### Tuples/Lists
-
-You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains
-a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type, including further tuples, resulting in nested tuples.
-
-{% highlight python %}
-word_counts = env.from_elements(("hello", 1), ("world",2))
-
-counts = word_counts.map(lambda x: x[1])
-{% endhighlight %}
-
-When working with operators that require a key for grouping or matching records,
-tuples let you simply specify the positions of the fields to be used as the key. You can specify more
-than one position to use composite keys (see [Section Data Transformations](#transformations)).
-
-{% highlight python %}
-word_counts \
-    .group_by(0) \
-    .reduce(MyReduceFunction())
-{% endhighlight %}
-
-{% top %}
-
-Data Sources
-------------
-
-Data sources create the initial data sets, such as from files or from collections.
-
-File-based:
-
-- `read_text(path)` - Reads files line wise and returns them as Strings.
-- `read_csv(path, type)` - Parses files of comma (or another char) delimited fields.
-  Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field
-  types.
-
-Collection-based:
-
-- `from_elements(*args)` - Creates a data set from the given elements. All elements must be of the same type.
-- `generate_sequence(from, to)` - Generates the sequence of numbers in the given interval, in parallel. 
-
-**Examples**
-
-{% highlight python %}
-env = get_environment()
-
-# read text file from local file system
-localLines = env.read_text("file:///path/to/my/textfile")
-
-# read text file from an HDFS running at nnHost:nnPort
-hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
-
-# read a CSV file with three fields, schema defined using constants from flink.plan.Constants
-csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
-
-# create a set from some given elements
-values = env.from_elements("Foo", "bar", "foobar", "fubar")
-
-# generate a number sequence
-numbers = env.generate_sequence(1, 10000000)
-{% endhighlight %}
-
-{% top %}
-
-Data Sinks
-----------
-
-Data sinks consume DataSets and are used to store or return them:
-
-- `write_text()` - Writes elements line-wise as Strings. The Strings are
-  obtained by calling the *str()* method of each element.
-- `write_csv(...)` - Writes tuples as comma-separated value files. Row and field
-  delimiters are configurable. The value for each field comes from the *str()* method of the objects.
-- `output()` - Prints the *str()* value of each element on the
-  standard out.
-
-A DataSet can be input to multiple operations. Programs can write or print a data set and at the
-same time run additional transformations on them.
-
-**Examples**
-
-Standard data sink methods:
-
-{% highlight python %}
-# write DataSet to a file on the local file system
-textData.write_text("file:///my/result/on/localFS")
-
-# write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
-textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
-
-# write DataSet to a file and overwrite the file if it exists
-textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
-
-# write tuples as lines with pipe as the separator "a|b|c"
-values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
-
-# this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
-values.write_text("file:///path/to/the/result/file")
-{% endhighlight %}
-
-{% top %}
-
-Broadcast Variables
--------------------
-
-Broadcast variables allow you to make a data set available to all parallel instances of an
-operation, in addition to the regular input of the operation. This is useful for auxiliary data
-sets, or data-dependent parameterization. The data set will then be accessible at the operator as a
-Collection.
-
-- **Broadcast**: broadcast sets are registered by name via `with_broadcast_set(String, DataSet)`
-- **Access**: accessible via `self.context.get_broadcast_variable(String)` at the target operator
-
-{% highlight python %}
-class MapperBcv(MapFunction):
-    def map(self, value):
-        factor = self.context.get_broadcast_variable("bcv")[0][0]
-        return value * factor
-
-# 1. The DataSet to be broadcasted
-toBroadcast = env.from_elements(1, 2, 3)
-data = env.from_elements("a", "b")
-
-# 2. Broadcast the DataSet
-data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast) 
-{% endhighlight %}
-
-Make sure that the names (`bcv` in the previous example) match when registering and
-accessing broadcasted data sets.
-
-**Note**: As the content of broadcast variables is kept in-memory on each node, it should not become
-too large. For simpler things like scalar values you can simply parameterize the rich function.
-
-{% top %}
-
-Parallel Execution
-------------------
-
-This section describes how the parallel execution of programs can be configured in Flink. A Flink
-program consists of multiple tasks (operators, data sources, and sinks). A task is split into
-several parallel instances for execution and each parallel instance processes a subset of the task's
-input data. The number of parallel instances of a task is called its *parallelism* or *degree of
-parallelism (DOP)*.
-
-The degree of parallelism of a task can be specified in Flink on different levels.
-
-### Execution Environment Level
-
-Flink programs are executed in the context of an [execution environment](#program-skeleton). An
-execution environment defines a default parallelism for all operators, data sources, and data sinks
-it executes. Execution environment parallelism can be overwritten by explicitly configuring the
-parallelism of an operator.
-
-The default parallelism of an execution environment can be specified by calling the
-`set_parallelism()` method. To execute all operators, data sources, and data sinks of the
-[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the
-execution environment as follows:
-
-{% highlight python %}
-env = get_environment()
-env.set_parallelism(3)
-
-text.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
-    .group_by(1) \
-    .reduce_group(Adder(), combinable=True) \
-    .output()
-
-env.execute()
-{% endhighlight %}
-
-### System Level
-
-A system-wide default parallelism for all execution environments can be defined by setting the
-`parallelism.default` property in `./conf/flink-conf.yaml`. See the
-[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details.
-
-{% top %}
-
-Executing Plans
----------------
-
-To run the plan with Flink, go to your Flink distribution and run the pyflink.sh script from the /bin folder.
-Use pyflink2.sh for Python 2.7 and pyflink3.sh for Python 3.4. The script containing the plan has to be passed
-as the first argument, followed by a number of additional Python packages, and finally, separated by `-`, additional
-arguments that will be fed to the script.
-
-{% highlight bash %}
-./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX>]][ - <param1>[ <paramX>]]
-{% endhighlight %}
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/zip_elements_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/zip_elements_guide.md b/docs/apis/batch/zip_elements_guide.md
deleted file mode 100644
index e3e93b5..0000000
--- a/docs/apis/batch/zip_elements_guide.md
+++ /dev/null
@@ -1,128 +0,0 @@
----
-title: "Zipping Elements in a DataSet"
-# Sub-level navigation
-sub-nav-group: batch
-sub-nav-parent: dataset_api
-sub-nav-pos: 2
-sub-nav-title: Zipping Elements
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-In certain algorithms, one may need to assign unique identifiers to data set elements.
-This document shows how {% gh_link /flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java "DataSetUtils" %} can be used for that purpose.
-
-* This will be replaced by the TOC
-{:toc}
-
-### Zip with a Dense Index
-`zipWithIndex` assigns consecutive labels to the elements, receiving a data set as input and returning a new data set of `(unique id, initial value)` 2-tuples.
-This process requires two passes, first counting then labeling elements, and cannot be pipelined due to the synchronization of counts.
-The alternative `zipWithUniqueId` works in a pipelined fashion and is preferred when a unique labeling is sufficient.
-For example, the following code:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setParallelism(2);
-DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
-
-DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
-
-result.writeAsCsv(resultPath, "\n", ",");
-env.execute();
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-import org.apache.flink.api.scala._
-
-val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-env.setParallelism(2)
-val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
-
-val result: DataSet[(Long, String)] = input.zipWithIndex
-
-result.writeAsCsv(resultPath, "\n", ",")
-env.execute()
-{% endhighlight %}
-</div>
-
-<div data-lang="python" markdown="1">
-{% highlight python %}
-from flink.plan.Environment import get_environment
-
-env = get_environment()
-env.set_parallelism(2)
-input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
-
-result = input.zip_with_index()
-
-result.write_text(result_path)
-env.execute()
-{% endhighlight %}
-</div>
-
-</div>
-
-may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
-
-[Back to top](#top)
-
-### Zip with a Unique Identifier
-In many cases one may not need to assign consecutive labels.
-`zipWithUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples.
-For example, the following code:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setParallelism(2);
-DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
-
-DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
-
-result.writeAsCsv(resultPath, "\n", ",");
-env.execute();
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-import org.apache.flink.api.scala._
-
-val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-env.setParallelism(2)
-val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
-
-val result: DataSet[(Long, String)] = input.zipWithUniqueId
-
-result.writeAsCsv(resultPath, "\n", ",")
-env.execute()
-{% endhighlight %}
-</div>
-
-</div>
-
-may yield the tuples: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)
-
-[Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/apis/best_practices.md b/docs/apis/best_practices.md
deleted file mode 100644
index 7ae1b64..0000000
--- a/docs/apis/best_practices.md
+++ /dev/null
@@ -1,403 +0,0 @@
----
-title: "Best Practices"
-# Top-level navigation
-top-nav-group: apis
-top-nav-pos: 5
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This page contains a collection of best practices for Flink programmers on how to solve frequently encountered problems.
-
-
-* This will be replaced by the TOC
-{:toc}
-
-## Parsing command line arguments and passing them around in your Flink application
-
-
-Almost all Flink applications, both batch and streaming, rely on external configuration parameters.
-They are used, for example, to specify input and output sources (like paths or addresses), system parameters (parallelism, runtime configuration), and application-specific parameters (often used within the user functions).
-
-Since version 0.9, Flink provides a simple utility called `ParameterTool` that offers some basic tooling for solving these problems.
-
-Please note that you don't have to use the `ParameterTool` described here. Other frameworks such as [Commons CLI](https://commons.apache.org/proper/commons-cli/)
-and [argparse4j](http://argparse4j.sourceforge.net/) also work well with Flink.
-
-
-### Getting your configuration values into the `ParameterTool`
-
-The `ParameterTool` provides a set of predefined static methods for reading the configuration. The tool internally expects a `Map<String, String>`, so it's very easy to integrate it with your own configuration style.
-
-
-#### From `.properties` files
-
-The following method will read a [Properties](https://docs.oracle.com/javase/tutorial/essential/environment/properties.html) file and provide the key/value pairs:
-{% highlight java %}
-String propertiesFile = "/home/sam/flink/myjob.properties";
-ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
-{% endhighlight %}
-
-
-#### From the command line arguments
-
-This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
-{% highlight java %}
-public static void main(String[] args) {
-	ParameterTool parameter = ParameterTool.fromArgs(args);
-	// .. regular code ..
-{% endhighlight %}
-
-
-#### From system properties
-
-When starting a JVM, you can pass system properties to it: `-Dinput=hdfs:///mydata`. You can also initialize the `ParameterTool` from these system properties:
-
-{% highlight java %}
-ParameterTool parameter = ParameterTool.fromSystemProperties();
-{% endhighlight %}
-
-
-### Using the parameters in your Flink program
-
-Now that we've got the parameters from somewhere (see above) we can use them in various ways.
-
-**Directly from the `ParameterTool`**
-
-The `ParameterTool` itself has methods for accessing the values.
-{% highlight java %}
-ParameterTool parameters = // ...
-parameters.getRequired("input");
-parameters.get("output", "myDefaultValue");
-parameters.getLong("expectedCount", -1L);
-parameters.getNumberOfParameters();
-// .. there are more methods available.
-{% endhighlight %}
-
-You can use the return values of these methods directly in the main() method (i.e., in the client submitting the application).
-For example, you could set the parallelism of an operator like this:
-
-{% highlight java %}
-ParameterTool parameters = ParameterTool.fromArgs(args);
-int parallelism = parameters.getInt("mapParallelism", 2);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
-{% endhighlight %}
-
-Since the `ParameterTool` is serializable, you can pass it to the functions themselves:
-
-{% highlight java %}
-ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
-{% endhighlight %}
-
-and then use it inside the function to read the values passed on the command line.
-
-
-#### Passing it as a `Configuration` object to single functions
-
-The example below shows how to pass the parameters as a `Configuration` object to a user defined function.
-
-{% highlight java %}
-ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration());
-{% endhighlight %}
-
-In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
-
-{% highlight java %}
-public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		parameters.getInteger("myInt", -1);
-		// .. do
-{% endhighlight %}
-
-
-#### Register the parameters globally
-
-Parameters registered as a [global job parameter](programming_guide.html#passing-parameters-to-functions) at the `ExecutionConfig` allow you to access the configuration values from the JobManager web interface and from all functions defined by the user.
-
-**Register the parameters globally**
-
-{% highlight java %}
-ParameterTool parameters = ParameterTool.fromArgs(args);
-
-// set up the execution environment
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.getConfig().setGlobalJobParameters(parameters);
-{% endhighlight %}
-
-Access them in any rich user function:
-
-{% highlight java %}
-public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
-
-	@Override
-	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-		ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
-		parameters.getRequired("input");
-		// .. do more ..
-{% endhighlight %}
-
-
-## Naming large TupleX types
-
-It is recommended to use POJOs (Plain old Java objects) instead of `TupleX` for data types with many fields.
-Also, POJOs can be used to give large `Tuple`-types a name.
-
-**Example**
-
-Instead of using:
-
-
-~~~java
-Tuple11<String, String, ..., String> var = new ...;
-~~~
-
-
-It is much easier to create a custom type extending from the large Tuple type.
-
-~~~java
-CustomType var = new ...;
-
-public static class CustomType extends Tuple11<String, String, ..., String> {
-    // constructor matching super
-}
-~~~
-
-
-## Register a custom serializer for your Flink program
-
-If you use a custom type in your Flink program which cannot be serialized by the
-Flink type serializer, Flink falls back to using the generic Kryo
-serializer. You may register your own serializer or a serialization system like
-Google Protobuf or Apache Thrift with Kryo. To do that, simply register the type
-class and the serializer in the `ExecutionConfig` of your Flink program.
-
-
-{% highlight java %}
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// register the class of the serializer as serializer for a type
-env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
-
-// register an instance as serializer for a type
-MySerializer mySerializer = new MySerializer();
-env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
-{% endhighlight %}
-
-Note that your custom serializer has to extend Kryo's Serializer class. In the
-case of Google Protobuf or Apache Thrift, this has already been done for
-you:
-
-{% highlight java %}
-
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// register the Google Protobuf serializer with Kryo
-env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
-
-// register the serializer included with Apache Thrift as the standard serializer
-// TBaseSerializer states it should be initialized as a default Kryo serializer
-env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
-
-{% endhighlight %}
-
-For the above example to work, you need to include the necessary dependencies in
-your Maven project file (pom.xml). In the dependency section, add the following
-for Apache Thrift:
-
-{% highlight xml %}
-
-<dependency>
-	<groupId>com.twitter</groupId>
-	<artifactId>chill-thrift</artifactId>
-	<version>0.5.2</version>
-</dependency>
-<!-- libthrift is required by chill-thrift -->
-<dependency>
-	<groupId>org.apache.thrift</groupId>
-	<artifactId>libthrift</artifactId>
-	<version>0.6.1</version>
-	<exclusions>
-		<exclusion>
-			<groupId>javax.servlet</groupId>
-			<artifactId>servlet-api</artifactId>
-		</exclusion>
-		<exclusion>
-			<groupId>org.apache.httpcomponents</groupId>
-			<artifactId>httpclient</artifactId>
-		</exclusion>
-	</exclusions>
-</dependency>
-
-{% endhighlight %}
-
-For Google Protobuf you need the following Maven dependency:
-
-{% highlight xml %}
-
-<dependency>
-	<groupId>com.twitter</groupId>
-	<artifactId>chill-protobuf</artifactId>
-	<version>0.5.2</version>
-</dependency>
-<!-- We need protobuf for chill-protobuf -->
-<dependency>
-	<groupId>com.google.protobuf</groupId>
-	<artifactId>protobuf-java</artifactId>
-	<version>2.5.0</version>
-</dependency>
-
-{% endhighlight %}
-
-
-Please adjust the versions of both libraries as needed.
-
-
-## Using Logback instead of Log4j
-
-**Note: This tutorial is applicable starting from Flink 0.10**
-
-Apache Flink uses [slf4j](http://www.slf4j.org/) as the logging abstraction in its code. Users are advised to use slf4j in their user functions as well.
-
-Slf4j is a compile-time logging interface that can use different logging implementations at runtime, such as [log4j](http://logging.apache.org/log4j/2.x/) or [Logback](http://logback.qos.ch/).
-
-Flink depends on Log4j by default. This page describes how to use Flink with Logback instead. Users reported that they were also able to set up centralized logging with Graylog using this tutorial.
-
-To get a logger instance in the code, use the following code:
-
-
-{% highlight java %}
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MyClass implements MapFunction {
-	private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
-	// ...
-{% endhighlight %}
-
-
-### Use Logback when running Flink out of the IDE / from a Java application
-
-
-In all cases where classes are executed with a classpath created by a dependency manager such as Maven, Flink will pull log4j into the classpath.
-
-Therefore, you will need to exclude log4j from Flink's dependencies. The following description assumes a Maven project created from a [Flink quickstart](../quickstart/java_api_quickstart.html).
-
-Change your project's `pom.xml` file like this:
-
-{% highlight xml %}
-<dependencies>
-	<!-- Add the two required logback dependencies -->
-	<dependency>
-		<groupId>ch.qos.logback</groupId>
-		<artifactId>logback-core</artifactId>
-		<version>1.1.3</version>
-	</dependency>
-	<dependency>
-		<groupId>ch.qos.logback</groupId>
-		<artifactId>logback-classic</artifactId>
-		<version>1.1.3</version>
-	</dependency>
-
-	<!-- Add the log4j -> slf4j (-> logback) bridge into the classpath
-	 Hadoop is logging to log4j! -->
-	<dependency>
-		<groupId>org.slf4j</groupId>
-		<artifactId>log4j-over-slf4j</artifactId>
-		<version>1.7.7</version>
-	</dependency>
-
-	<dependency>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-java</artifactId>
-		<version>{{ site.version }}</version>
-		<exclusions>
-			<exclusion>
-				<groupId>log4j</groupId>
-				<artifactId>*</artifactId>
-			</exclusion>
-			<exclusion>
-				<groupId>org.slf4j</groupId>
-				<artifactId>slf4j-log4j12</artifactId>
-			</exclusion>
-		</exclusions>
-	</dependency>
-	<dependency>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
-		<version>{{ site.version }}</version>
-		<exclusions>
-			<exclusion>
-				<groupId>log4j</groupId>
-				<artifactId>*</artifactId>
-			</exclusion>
-			<exclusion>
-				<groupId>org.slf4j</groupId>
-				<artifactId>slf4j-log4j12</artifactId>
-			</exclusion>
-		</exclusions>
-	</dependency>
-	<dependency>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-		<version>{{ site.version }}</version>
-		<exclusions>
-			<exclusion>
-				<groupId>log4j</groupId>
-				<artifactId>*</artifactId>
-			</exclusion>
-			<exclusion>
-				<groupId>org.slf4j</groupId>
-				<artifactId>slf4j-log4j12</artifactId>
-			</exclusion>
-		</exclusions>
-	</dependency>
-</dependencies>
-{% endhighlight %}
-
-The following changes were done in the `<dependencies>` section:
-
- * Exclude all `log4j` dependencies from all Flink dependencies: This causes Maven to ignore Flink's transitive dependencies to log4j.
- * Exclude the `slf4j-log4j12` artifact from Flink's dependencies: Since we are going to use the slf4j to logback binding, we have to remove the slf4j to log4j binding.
- * Add the Logback dependencies: `logback-core` and `logback-classic`
- * Add the dependency for `log4j-over-slf4j`. `log4j-over-slf4j` is a tool that allows legacy applications which use the Log4j APIs directly to use the Slf4j interface. Flink depends on Hadoop, which uses Log4j directly for logging. Therefore, we need to redirect all logger calls from Log4j to Slf4j, which in turn logs to Logback.
-
-Please note that you need to manually add the exclusions to all new Flink dependencies you are adding to the pom file.
-
-You may also need to check if other dependencies (non Flink) are pulling in log4j bindings. You can analyze the dependencies of your project with `mvn dependency:tree`.
-
-
-
-### Use Logback when running Flink on a cluster
-
-This tutorial is applicable when running Flink on YARN or as a standalone cluster.
-
-In order to use Logback instead of Log4j with Flink, you need to remove the `log4j-1.2.xx.jar` and `slf4j-log4j12-xxx.jar` from the `lib/` directory.
-
-Next, you need to put the following jar files into the `lib/` folder:
-
- * `logback-classic.jar`
- * `logback-core.jar`
- * `log4j-over-slf4j.jar`: This bridge needs to be present in the classpath for redirecting logging calls from Hadoop (which is using Log4j) to Slf4j.
-
-Note that you need to explicitly set the `lib/` directory when using a per job YARN cluster.
-
-The command to submit Flink on YARN with a custom logger is: `./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>`

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
deleted file mode 100644
index c272413..0000000
--- a/docs/apis/cli.md
+++ /dev/null
@@ -1,322 +0,0 @@
----
-title:  "Command-Line Interface"
-# Top-level navigation
-top-nav-group: apis
-top-nav-pos: 5
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink provides a command-line interface to run programs that are packaged
-as JAR files, and control their execution.  The command line interface is part
-of any Flink setup, available in local single node setups and in
-distributed setups. It is located under `<flink-home>/bin/flink`
-and connects by default to the running Flink master (JobManager) that was
-started from the same installation directory.
-
-A prerequisite to using the command line interface is that the Flink
-master (JobManager) has been started (via
-`<flink-home>/bin/start-local.sh` or
-`<flink-home>/bin/start-cluster.sh`) or that a YARN environment is
-available.
-
-The command line can be used to
-
-- submit jobs for execution,
-- cancel a running job,
-- provide information about a job, and
-- list running and waiting jobs.
-
-* This will be replaced by the TOC
-{:toc}
-
-## Examples
-
--   Run example program with no arguments.
-
-        ./bin/flink run ./examples/batch/WordCount.jar
-
--   Run example program with arguments for input and result files
-
-        ./bin/flink run ./examples/batch/WordCount.jar \
-                               file:///home/user/hamlet.txt file:///home/user/wordcount_out
-
--   Run example program with parallelism 16 and arguments for input and result files
-
-        ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
-                                file:///home/user/hamlet.txt file:///home/user/wordcount_out
-
--   Run example program with flink log output disabled
-
-            ./bin/flink run -q ./examples/batch/WordCount.jar
-
--   Run example program in detached mode
-
-            ./bin/flink run -d ./examples/batch/WordCount.jar
-
--   Run example program on a specific JobManager:
-
-        ./bin/flink run -m myJMHost:6123 \
-                               ./examples/batch/WordCount.jar \
-                               file:///home/user/hamlet.txt file:///home/user/wordcount_out
-
--   Run example program with a specific class as an entry point:
-
-        ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
-                               ./examples/batch/WordCount.jar \
-                               file:///home/user/hamlet.txt file:///home/user/wordcount_out
-
--   Run example program using a [per-job YARN cluster]({{site.baseurl}}/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
-
-        ./bin/flink run -m yarn-cluster -yn 2 \
-                               ./examples/batch/WordCount.jar \
-                               hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
-
--   Display the optimized execution plan for the WordCount example program as JSON:
-
-        ./bin/flink info ./examples/batch/WordCount.jar \
-                                file:///home/user/hamlet.txt file:///home/user/wordcount_out
-
--   List scheduled and running jobs (including their JobIDs):
-
-        ./bin/flink list
-
--   List scheduled jobs (including their JobIDs):
-
-        ./bin/flink list -s
-
--   List running jobs (including their JobIDs):
-
-        ./bin/flink list -r
-
--   List running Flink jobs inside Flink YARN session:
-
-        ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
-
--   Cancel a job:
-
-        ./bin/flink cancel <jobID>
-
--   Stop a job (streaming jobs only):
-
-        ./bin/flink stop <jobID>
-
-
-The difference between cancelling and stopping a (streaming) job is the following:
-
-On a cancel call, the operators of the job immediately receive a `cancel()` method call to cancel them as
-soon as possible.
-If the operators do not stop after the cancel call, Flink periodically interrupts their threads until they stop.
-
-A "stop" call is a more graceful way of stopping a running streaming job. Stop is only available for jobs
-which use sources that implement the `StoppableFunction` interface. When the user requests to stop a job,
-all sources will receive a `stop()` method call. The job will keep running until all sources properly shut down.
-This allows the job to finish processing all in-flight data.
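-
-For illustration, a typical sequence (the job ID below is hypothetical) is to look up the running job and then stop or cancel it:
-
-{% highlight bash %}
-# list running jobs to find the job ID
-./bin/flink list -r
-# gracefully stop the job (its sources must implement StoppableFunction)
-./bin/flink stop 5e20cb6b0f357591171dfcca2eea09de
-# or cancel it immediately
-./bin/flink cancel 5e20cb6b0f357591171dfcca2eea09de
-{% endhighlight %}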
-
-### Savepoints
-
-[Savepoints]({{site.baseurl}}/apis/streaming/savepoints.html) are controlled via the command line client:
-
-#### Trigger a savepoint
-
-{% highlight bash %}
-./bin/flink savepoint <jobID>
-{% endhighlight %}
-
-This command returns the path of the created savepoint. You need this path to restore or dispose of the savepoint.
-
-#### Restore a savepoint
-
-{% highlight bash %}
-./bin/flink run -s <savepointPath> ...
-{% endhighlight %}
-
-The run command has a savepoint flag (`-s`) to submit a job that restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.
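-
-Putting the trigger and restore commands together, a sketch of the workflow (the job ID and program JAR are placeholders) looks like this:
-
-{% highlight bash %}
-# trigger a savepoint for the running job; the command prints the savepoint path
-./bin/flink savepoint 5e20cb6b0f357591171dfcca2eea09de
-# resubmit the program, restoring its state from the printed path
-./bin/flink run -s file:///flink/savepoint-1537 ./path/to/program.jar
-{% endhighlight %}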
-
-#### Dispose a savepoint
-
-{% highlight bash %}
-./bin/flink savepoint -d <savepointPath>
-{% endhighlight %}
-
-Disposes the savepoint at the given path. The savepoint path is returned by the savepoint trigger command.
-
-If you use custom state instances (for example custom reducing state or RocksDB state), you have to specify the path to the program JAR with which the savepoint was triggered in order to dispose the savepoint with the user code class loader:
-
-{% highlight bash %}
-./bin/flink savepoint -d <savepointPath> -j <jarFile>
-{% endhighlight %}
-
-Otherwise, you will run into a `ClassNotFoundException`.
-
-## Usage
-
-The command line syntax is as follows:
-
-~~~
-./flink <ACTION> [OPTIONS] [ARGUMENTS]
-
-The following actions are available:
-
-Action "run" compiles and runs a program.
-
-  Syntax: run [OPTIONS] <jar-file> <arguments>
-  "run" action options:
-     -c,--class <classname>               Class with the program entry point
-                                          ("main" method or "getPlan()" method).
-                                          Only needed if the JAR file does not
-                                          specify the class in its manifest.
-     -C,--classpath <url>                 Adds a URL to each user code
-                                          classloader  on all nodes in the
-                                          cluster. The paths must specify a
-                                          protocol (e.g. file://) and be
-                                          accessible on all nodes (e.g. by means
-                                          of a NFS share). You can use this
-                                          option multiple times for specifying
-                                          more than one URL. The protocol must
-                                          be supported by the {@link
-                                          java.net.URLClassLoader}.
-     -d,--detached                        If present, runs the job in detached
-                                          mode
-     -m,--jobmanager <host:port>          Address of the JobManager (master) to
-                                          which to connect. Specify
-                                          'yarn-cluster' as the JobManager to
-                                          deploy a YARN cluster for the job. Use
-                                          this flag to connect to a different
-                                          JobManager than the one specified in
-                                          the configuration.
-     -p,--parallelism <parallelism>       The parallelism with which to run the
-                                          program. Optional flag to override the
-                                          default value specified in the
-                                          configuration.
-     -q,--sysoutLogging                   If present, suppress logging output to
-                                          standard out.
-     -s,--fromSavepoint <savepointPath>   Path to a savepoint to reset the job
-                                          back to (for example
-                                          file:///flink/savepoint-1537).
-  Additional arguments if -m yarn-cluster is set:
-     -yD <arg>                            Dynamic properties
-     -yd,--yarndetached                   Start detached
-     -yj,--yarnjar <arg>                  Path to Flink jar file
-     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
-                                          MB]
-     -yn,--yarncontainer <arg>            Number of YARN containers to allocate
-                                          (=Number of Task Managers)
-     -ynm,--yarnname <arg>                Set a custom name for the application
-                                          on YARN
-     -yq,--yarnquery                      Display available YARN resources
-                                          (memory, cores)
-     -yqu,--yarnqueue <arg>               Specify YARN queue.
-     -ys,--yarnslots <arg>                Number of slots per TaskManager
-     -yst,--yarnstreaming                 Start Flink in streaming mode
-     -yt,--yarnship <arg>                 Ship files in the specified directory
-                                          (t for transfer)
-     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
-                                          MB]
-
-
-Action "info" shows the optimized execution plan of the program (JSON).
-
-  Syntax: info [OPTIONS] <jar-file> <arguments>
-  "info" action options:
-     -c,--class <classname>           Class with the program entry point ("main"
-                                      method or "getPlan()" method). Only needed
-                                      if the JAR file does not specify the class
-                                      in its manifest.
-     -m,--jobmanager <host:port>      Address of the JobManager (master) to
-                                      which to connect. Specify 'yarn-cluster'
-                                      as the JobManager to deploy a YARN cluster
-                                      for the job. Use this flag to connect to a
-                                      different JobManager than the one
-                                      specified in the configuration.
-     -p,--parallelism <parallelism>   The parallelism with which to run the
-                                      program. Optional flag to override the
-                                      default value specified in the
-                                      configuration.
-
-
-Action "list" lists running and scheduled programs.
-
-  Syntax: list [OPTIONS]
-  "list" action options:
-     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                   to connect. Specify 'yarn-cluster' as the
-                                   JobManager to deploy a YARN cluster for the
-                                   job. Use this flag to connect to a different
-                                   JobManager than the one specified in the
-                                   configuration.
-     -r,--running                  Show only running programs and their JobIDs
-     -s,--scheduled                Show only scheduled programs and their JobIDs
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
-
-
-Action "cancel" cancels a running program.
-
-  Syntax: cancel [OPTIONS] <Job ID>
-  "cancel" action options:
-     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                   to connect. Specify 'yarn-cluster' as the
-                                   JobManager to deploy a YARN cluster for the
-                                   job. Use this flag to connect to a different
-                                   JobManager than the one specified in the
-                                   configuration.
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
-
-
-Action "stop" stops a running program (streaming jobs only). There are no strong consistency
-guarantees for a stop request.
-
-  Syntax: stop [OPTIONS] <Job ID>
-  "stop" action options:
-     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                   to connect. Use this flag to connect to a
-                                   different JobManager than the one specified
-                                   in the configuration.
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
-
-
-Action "savepoint" triggers savepoints for a running job or disposes existing ones.
-
- Syntax: savepoint [OPTIONS] <Job ID>
- "savepoint" action options:
-    -d,--dispose <arg>            Path of savepoint to dispose.
-    -j,--jarfile <jarfile>        Flink program JAR file.
-    -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                  to connect. Use this flag to connect to a
-                                  different JobManager than the one specified
-                                  in the configuration.
- Options for yarn-cluster mode:
-    -yid,--yarnapplicationId <arg>   Attach to running YARN session
-~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/apis/cluster_execution.md b/docs/apis/cluster_execution.md
deleted file mode 100644
index 79501db..0000000
--- a/docs/apis/cluster_execution.md
+++ /dev/null
@@ -1,156 +0,0 @@
----
-title:  "Cluster Execution"
-# Top-level navigation
-top-nav-group: apis
-top-nav-pos: 8
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-Flink programs can be executed in a distributed fashion on clusters of many machines. There
-are two ways to send a program to a cluster for execution:
-
-## Command Line Interface
-
-The command line interface lets you submit packaged programs (JARs) to a cluster
-(or single machine setup).
-
-Please refer to the [Command Line Interface](cli.html) documentation for
-details.
-
-## Remote Environment
-
-The remote environment lets you execute Flink Java programs on a cluster
-directly: the environment points to the cluster on which you want to
-execute the program.
-
-### Maven Dependency
-
-If you are developing your program as a Maven project, you have to add the
-`flink-clients` module using this dependency:
-
-~~~xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
-</dependency>
-~~~
-
-### Example
-
-The following illustrates the use of the `RemoteEnvironment`:
-
-~~~java
-public static void main(String[] args) throws Exception {
-    // point the environment at the remote JobManager and ship the user-code JAR
-    ExecutionEnvironment env = ExecutionEnvironment
-        .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");
-
-    DataSet<String> data = env.readTextFile("hdfs://path/to/file");
-
-    // keep only lines starting with "http://" and write them back to HDFS
-    data
-        .filter(new FilterFunction<String>() {
-            public boolean filter(String value) {
-                return value.startsWith("http://");
-            }
-        })
-        .writeAsText("hdfs://path/to/result");
-
-    // trigger execution on the remote cluster
-    env.execute();
-}
-~~~
-
-Note that the program contains custom user code and therefore requires a JAR file with
-the compiled classes of that code. The constructor of the remote environment
-takes the path(s) to the JAR file(s).
-
-## Linking with modules not contained in the binary distribution
-
-The binary distribution contains jar packages in the `lib` folder that are automatically
-provided to the classpath of your distributed programs. Almost all Flink classes are
-located there, with a few exceptions such as the streaming connectors and some freshly
-added modules. To run code that depends on these modules, you need to make them accessible
-at runtime, for which we suggest two options:
-
-1. Either copy the required jar files to the `lib` folder on all of your TaskManagers
-(see the sketch below). Note that you have to restart your TaskManagers afterwards.
-2. Or package them with your code.
-
-The latter approach is recommended, as it respects Flink's class loader management.
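-
-A minimal sketch of the first option, assuming a hypothetical two-node setup (host names and paths are placeholders):
-
-~~~bash
-# copy the required jar to each TaskManager's lib folder
-for host in worker1 worker2; do
-    scp flink-connector-kafka.jar "$host":/path/to/flink/lib/
-done
-# restart the cluster so the TaskManagers pick up the new jar
-./bin/stop-cluster.sh && ./bin/start-cluster.sh
-~~~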
-
-### Packaging dependencies with your usercode with Maven
-
-To provide these dependencies, which are not included in the Flink distribution, we suggest two options with Maven.
-
-1. The maven-assembly-plugin builds a so-called uber-jar (executable jar) containing all your dependencies.
-The assembly configuration is straightforward, but the resulting jar might become bulky.
-See [maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html) for further information.
-2. The unpack goal of the maven-dependency-plugin unpacks the relevant parts of the dependencies and
-then packages them with your code.
-
-Using the latter approach to bundle the Kafka connector `flink-connector-kafka`,
-you would need to add the classes from both the connector and the Kafka API itself. Add
-the following to the plugins section of your pom.xml.
-
-~~~xml
-<plugin>
-    <groupId>org.apache.maven.plugins</groupId>
-    <artifactId>maven-dependency-plugin</artifactId>
-    <version>2.9</version>
-    <executions>
-        <execution>
-            <id>unpack</id>
-            <!-- executed just before the package phase -->
-            <phase>prepare-package</phase>
-            <goals>
-                <goal>unpack</goal>
-            </goals>
-            <configuration>
-                <artifactItems>
-                    <!-- For Flink connector classes -->
-                    <artifactItem>
-                        <groupId>org.apache.flink</groupId>
-                        <artifactId>flink-connector-kafka</artifactId>
-                        <version>{{ site.version }}</version>
-                        <type>jar</type>
-                        <overWrite>false</overWrite>
-                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
-                        <includes>org/apache/flink/**</includes>
-                    </artifactItem>
-                    <!-- For Kafka API classes -->
-                    <artifactItem>
-                        <groupId>org.apache.kafka</groupId>
-                        <artifactId>kafka_<YOUR_SCALA_VERSION></artifactId>
-                        <version><YOUR_KAFKA_VERSION></version>
-                        <type>jar</type>
-                        <overWrite>false</overWrite>
-                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
-                        <includes>kafka/**</includes>
-                    </artifactItem>
-                </artifactItems>
-            </configuration>
-        </execution>
-    </executions>
-</plugin>
-~~~
-
-Now, when running `mvn clean package`, the produced jar includes the required dependencies.
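-
-As a usage sketch (the jar name below is a placeholder), building and submitting then boils down to:
-
-~~~bash
-# build the jar, including the unpacked dependency classes
-mvn clean package
-# submit the resulting jar with the command-line interface
-./bin/flink run ./target/my-program-1.0.jar
-~~~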

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/common/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/docs/apis/common/fig/plan_visualizer.png b/docs/apis/common/fig/plan_visualizer.png
deleted file mode 100644
index 85b8c55..0000000
Binary files a/docs/apis/common/fig/plan_visualizer.png and /dev/null differ

