flink-commits mailing list archives

From se...@apache.org
Subject [42/89] [abbrv] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs
Date Thu, 25 Aug 2016 18:48:45 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/metrics.md
----------------------------------------------------------------------
diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md
deleted file mode 100644
index 1cc7a29..0000000
--- a/docs/apis/metrics.md
+++ /dev/null
@@ -1,470 +0,0 @@
----
-title: "Metrics"
-# Top-level navigation
-top-nav-group: apis
-top-nav-pos: 13
-top-nav-title: "Metrics"
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink exposes a metric system that allows gathering and exposing metrics to external systems.
-
-* This will be replaced by the TOC
-{:toc}
-
-## Registering metrics
-
-You can access the metric system from any user function that extends [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
-This method returns a `MetricGroup` object on which you can create and register new metrics.
-
-### Metric types
-
-Flink supports `Counters`, `Gauges` and `Histograms`.
-
-#### Counter
-
-A `Counter` is used to count something. The current value can be incremented or decremented using `inc()`/`inc(long n)` or `dec()`/`dec(long n)`.
-You can create and register a `Counter` by calling `counter(String name)` on a `MetricGroup`.
-
-{% highlight java %}
-
-public class MyMapper extends RichMapFunction<String, String> {
-  private Counter counter;
-
-  @Override
-  public void open(Configuration config) {
-    this.counter = getRuntimeContext()
-      .getMetricGroup()
-      .counter("myCounter");
-  }
-
-  @Override
-  public String map(String value) throws Exception {
-    this.counter.inc(); // count every record that passes through this mapper
-    return value;
-  }
-}
-
-{% endhighlight %}
-
-Alternatively, you can use your own `Counter` implementation:
-
-{% highlight java %}
-
-public class MyMapper extends RichMapFunction<String, Integer> {
-  private Counter counter;
-
-  @Override
-  public void open(Configuration config) {
-    this.counter = getRuntimeContext()
-      .getMetricGroup()
-      .counter("myCustomCounter", new CustomCounter());
-  }
-}
-
-{% endhighlight %}
-
-#### Gauge
-
-A `Gauge` provides a value of any type on demand. In order to use a `Gauge`, you must first create a class that implements the `org.apache.flink.metrics.Gauge` interface.
-There is no restriction on the type of the returned value.
-You can register a gauge by calling `gauge(String name, Gauge gauge)` on a `MetricGroup`.
-
-{% highlight java %}
-
-public class MyMapper extends RichMapFunction<String, Integer> {
-  private int valueToExpose;
-
-  @Override
-  public void open(Configuration config) {
-    getRuntimeContext()
-      .getMetricGroup()
-      .gauge("MyGauge", new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return valueToExpose;
-        }
-      });
-  }
-}
-
-{% endhighlight %}
-
-Note that reporters will turn the exposed object into a `String`, which means that a meaningful `toString()` implementation is required.
-
-#### Histogram
-
-A `Histogram` measures the distribution of long values.
-You can register one by calling `histogram(String name, Histogram histogram)` on a `MetricGroup`.
-
-{% highlight java %}
-public class MyMapper extends RichMapFunction<Long, Long> {
-  private Histogram histogram;
-
-  @Override
-  public void open(Configuration config) {
-    this.histogram = getRuntimeContext()
-      .getMetricGroup()
-      .histogram("myHistogram", new MyHistogram());
-  }
-
-  @Override
-  public Long map(Long value) throws Exception {
-    this.histogram.update(value); // record the value in the distribution
-    return value;
-  }
-}
-{% endhighlight %}
-
-Flink does not provide a default implementation for `Histogram`, but offers a {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
-To use this wrapper, add the following dependency to your `pom.xml`:
-{% highlight xml %}
-<dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-metrics-dropwizard</artifactId>
-      <version>{{site.version}}</version>
-</dependency>
-{% endhighlight %}
-
-You can then register a Codahale/DropWizard histogram like this:
-
-{% highlight java %}
-public class MyMapper extends RichMapFunction<Long, Integer> {
-  private Histogram histogram;
-
-  @Override
-  public void open(Configuration config) {
-    com.codahale.metrics.Histogram dropwizardHistogram =
-      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
-
-    this.histogram = getRuntimeContext()
-      .getMetricGroup()
-      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
-  }
-}
-{% endhighlight %}
-
-## Scope
-
-Every metric is assigned an identifier under which it is reported. The identifier is based on three components: the user-provided name when registering the metric, an optional user-defined scope, and a system-provided scope.
-For example, if `A.B` is the system scope, `C.D` the user scope and `E` the name, then the identifier for the metric will be `A.B.C.D.E`.
-
-You can configure which delimiter to use for the identifier (default: `.`) by setting the `metrics.scope.delimiter` key in `conf/flink-conf.yaml`.
-
-### User Scope
-
-You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
-
-{% highlight java %}
-
-counter = getRuntimeContext()
-  .getMetricGroup()
-  .addGroup("MyMetrics")
-  .counter("myCounter");
-
-{% endhighlight %}
-
-### System Scope
-
-The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.
-
-Which context information should be included can be configured by setting the following keys in `conf/flink-conf.yaml`.
-Each of these keys expects a format string that may contain constants (e.g., "taskmanager") and variables (e.g., "&lt;task_id&gt;") that are replaced at runtime.
-
-- `metrics.scope.jm`
-  - Default: &lt;host&gt;.jobmanager
-  - Applied to all metrics that were scoped to a job manager.
-- `metrics.scope.jm.job`
-  - Default: &lt;host&gt;.jobmanager.&lt;job_name&gt;
-  - Applied to all metrics that were scoped to a job manager and job.
-- `metrics.scope.tm`
-  - Default: &lt;host&gt;.taskmanager.&lt;tm_id&gt;
-  - Applied to all metrics that were scoped to a task manager.
-- `metrics.scope.tm.job`
-  - Default: &lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;
-  - Applied to all metrics that were scoped to a task manager and job.
-- `metrics.scope.task`
-  - Default: &lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;task_name&gt;.&lt;subtask_index&gt;
-  - Applied to all metrics that were scoped to a task.
-- `metrics.scope.operator`
-  - Default: &lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;operator_name&gt;.&lt;subtask_index&gt;
-  - Applied to all metrics that were scoped to an operator.
-
-There are no restrictions on the number or order of variables. Variables are case sensitive.
-
-The default scope for operator metrics will result in an identifier akin to `localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric`.
-
-If you also want to include the task name but omit the task manager information, you can specify the following format:
-
-`metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>`
-
-This could create the identifier `localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric`.
-
-Note that with this format string an identifier clash can occur if the same job is run multiple times concurrently, which can lead to inconsistent metric data.
-It is therefore advisable either to use format strings that provide a certain degree of uniqueness by including IDs (e.g., &lt;job_id&gt;)
-or to assign unique names to jobs and operators.
-
-### List of all Variables
-
-- JobManager: &lt;host&gt;
-- TaskManager: &lt;host&gt;, &lt;tm_id&gt;
-- Job: &lt;job_id&gt;, &lt;job_name&gt;
-- Task: &lt;task_id&gt;, &lt;task_name&gt;, &lt;task_attempt_id&gt;, &lt;task_attempt_num&gt;, &lt;subtask_index&gt;
-- Operator: &lt;operator_name&gt;, &lt;subtask_index&gt;
-
-## Reporter
-
-Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`.
-
-- `metrics.reporters`: The list of named reporters.
-- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
-- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
-- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
-
-All reporters must have at least the `class` property; some also allow specifying a reporting `interval`.
-Settings specific to each reporter are listed below.
-
-Example reporter configuration that specifies multiple reporters:
-
-```
-metrics.reporters: my_jmx_reporter,my_other_reporter
-
-metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
-metrics.reporter.my_jmx_reporter.port: 9020-9040
-
-metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
-metrics.reporter.my_other_reporter.host: 192.168.1.1
-metrics.reporter.my_other_reporter.port: 10000
-
-```
-
-You can write your own `Reporter` by implementing the `org.apache.flink.metrics.reporter.MetricReporter` interface.
-If the reporter should send out reports regularly, you have to implement the `Scheduled` interface as well.
-
-The following sections list the supported reporters.
-
-### JMX (org.apache.flink.metrics.jmx.JMXReporter)
-
-The JMX reporter is available by default, so you don't have to include an additional dependency;
-however, it is not activated by default.
-
-Parameters:
-
-- `port` - the port on which JMX listens for connections. This can also be a port range. When a
-range is specified, the actual port is shown in the relevant job or task manager log. If you don't
-specify a port, no extra JMX server will be started. Metrics are still available on the default
-local JMX interface.
-
-### Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)
-Dependency:
-{% highlight xml %}
-<dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-metrics-ganglia</artifactId>
-      <version>{{site.version}}</version>
-</dependency>
-{% endhighlight %}
-
-Parameters:
-
-- `host` - the gmond host address configured under `udp_recv_channel.bind` in `gmond.conf`
-- `port` - the gmond port configured under `udp_recv_channel.port` in `gmond.conf`
-- `tmax` - soft limit for how long an old metric should be retained
-- `dmax` - hard limit for how long an old metric should be retained
-- `ttl` - time-to-live for transmitted UDP packets
-- `addressingMode` - UDP addressing mode to use (UNICAST/MULTICAST)
-
-### Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)
-Dependency:
-{% highlight xml %}
-<dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-metrics-graphite</artifactId>
-      <version>{{site.version}}</version>
-</dependency>
-{% endhighlight %}
-
-Parameters:
-
-- `host` - the Graphite server host
-- `port` - the Graphite server port
-
-### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
-Dependency:
-{% highlight xml %}
-<dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-metrics-statsd</artifactId>
-      <version>{{site.version}}</version>
-</dependency>
-{% endhighlight %}
-
-Parameters:
-
-- `host` - the StatsD server host
-- `port` - the StatsD server port
-
-## System metrics
-
-Flink exposes the following system metrics:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Scope</th>
-      <th class="text-left">Metrics</th>
-      <th class="text-left">Description</th>
-    </tr>
-  </thead>
-
-  <tbody>
-    <tr>
-      <th rowspan="1"><strong>JobManager</strong></th>
-      <td></td>
-      <td></td>
-    </tr>
-    <tr>
-      <th rowspan="19"><strong>TaskManager.Status.JVM</strong></th>
-      <td>ClassLoader.ClassesLoaded</td>
-      <td>The total number of classes loaded since the start of the JVM.</td>
-    </tr>
-    <tr>
-      <td>ClassLoader.ClassesUnloaded</td>
-      <td>The total number of classes unloaded since the start of the JVM.</td>
-    </tr>
-    <tr>
-      <td>GarbageCollector.&lt;garbageCollector&gt;.Count</td>
-      <td>The total number of collections that have occurred.</td>
-    </tr>
-    <tr>
-      <td>GarbageCollector.&lt;garbageCollector&gt;.Time</td>
-      <td>The total time spent performing garbage collection.</td>
-    </tr>
-    <tr>
-      <td>Memory.Heap.Used</td>
-      <td>The amount of heap memory currently used.</td>
-    </tr>
-    <tr>
-      <td>Memory.Heap.Committed</td>
-      <td>The amount of heap memory guaranteed to be available to the JVM.</td>
-    </tr>
-    <tr>
-      <td>Memory.Heap.Max</td>
-      <td>The maximum amount of heap memory that can be used for memory management.</td>
-    </tr>
-    <tr>
-      <td>Memory.NonHeap.Used</td>
-      <td>The amount of non-heap memory currently used.</td>
-    </tr>
-    <tr>
-      <td>Memory.NonHeap.Committed</td>
-      <td>The amount of non-heap memory guaranteed to be available to the JVM.</td>
-    </tr>
-    <tr>
-      <td>Memory.NonHeap.Max</td>
-      <td>The maximum amount of non-heap memory that can be used for memory management.</td>
-    </tr>
-    <tr>
-      <td>Memory.Direct.Count</td>
-      <td>The number of buffers in the direct buffer pool.</td>
-    </tr>
-    <tr>
-      <td>Memory.Direct.MemoryUsed</td>
-      <td>The amount of memory used by the JVM for the direct buffer pool.</td>
-    </tr>
-    <tr>
-      <td>Memory.Direct.TotalCapacity</td>
-      <td>The total capacity of all buffers in the direct buffer pool.</td>
-    </tr>
-    <tr>
-      <td>Memory.Mapped.Count</td>
-      <td>The number of buffers in the mapped buffer pool.</td>
-    </tr>
-    <tr>
-      <td>Memory.Mapped.MemoryUsed</td>
-      <td>The amount of memory used by the JVM for the mapped buffer pool.</td>
-    </tr>
-    <tr>
-      <td>Memory.Mapped.TotalCapacity</td>
-      <td>The total capacity of all buffers in the mapped buffer pool.</td>
-    </tr>
-    <tr>
-      <td>Threads.Count</td>
-      <td>The total number of live threads.</td>
-    </tr>
-    <tr>
-      <td>CPU.Load</td>
-      <td>The recent CPU usage of the JVM.</td>
-    </tr>
-    <tr>
-      <td>CPU.Time</td>
-      <td>The CPU time used by the JVM.</td>
-    </tr>
-    <tr>
-      <th rowspan="1"><strong>Job</strong></th>
-      <td></td>
-      <td></td>
-    </tr>
-    <tr>
-      <th rowspan="7"><strong>Task</strong></th>
-      <td>currentLowWatermark</td>
-      <td>The lowest watermark a task has received.</td>
-    </tr>
-    <tr>
-      <td>lastCheckpointDuration</td>
-      <td>The time it took to complete the last checkpoint.</td>
-    </tr>
-    <tr>
-      <td>lastCheckpointSize</td>
-      <td>The total size of the last checkpoint.</td>
-    </tr>
-    <tr>
-      <td>restartingTime</td>
-      <td>The time it took to restart the job.</td>
-    </tr>
-    <tr>
-      <td>numBytesInLocal</td>
-      <td>The total number of bytes this task has read from a local source.</td>
-    </tr>
-    <tr>
-      <td>numBytesInRemote</td>
-      <td>The total number of bytes this task has read from a remote source.</td>
-    </tr>
-    <tr>
-      <td>numBytesOut</td>
-      <td>The total number of bytes this task has emitted.</td>
-    </tr>
-    <tr>
-      <th rowspan="3"><strong>Operator</strong></th>
-      <td>numRecordsIn</td>
-      <td>The total number of records this operator has received.</td>
-    </tr>
-    <tr>
-      <td>numRecordsOut</td>
-      <td>The total number of records this operator has emitted.</td>
-    </tr>
-    <tr>
-      <td>numSplitsProcessed</td>
-      <td>The total number of InputSplits this data source has processed.</td>
-    </tr>
-  </tbody>
-</table>
-
-{% top %}
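The scope rules in the metrics guide above can be illustrated with a small standalone Java sketch. An identifier is built from a system scope (taken from a format string such as `metrics.scope.operator`), then the optional user scope, then the metric name. This is not Flink's implementation; the class `ScopeFormatSketch` and its methods are hypothetical. It assumes a format string is split on `.` into components, and that a component is replaced only when it exactly matches a known `<variable>` (so variables stay case sensitive). It also assumes the resulting components are joined with the configured delimiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of turning a scope format string such as
// "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"
// into a metric identifier. This is NOT Flink's actual code.
final class ScopeFormatSketch {

  // Splits the format on '.', replaces each component that is a known
  // "<variable>" with its value, and keeps constants (e.g. "taskmanager") as-is.
  static List<String> expand(String format, Map<String, String> variables) {
    List<String> components = new ArrayList<>();
    for (String part : format.split("\\.")) {
      String value = variables.get(part);
      // Exact lookup, so "<HOST>" does not match "<host>" (case sensitive).
      components.add(value != null ? value : part);
    }
    return components;
  }

  // identifier = system scope + user scope + metric name, joined by the delimiter.
  static String identifier(String format, Map<String, String> variables,
                           List<String> userScope, String metricName, String delimiter) {
    List<String> all = new ArrayList<>(expand(format, variables));
    all.addAll(userScope);
    all.add(metricName);
    return String.join(delimiter, all);
  }

  public static void main(String[] args) {
    Map<String, String> vars = Map.of(
        "<host>", "localhost",
        "<tm_id>", "1234",
        "<job_name>", "MyJob",
        "<operator_name>", "MyOperator",
        "<subtask_index>", "0");
    String format = "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>";
    // Prints localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
    System.out.println(identifier(format, vars, List.of(), "MyMetric", "."));
  }
}
```

With the variables of the default operator scope, `main` reproduces the example identifier from the guide. Passing the system scope `A.B`, the user scope `C`, `D` and the name `E` yields `A.B.C.D.E`, matching the example in the Scope section.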

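The `metrics.reporter.<name>.<config>` naming scheme from the Reporter section can be read as one flat key space partitioned by reporter name. The hypothetical helper below (`ReporterConfigSketch.group`, not part of Flink) groups such keys for every name listed in `metrics.reporters`, using the example configuration from that section. It assumes comma-separated reporter names that contain no dots. It enforces only the documented requirement that each reporter has a `class` property.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: groups "metrics.reporter.<name>.<config>" entries
// for each reporter name listed in "metrics.reporters". Not Flink's loader.
final class ReporterConfigSketch {

  static final String PREFIX = "metrics.reporter.";

  static Map<String, Map<String, String>> group(Map<String, String> conf) {
    Map<String, Map<String, String>> result = new LinkedHashMap<>();
    String names = conf.getOrDefault("metrics.reporters", "");
    for (String raw : names.split(",")) {
      String name = raw.trim();
      if (name.isEmpty()) {
        continue; // tolerate an empty list or trailing commas
      }
      String reporterPrefix = PREFIX + name + ".";
      Map<String, String> settings = new LinkedHashMap<>();
      for (Map.Entry<String, String> e : conf.entrySet()) {
        if (e.getKey().startsWith(reporterPrefix)) {
          // Strip the prefix so only "<config>" (e.g. "class", "port") remains.
          settings.put(e.getKey().substring(reporterPrefix.length()), e.getValue());
        }
      }
      // Every reporter must have at least the "class" property.
      if (!settings.containsKey("class")) {
        throw new IllegalArgumentException("Reporter '" + name + "' has no class property");
      }
      result.put(name, settings);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("metrics.reporters", "my_jmx_reporter,my_other_reporter");
    conf.put("metrics.reporter.my_jmx_reporter.class", "org.apache.flink.metrics.jmx.JMXReporter");
    conf.put("metrics.reporter.my_jmx_reporter.port", "9020-9040");
    conf.put("metrics.reporter.my_other_reporter.class", "org.apache.flink.metrics.graphite.GraphiteReporter");
    conf.put("metrics.reporter.my_other_reporter.host", "192.168.1.1");
    conf.put("metrics.reporter.my_other_reporter.port", "10000");
    System.out.println(group(conf));
  }
}
```

Using `LinkedHashMap` keeps the settings in insertion order. As a result, the printed grouping follows the order of the example configuration.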
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
deleted file mode 100644
index 0d865fe..0000000
--- a/docs/apis/programming_guide.md
+++ /dev/null
@@ -1,26 +0,0 @@
----
-title: DataSet API
----
-
-<meta http-equiv="refresh" content="1; url={{ site.baseurl }}/apis/batch/index.html" />
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-The *DataSet API guide* has been moved. Redirecting to [{{ site.baseurl }}/apis/batch/index.html]({{ site.baseurl }}/apis/batch/index.html) in 1 second.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_api_extensions.md b/docs/apis/scala_api_extensions.md
deleted file mode 100644
index e3268bf..0000000
--- a/docs/apis/scala_api_extensions.md
+++ /dev/null
@@ -1,409 +0,0 @@
----
-title: "Scala API Extensions"
-# Top-level navigation
-top-nav-group: apis
-top-nav-pos: 11
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-In order to keep a fair amount of consistency between the Scala and Java APIs, some
-of the features that allow a high level of expressiveness in Scala have been left
-out of the standard APIs for both batch and streaming.
-
-If you want to _enjoy the full Scala experience_, you can choose to opt in to
-extensions that enhance the Scala API via implicit conversions.
-
-To use all the available extensions, you can just add a simple `import` for the
-DataSet API
-
-{% highlight scala %}
-import org.apache.flink.api.scala.extensions._
-{% endhighlight %}
-
-or the DataStream API
-
-{% highlight scala %}
-import org.apache.flink.streaming.api.scala.extensions._
-{% endhighlight %}
-
-Alternatively, you can import individual extensions _à la carte_ to only use those
-you prefer.
-
-## Accept partial functions
-
-Normally, both the DataSet and DataStream APIs don't accept anonymous pattern
-matching functions to deconstruct tuples, case classes or collections, like the
-following:
-
-{% highlight scala %}
-val data: DataSet[(Int, String, Double)] = // [...]
-data.map {
-  case (id, name, temperature) => // [...]
-  // The previous line causes the following compilation error:
-  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
-}
-{% endhighlight %}
-
-This extension introduces new methods in both the DataSet and DataStream Scala APIs
-that have a one-to-one correspondence in the extended API. These delegating methods
-do support anonymous pattern matching functions.
-
-#### DataSet API
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Method</th>
-      <th class="text-left" style="width: 20%">Original</th>
-      <th class="text-center">Example</th>
-    </tr>
-  </thead>
-
-  <tbody>
-    <tr>
-      <td><strong>mapWith</strong></td>
-      <td><strong>map (DataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data.mapWith {
-  case (_, value) => value.toString
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>mapPartitionWith</strong></td>
-      <td><strong>mapPartition (DataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data.mapPartitionWith {
-  case head #:: _ => head
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>flatMapWith</strong></td>
-      <td><strong>flatMap (DataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data.flatMapWith {
-  case (_, name, visitTimes) => visitTimes.map(name -> _)
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>filterWith</strong></td>
-      <td><strong>filter (DataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data.filterWith {
-  case Train(_, isOnTime) => isOnTime
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>reduceWith</strong></td>
-      <td><strong>reduce (DataSet, GroupedDataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data.reduceWith {
-  case ((_, amount1), (_, amount2)) => amount1 + amount2
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>reduceGroupWith</strong></td>
-      <td><strong>reduceGroup (GroupedDataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data.reduceGroupWith {
-  case id #:: value #:: _ => id -> value
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>groupingBy</strong></td>
-      <td><strong>groupBy (DataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data.groupingBy {
-  case (id, _, _) => id
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>sortGroupWith</strong></td>
-      <td><strong>sortGroup (GroupedDataSet)</strong></td>
-      <td>
-{% highlight scala %}
-grouped.sortGroupWith(Order.ASCENDING) {
-  case House(_, value) => value
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>combineGroupWith</strong></td>
-      <td><strong>combineGroup (GroupedDataSet)</strong></td>
-      <td>
-{% highlight scala %}
-grouped.combineGroupWith {
-  case header #:: amounts => amounts.sum
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>projecting</strong></td>
-      <td><strong>apply (JoinDataSet, CrossDataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data1.join(data2).
-  whereClause { case (pk, _) => pk }.
-  isEqualTo { case (_, fk) => fk }.
-  projecting {
-    case ((pk, tx), (products, fk)) => tx -> products
-  }
-
-data1.cross(data2).projecting {
-  case ((a, _), (_, b)) => a -> b
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>projecting</strong></td>
-      <td><strong>apply (CoGroupDataSet)</strong></td>
-      <td>
-{% highlight scala %}
-data1.coGroup(data2).
-  whereClause { case (pk, _) => pk }.
-  isEqualTo { case (_, fk) => fk }.
-  projecting {
-    case (head1 #:: _, head2 #:: _) => head1 -> head2
-  }
-{% endhighlight %}
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-#### DataStream API
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Method</th>
-      <th class="text-left" style="width: 20%">Original</th>
-      <th class="text-center">Example</th>
-    </tr>
-  </thead>
-
-  <tbody>
-    <tr>
-      <td><strong>mapWith</strong></td>
-      <td><strong>map (DataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.mapWith {
-  case (_, value) => value.toString
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>mapPartitionWith</strong></td>
-      <td><strong>mapPartition (DataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.mapPartitionWith {
-  case head #:: _ => head
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>flatMapWith</strong></td>
-      <td><strong>flatMap (DataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.flatMapWith {
-  case (_, name, visits) => visits.map(name -> _)
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>filterWith</strong></td>
-      <td><strong>filter (DataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.filterWith {
-  case Train(_, isOnTime) => isOnTime
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>keyingBy</strong></td>
-      <td><strong>keyBy (DataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.keyingBy {
-  case (id, _, _) => id
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>mapWith</strong></td>
-      <td><strong>map (ConnectedDataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.mapWith(
-  map1 = { case (_, value) => value.toString },
-  map2 = { case (_, _, value, _) => value + 1 }
-)
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>flatMapWith</strong></td>
-      <td><strong>flatMap (ConnectedDataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.flatMapWith(
-  flatMap1 = { case (_, json) => parse(json) },
-  flatMap2 = { case (_, _, json, _) => parse(json) }
-)
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>keyingBy</strong></td>
-      <td><strong>keyBy (ConnectedDataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.keyingBy(
-  key1 = { case (_, timestamp) => timestamp },
-  key2 = { case (id, _, _) => id }
-)
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>reduceWith</strong></td>
-      <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.reduceWith {
-  case ((_, sum1), (_, sum2)) => sum1 + sum2
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>foldWith</strong></td>
-      <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.foldWith(User(bought = 0)) {
-  case (User(b), (_, items)) => User(b + items.size)
-}
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>applyWith</strong></td>
-      <td><strong>apply (WindowedDataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data.applyWith(0)(
-  foldFunction = { case (sum, amount) => sum + amount },
-  windowFunction = { case (k, w, sum) =>
-    // [...]
-  }
-)
-{% endhighlight %}
-      </td>
-    </tr>
-    <tr>
-      <td><strong>projecting</strong></td>
-      <td><strong>apply (JoinedDataStream)</strong></td>
-      <td>
-{% highlight scala %}
-data1.join(data2).
-  whereClause(case (pk, _) => pk).
-  isEqualTo(case (_, fk) => fk).
-  projecting {
-    case ((pk, tx), (products, fk)) => tx -> products
-  }
-{% endhighlight %}
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-
-
-For more information on the semantics of each method, please refer to the
-[DataStream](streaming/index.html) and [DataSet](batch/index.html) API documentation.
-
-To use this extension exclusively, you can add the following `import`:
-
-{% highlight scala %}
-import org.apache.flink.api.scala.extensions.acceptPartialFunctions
-{% endhighlight %}
-
-for the DataSet extensions and
-
-{% highlight scala %}
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-{% endhighlight %}
-
-for the DataStream extensions.
-
-The following snippet shows a minimal example of how to use these extension
-methods together (with the DataSet API):
-
-{% highlight scala %}
-object Main {
-  import org.apache.flink.api.scala._
-  import org.apache.flink.api.scala.extensions._
-  case class Point(x: Double, y: Double)
-  def main(args: Array[String]): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
-    ds.filterWith {
-      case Point(x, _) => x > 1
-    }.reduceWith {
-      case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
-    }.mapWith {
-      case Point(x, y) => (x, y)
-    }.flatMapWith {
-      case (x, y) => Seq("x" -> x, "y" -> y)
-    }.groupingBy {
-      case (id, value) => id
-    }
-  }
-}
-{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md
deleted file mode 100644
index ad36ca0..0000000
--- a/docs/apis/scala_shell.md
+++ /dev/null
@@ -1,197 +0,0 @@
----
-title: "Scala Shell"
-# Top-level navigation
-top-nav-group: apis
-top-nav-pos: 10
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-
-Flink comes with an integrated interactive Scala Shell.
-It can be used in a local setup as well as in a cluster setup.
-
-
-To use the shell with an integrated Flink cluster just execute:
-
-~~~bash
-bin/start-scala-shell.sh local
-~~~
-
-from the root directory of your Flink binary distribution. To run the shell on a
-cluster, please see the Setup section below.
-
-
-## Usage
-
-The shell supports both batch and streaming programs.
-Two different ExecutionEnvironments are automatically prebound after startup.
-Use `benv` and `senv` to access the batch and streaming environments, respectively.
-
-### DataSet API
-
-The following example will execute the wordcount program in the Scala shell:
-
-~~~scala
-Scala-Flink> val text = benv.fromElements(
-  "To be, or not to be,--that is the question:--",
-  "Whether 'tis nobler in the mind to suffer",
-  "The slings and arrows of outrageous fortune",
-  "Or to take arms against a sea of troubles,")
-Scala-Flink> val counts = text
-    .flatMap { _.toLowerCase.split("\\W+") }
-    .map { (_, 1) }.groupBy(0).sum(1)
-Scala-Flink> counts.print()
-~~~
-
-The `print()` command will automatically send the specified tasks to the JobManager for execution and will show the result of the computation in the terminal.
-
-It is possible to write results to a file. However, in this case you need to call `execute`, to run your program:
-
-~~~scala
-Scala-Flink> benv.execute("MyProgram")
-~~~
-
-### DataStream API
-
-Similar to the batch program above, you can execute a streaming program through the DataStream API:
-
-~~~scala
-Scala-Flink> val textStreaming = senv.fromElements(
-  "To be, or not to be,--that is the question:--",
-  "Whether 'tis nobler in the mind to suffer",
-  "The slings and arrows of outrageous fortune",
-  "Or to take arms against a sea of troubles,")
-Scala-Flink> val countsStreaming = textStreaming
-    .flatMap { _.toLowerCase.split("\\W+") }
-    .map { (_, 1) }.keyBy(0).sum(1)
-Scala-Flink> countsStreaming.print()
-Scala-Flink> senv.execute("Streaming Wordcount")
-~~~
-
-Note that in the streaming case, `print()` does not trigger execution directly; you have to call `senv.execute()` as shown above.
-
-The Flink Shell comes with command history and auto-completion.
-
-
-## Adding external dependencies
-
-You can add external classpaths to the Scala shell. They are sent to the JobManager automatically, together with your shell program, when you call `execute`.
-
-Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
-
-~~~bash
-bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
-~~~
-
-
-## Setup
-
-To get an overview of the options the Scala shell provides, run:
-
-~~~bash
-bin/start-scala-shell.sh --help
-~~~
-
-### Local
-
-To use the shell with an integrated Flink cluster just execute:
-
-~~~bash
-bin/start-scala-shell.sh local
-~~~
-
-
-### Remote
-
-To use the shell with a running cluster, start it with the keyword `remote`
-and supply the host and port of the JobManager:
-
-~~~bash
-bin/start-scala-shell.sh remote <hostname> <portnumber>
-~~~
-
-### Yarn Scala Shell cluster
-
-The shell can deploy a new Flink cluster to YARN that is used exclusively by the
-shell, and connect to it. The number of YARN containers can be controlled with the parameter `-n <arg>`.
-You can also specify further options for the YARN cluster, such as the memory for the
-JobManager or the name of the YARN application.
-
-For example, to start a YARN cluster for the Scala shell with two TaskManagers,
-use the following:
-
-~~~bash
-bin/start-scala-shell.sh yarn -n 2
-~~~
-
-For all other options, see the full reference at the bottom.
-
-
-### Yarn Session
-
-If you have previously deployed a Flink cluster using a Flink YARN session,
-the Scala shell can connect to it using the following command:
-
-~~~bash
-bin/start-scala-shell.sh yarn
-~~~
-
-
-## Full Reference
-
-~~~bash
-Flink Scala Shell
-Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
-
-Command: local [options]
-Starts Flink scala shell with a local Flink cluster
-  -a <path/to/jar> | --addclasspath <path/to/jar>
-        Specifies additional jars to be used in Flink
-Command: remote [options] <host> <port>
-Starts Flink scala shell connecting to a remote cluster
-  <host>
-        Remote host name as string
-  <port>
-        Remote port as integer
-
-  -a <path/to/jar> | --addclasspath <path/to/jar>
-        Specifies additional jars to be used in Flink
-Command: yarn [options]
-Starts Flink scala shell connecting to a yarn cluster
-  -n arg | --container arg
-        Number of YARN container to allocate (= Number of TaskManagers)
-  -jm arg | --jobManagerMemory arg
-        Memory for JobManager container [in MB]
-  -nm <value> | --name <value>
-        Set a custom name for the application on YARN
-  -qu <arg> | --queue <arg>
-        Specifies YARN queue
-  -s <arg> | --slots <arg>
-        Number of slots per TaskManager
-  -tm <arg> | --taskManagerMemory <arg>
-        Memory per TaskManager container [in MB]
-  -a <path/to/jar> | --addclasspath <path/to/jar>
-        Specifies additional jars to be used in Flink
-  --configDir <value>
-        The configuration directory.
-  -h | --help
-        Prints this usage text
-~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/cassandra.md b/docs/apis/streaming/connectors/cassandra.md
deleted file mode 100644
index 28ad244..0000000
--- a/docs/apis/streaming/connectors/cassandra.md
+++ /dev/null
@@ -1,158 +0,0 @@
----
-title: "Apache Cassandra Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 1
-sub-nav-title: Cassandra
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides sinks that write data into a [Cassandra](https://cassandra.apache.org/) database.
-
-To use this connector, add the following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-cassandra{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-#### Installing Apache Cassandra
-Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted).
-
-#### Cassandra Sink
-
-Flink's Cassandra sinks are created using the static `CassandraSink.addSink(DataStream<IN> input)` method.
-This method returns a `CassandraSinkBuilder`, which offers methods to further configure the sink.
-
-The following configuration methods can be used:
-
-1. `setQuery(String query)`
-2. `setHost(String host[, int port])`
-3. `setClusterBuilder(ClusterBuilder builder)`
-4. `enableWriteAheadLog([CheckpointCommitter committer])`
-5. `build()`
-
- * *setQuery()* sets the query that is executed for every value the sink receives.
- * *setHost()* sets the Cassandra host/port to connect to. This method is intended for simple use cases.
- * *setClusterBuilder()* sets the cluster builder that is used to configure the connection to Cassandra. It subsumes the functionality of *setHost()*.
- * *enableWriteAheadLog()* is optional and allows exactly-once processing for non-deterministic algorithms.
-
-A checkpoint committer stores additional information about completed checkpoints
-in an external resource. This information is used to prevent a full replay of the last
-completed checkpoint in case of a failure.
-You can use a `CassandraCommitter` to store it in a separate table in Cassandra.
-Note that this table will NOT be cleaned up by Flink.
-
-*build()* finalizes the configuration and returns the CassandraSink.
-
-Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
-times without changing the result) and checkpointing is enabled. In case of a failure, the failed
-checkpoint is replayed completely.
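
Why idempotence matters for a full replay can be shown with a small simulation. In this hypothetical sketch (`ReplaySketch`), a `Map` stands in for the `example.values` table from the example below; an upsert behaves like a Cassandra `INSERT`, which overwrites the row with the same primary key, while an increment does not. Both are applied to the same checkpoint twice, once for the original attempt and once for the replay.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: a Map stands in for a Cassandra table (id -> counter).
public class ReplaySketch {

    // Idempotent write: like a Cassandra INSERT, it overwrites the row with the same key.
    public static void upsert(Map<Long, Long> table, long id, long counter) {
        table.put(id, counter);
    }

    // Non-idempotent write: adds to the stored value, so a replay changes the result.
    public static void increment(Map<Long, Long> table, long id, long delta) {
        table.merge(id, delta, Long::sum);
    }

    public static void main(String[] args) {
        List<long[]> checkpoint = Arrays.asList(new long[] {1, 10}, new long[] {2, 20});
        Map<Long, Long> upserted = new HashMap<>();
        Map<Long, Long> incremented = new HashMap<>();

        // Apply the checkpoint's records twice: the original attempt plus a full replay.
        for (int attempt = 0; attempt < 2; attempt++) {
            for (long[] record : checkpoint) {
                upsert(upserted, record[0], record[1]);
                increment(incremented, record[0], record[1]);
            }
        }
        System.out.println("upsert:    " + upserted);
        System.out.println("increment: " + incremented);
    }
}
```

After the replay, the upserted table still holds `{1=10, 2=20}`, whereas the incremented one holds `{1=20, 2=40}`.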
-
-Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
-the replayed checkpoint may be completely different from the previous attempt, which may leave the
-database in an inconsistent state since part of the first attempt may already have been written.
-The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
-Note that enabling this feature will have an adverse impact on latency.
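
The write-ahead log idea can be modelled in a few lines. The sketch below is a hypothetical illustration, not Flink's implementation: records are only buffered, frozen per checkpoint, and written once that checkpoint completes. A field `lastCommittedCheckpoint` plays the role of the checkpoint committer, so a restored, already-committed checkpoint is not written again. A real committer keeps this information outside the job (for example in a Cassandra table) so that it survives the failure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the write-ahead-log idea; it is not the Flink implementation.
public class WriteAheadLogSketch {

    // Records of checkpoints that were taken but not yet written to the database.
    private final Map<Long, List<String>> pending = new TreeMap<>();
    // Records received since the last checkpoint.
    private List<String> current = new ArrayList<>();
    // Stand-in for a checkpoint committer: id of the last checkpoint that was written.
    private long lastCommittedCheckpoint = -1;
    // Stand-in for the Cassandra table.
    public final List<String> database = new ArrayList<>();

    // Incoming records are only buffered; nothing reaches the database yet.
    public void invoke(String record) {
        current.add(record);
    }

    // On a checkpoint, the buffered records are frozen together with the checkpoint id.
    public void snapshot(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    // After recovery, the frozen records of a checkpoint come back from the restored state.
    public void restorePending(long checkpointId, List<String> records) {
        pending.put(checkpointId, new ArrayList<>(records));
    }

    // Once a checkpoint is complete, its records (and older ones) are written at most once.
    public void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // TreeMap iterates in ascending checkpoint order
            }
            if (entry.getKey() > lastCommittedCheckpoint) {
                database.addAll(entry.getValue());
                lastCommittedCheckpoint = entry.getKey();
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        WriteAheadLogSketch sink = new WriteAheadLogSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshot(1);
        sink.notifyCheckpointComplete(1);
        // Simulated recovery: checkpoint 1 is restored and completed again, but the
        // committer already recorded it, so nothing is written twice.
        sink.restorePending(1, Arrays.asList("a", "b"));
        sink.notifyCheckpointComplete(1);
        System.out.println(sink.database); // [a, b]
    }
}
```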
-
-<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.</p>
-
-
-#### Example
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-    @Override
-    public Cluster buildCluster(Cluster.Builder builder) {
-      return builder.addContactPoint("127.0.0.1").build();
-    }
-  })
-  .build();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-    override def buildCluster(builder: Cluster.Builder): Cluster = {
-      builder.addContactPoint("127.0.0.1").build()
-    }
-  })
-  .build()
-{% endhighlight %}
-</div>
-</div>
-
-The Cassandra sinks support both tuples and POJOs that use DataStax annotations.
-Flink automatically detects which type of input is used.
-
-Example of such a POJO:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
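-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
-
-import java.io.Serializable;
-
-@Table(keyspace = "test", name = "mappersink")
-public class Pojo implements Serializable {
-
-	private static final long serialVersionUID = 1038054554690916991L;
-
-	@Column(name = "id")
-	private long id;
-	@Column(name = "value")
-	private String value;
-
-	// Flink's POJO type analysis requires a public no-argument constructor
-	public Pojo() {
-	}
-
-	public Pojo(long id, String value) {
-		this.id = id;
-		this.value = value;
-	}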
-
-	public long getId() {
-		return id;
-	}
-
-	public void setId(long id) {
-		this.id = id;
-	}
-
-	public String getValue() {
-		return value;
-	}
-
-	public void setValue(String value) {
-		this.value = value;
-	}
-}
-{% endhighlight %}
-</div>
-</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/elasticsearch.md b/docs/apis/streaming/connectors/elasticsearch.md
deleted file mode 100644
index 93b2bf6..0000000
--- a/docs/apis/streaming/connectors/elasticsearch.md
+++ /dev/null
@@ -1,183 +0,0 @@
----
-title: "Elasticsearch Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 2
-sub-nav-title: Elasticsearch
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch
-
-Instructions for setting up an Elasticsearch cluster can be found
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. It must be set when
-creating a Sink for writing to your cluster.
-
-#### Elasticsearch Sink
-The connector provides a Sink that can send data to an Elasticsearch Index.
-
-The sink can use two different methods for communicating with Elasticsearch:
-
-1. An embedded Node
-2. The TransportClient
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
-for information about the differences between the two modes.
-
-This code shows how to create a sink that uses an embedded Node for
-communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-Note how a Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name` parameter that must correspond to
-the name of your cluster.
-
-Internally, the sink uses a `BulkProcessor` to send index requests to the cluster.
-It buffers elements before sending a request to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum number of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush data regardless of the other two settings
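
The interplay of the three keys can be sketched as a simple flush policy: a flush happens as soon as any one threshold is reached. `BulkFlushSketch` is hypothetical and is not the Elasticsearch `BulkProcessor` code. As a simplification, it only checks the interval when a new request arrives, whereas a real implementation would typically flush on a timer as well.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three flush thresholds; not Elasticsearch's BulkProcessor code.
public class BulkFlushSketch {

    private final int maxActions;     // bulk.flush.max.actions
    private final long maxSizeBytes;  // bulk.flush.max.size.mb, converted to bytes
    private final long intervalMs;    // bulk.flush.interval.ms

    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMs;
    private int flushes = 0;

    public BulkFlushSketch(int maxActions, long maxSizeMb, long intervalMs, long startMs) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeMb * 1024 * 1024;
        this.intervalMs = intervalMs;
        this.lastFlushMs = startMs;
    }

    // Buffers one request and flushes when any threshold is reached. Returns true if it flushed.
    // Simplification: the interval is only checked when a request arrives, not by a timer.
    public boolean add(String request, long nowMs) {
        buffer.add(request);
        bufferedBytes += request.getBytes(StandardCharsets.UTF_8).length;
        boolean flush = buffer.size() >= maxActions
                || bufferedBytes >= maxSizeBytes
                || nowMs - lastFlushMs >= intervalMs;
        if (flush) {
            // A real sink would send all buffered requests as one bulk request here.
            buffer.clear();
            bufferedBytes = 0;
            lastFlushMs = nowMs;
            flushes++;
        }
        return flush;
    }

    public int buffered() {
        return buffer.size();
    }

    public int flushes() {
        return flushes;
    }

    public static void main(String[] args) {
        // bulk.flush.max.actions = 1, as in the example above: every element is sent immediately.
        BulkFlushSketch perElement = new BulkFlushSketch(1, 5, 60_000, 0);
        System.out.println(perElement.add("a", 1)); // true

        // bulk.flush.max.actions = 3: the first two elements stay buffered.
        BulkFlushSketch batched = new BulkFlushSketch(3, 5, 60_000, 0);
        batched.add("a", 1);
        batched.add("b", 2);
        System.out.println(batched.buffered()); // 2
        batched.add("c", 3);
        System.out.println(batched.flushes()); // 1
    }
}
```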
-
-This example code does the same, but with a `TransportClient`:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List<TransportAddress> transports = new ArrayList<>();
-transports.add(new InetSocketTransportAddress("node-1", 9300));
-transports.add(new InetSocketTransportAddress("node-2", 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
-    @Override
-    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
-        Map<String, Object> json = new HashMap<>();
-        json.put("data", element);
-
-        return Requests.indexRequest()
-                .index("my-index")
-                .type("my-type")
-                .source(json);
-    }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new util.ArrayList[TransportAddress]
-transports.add(new InetSocketTransportAddress("node-1", 9300))
-transports.add(new InetSocketTransportAddress("node-2", 9300))
-
-input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
-  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-The difference is that we now need to provide a list of Elasticsearch Nodes
-to which the sink should connect using a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/elasticsearch2.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/elasticsearch2.md b/docs/apis/streaming/connectors/elasticsearch2.md
deleted file mode 100644
index 36d0920..0000000
--- a/docs/apis/streaming/connectors/elasticsearch2.md
+++ /dev/null
@@ -1,144 +0,0 @@
----
-title: "Elasticsearch 2.x Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 2
-sub-nav-title: Elasticsearch 2.x
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to an
-[Elasticsearch 2.x](https://elastic.co/) Index. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Elasticsearch 2.x
-
-Instructions for setting up an Elasticsearch cluster can be found
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
-Make sure to set and remember a cluster name. This must be set when
-creating a Sink for writing to your cluster
-
-#### Elasticsearch 2.x Sink
-The connector provides a Sink that can send data to an Elasticsearch 2.x Index.
-
-The sink communicates with Elasticsearch via a Transport Client.
-
-See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
-for information about the Transport Client.
-
-The code below shows how to create a sink that uses a `TransportClient` for communication:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-File dataDir = ....;
-
-DataStream<String> input = ...;
-
-Map<String, String> config = new HashMap<>();
-// This instructs the sink to emit after every element, otherwise they would be buffered
-config.put("bulk.flush.max.actions", "1");
-config.put("cluster.name", "my-cluster-name");
-
-List<InetSocketAddress> transports = new ArrayList<>();
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
-
-input.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>() {
-  public IndexRequest createIndexRequest(String element) {
-    Map<String, String> json = new HashMap<>();
-    json.put("data", element);
-
-    return Requests.indexRequest()
-            .index("my-index")
-            .type("my-type")
-            .source(json);
-  }
-
-  @Override
-  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
-    indexer.add(createIndexRequest(element));
-  }
-}));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val dataDir = ....;
-
-val input: DataStream[String] = ...
-
-val config = new util.HashMap[String, String]
-config.put("bulk.flush.max.actions", "1")
-config.put("cluster.name", "my-cluster-name")
-
-val transports = new util.ArrayList[InetSocketAddress]
-transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
-transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
-
-input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
-  def createIndexRequest(element: String): IndexRequest = {
-    val json = new util.HashMap[String, AnyRef]
-    json.put("data", element)
-    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
-  }
-
-  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
-    indexer.add(createIndexRequest(element))
-  }
-}))
-{% endhighlight %}
-</div>
-</div>
-
-A Map of Strings is used to configure the Sink. The configuration keys
-are documented in the Elasticsearch documentation
-[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
-Especially important is the `cluster.name` parameter, which must correspond to
-the name of your cluster. With Elasticsearch 2.x, you also need to specify `path.home`.
-
-Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
-It buffers elements and action requests before sending them to the cluster. The behaviour of the
-`BulkProcessor` can be configured using these config keys:
- * **bulk.flush.max.actions**: Maximum number of elements to buffer
- * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
- * **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush data regardless of the other two settings
-
-The `transports` list provides the Elasticsearch nodes
-to which the sink connects via a `TransportClient`.
-
-More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/filesystem_sink.md b/docs/apis/streaming/connectors/filesystem_sink.md
deleted file mode 100644
index f2dc012..0000000
--- a/docs/apis/streaming/connectors/filesystem_sink.md
+++ /dev/null
@@ -1,133 +0,0 @@
----
-title: "HDFS Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 3
-sub-nav-title: Filesystem Sink
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that writes rolling files to any filesystem supported by
-Hadoop FileSystem. To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-filesystem{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version}}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Rolling File Sink
-
-Both the rolling behaviour and the writing can be configured, as described further below.
-This is how you create a rolling sink with the default settings:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> input = ...;
-
-input.addSink(new RollingSink<String>("/base/path"));
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-input.addSink(new RollingSink("/base/path"))
-
-{% endhighlight %}
-</div>
-</div>
-
-The only required parameter is the base path where the rolling files (buckets) will be
-stored. The sink can be configured by specifying a custom bucketer, writer and batch size.
-
-By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets.
-This pattern is passed to `SimpleDateFormat` with the current system time to form a bucket path. A
-new bucket will be created whenever the bucket path changes. For example, if you have a pattern
-that contains minutes as the finest granularity you will get a new bucket every minute.
-Each bucket is itself a directory that contains several part files: each parallel instance
-of the sink creates its own part file, and when part files get too big, the sink also
-creates a new part file next to the others. To specify a custom bucketer, use `setBucketer()`
-on a `RollingSink`.
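
The bucketing rule described above can be reproduced with plain `SimpleDateFormat`. `BucketPathSketch` is a hypothetical helper, not the `DateTimeBucketer` source. Unlike the sink, which uses the current system time, it takes an explicit timestamp and time zone so that the result is reproducible. The example shows that a pattern with minutes as the finest granularity produces a new bucket path once per minute.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper: derives a bucket directory from a date pattern and a timestamp.
public class BucketPathSketch {

    // Formats the timestamp with the bucket pattern and appends it to the base path.
    public static String bucketPath(String basePath, String pattern, long timestampMillis, TimeZone zone) {
        SimpleDateFormat format = new SimpleDateFormat(pattern); // not thread-safe, so one per call
        format.setTimeZone(zone);
        return basePath + "/" + format.format(new Date(timestampMillis));
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        // Default-style hourly pattern at the epoch (1970-01-01T00:00:00Z).
        System.out.println(bucketPath("/base/path", "yyyy-MM-dd--HH", 0L, utc));
        // /base/path/1970-01-01--00

        // Per-minute pattern: 60 s and 119.999 s share a bucket, 120 s starts a new one.
        System.out.println(bucketPath("/base/path", "yyyy-MM-dd--HHmm", 60_000L, utc));
        // /base/path/1970-01-01--0001
        System.out.println(bucketPath("/base/path", "yyyy-MM-dd--HHmm", 120_000L, utc));
        // /base/path/1970-01-01--0002
    }
}
```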
-
-The default writer is `StringWriter`. It calls `toString()` on the incoming elements
-and writes them to part files, separated by newlines. To specify a custom writer, use `setWriter()`
-on a `RollingSink`. If you want to write Hadoop SequenceFiles you can use the provided
-`SequenceFileWriter` which can also be configured to use compression.
-
-The last configuration option is the batch size. It specifies when a part file should be closed
-and a new one started. The default batch size is 384 MB.
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<IntWritable,Text>> input = ...;
-
-RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<>("/base/path");
-sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
-sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
-sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB
-
-input.addSink(sink);
-
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
-
-val sink = new RollingSink[Tuple2[IntWritable, Text]]("/base/path")
-sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
-sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
-
-input.addSink(sink)
-
-{% endhighlight %}
-</div>
-</div>
-
-This will create a sink that writes to bucket files that follow this schema:
-
-```
-/base/path/{date-time}/part-{parallel-task}-{count}
-```
-
-Here `date-time` is the string obtained from the date/time format, `parallel-task` is the index
-of the parallel sink instance and `count` is the running number of part files that were created
-because of the batch size.
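
The naming scheme and the batch-size roll-over can be modelled as a small counter. `PartFileSketch` is hypothetical: it only tracks byte counts and returns the path a write would go to. It assumes that the size check happens before each write and that the counter starts at 0. See the `RollingSink` JavaDoc for the actual behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of part-file naming and rolling by batch size for one parallel instance.
public class PartFileSketch {

    private final String bucketPath;  // e.g. /base/path/{date-time}
    private final int subtaskIndex;   // index of the parallel sink instance
    private final long batchSize;     // roll to a new part file once this many bytes were written
    private int partCounter = 0;
    private long bytesInCurrentPart = 0;
    public final List<String> createdParts = new ArrayList<>();

    public PartFileSketch(String bucketPath, int subtaskIndex, long batchSize) {
        this.bucketPath = bucketPath;
        this.subtaskIndex = subtaskIndex;
        this.batchSize = batchSize;
        createdParts.add(currentPartPath());
    }

    private String currentPartPath() {
        return bucketPath + "/part-" + subtaskIndex + "-" + partCounter;
    }

    // Records a write of `bytes`; rolls over first if the current part file is full.
    public String write(long bytes) {
        if (bytesInCurrentPart >= batchSize) {
            partCounter++;
            bytesInCurrentPart = 0;
            createdParts.add(currentPartPath());
        }
        bytesInCurrentPart += bytes;
        return currentPartPath();
    }

    public static void main(String[] args) {
        // Tiny batch size of 100 bytes so that the roll-over is easy to see.
        PartFileSketch sink = new PartFileSketch("/base/path/date-time", 0, 100);
        System.out.println(sink.write(60)); // /base/path/date-time/part-0-0
        System.out.println(sink.write(60)); // /base/path/date-time/part-0-0 (now over the limit)
        System.out.println(sink.write(10)); // /base/path/date-time/part-0-1
    }
}
```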
-
-For in-depth information, please refer to the JavaDoc for
-[RollingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md
deleted file mode 100644
index 83ca514..0000000
--- a/docs/apis/streaming/connectors/index.md
+++ /dev/null
@@ -1,47 +0,0 @@
----
-title: "Streaming Connectors"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-id: connectors
-sub-nav-pos: 6
-sub-nav-title: Connectors
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Connectors provide code for interfacing with various third-party systems.
-
-Currently these systems are supported:
-
- * [Apache Kafka](https://kafka.apache.org/) (sink/source)
- * [Elasticsearch](https://elastic.co/) (sink)
- * [Elasticsearch 2.x](https://elastic.co/) (sink)
- * [Hadoop FileSystem](http://hadoop.apache.org) (sink)
- * [RabbitMQ](http://www.rabbitmq.com/) (sink/source)
- * [Amazon Kinesis Streams](http://aws.amazon.com/kinesis/streams/) (sink/source)
- * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
- * [Apache NiFi](https://nifi.apache.org) (sink/source)
- * [Apache Cassandra](https://cassandra.apache.org/) (sink)
- * [Redis](http://redis.io/) (sink)
-
-To run an application using one of these connectors, additional third-party
-components usually need to be installed and launched, e.g. the servers
-for the message queues. Further instructions can be found in the
-corresponding subsections.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
deleted file mode 100644
index e7cd05b..0000000
--- a/docs/apis/streaming/connectors/kafka.md
+++ /dev/null
@@ -1,293 +0,0 @@
----
-title: "Apache Kafka Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 1
-sub-nav-title: Kafka
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
-
-Flink provides special Kafka connectors for reading data from and writing data to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide
-exactly-once processing semantics. To achieve that, Flink does not rely solely on Kafka's consumer group
-offset tracking, but also tracks and checkpoints these offsets internally.
-
-Please pick a package (Maven artifact ID) and class name for your use case and environment.
-For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka-0.8`) is appropriate.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">Maven Dependency</th>
-      <th class="text-left">Supported since</th>
-      <th class="text-left">Consumer and <br>
-      Producer Class name</th>
-      <th class="text-left">Kafka version</th>
-      <th class="text-left">Notes</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>flink-connector-kafka</td>
-        <td>0.9.1, 0.10</td>
-        <td>FlinkKafkaConsumer082<br>
-        FlinkKafkaProducer</td>
-        <td>0.8.x</td>
-        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
-    </tr>
-     <tr>
-        <td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td>
-        <td>1.0.0</td>
-        <td>FlinkKafkaConsumer08<br>
-        FlinkKafkaProducer08</td>
-        <td>0.8.x</td>
-        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
-    </tr>
-     <tr>
-        <td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td>
-        <td>1.0.0</td>
-        <td>FlinkKafkaConsumer09<br>
-        FlinkKafkaProducer09</td>
-        <td>0.9.x</td>
-        <td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> of Kafka.</td>
-    </tr>
-  </tbody>
-</table>
-
-Then, import the connector in your Maven project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution. See [how to link with them for cluster execution]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-### Installing Apache Kafka
-
-* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (a ZooKeeper server and a Kafka server must be running every time before you start the application).
-* On 32-bit machines, you may run into [this problem](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in).
-* If the Kafka and ZooKeeper servers are running on a remote machine, the `advertised.host.name` setting in the `config/server.properties` file must be set to that machine's IP address.
-
-### Kafka Consumer
-
-Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `FlinkKafkaConsumer09` for Kafka 0.9.0.x versions). It provides access to one or more Kafka topics.
-
-The constructor accepts the following arguments:
-
-1. The topic name / list of topic names
-2. A `DeserializationSchema` / `KeyedDeserializationSchema` for deserializing the data from Kafka
-3. Properties for the Kafka consumer.
-   The following properties are required:
-   - `bootstrap.servers` (comma-separated list of Kafka brokers)
-   - `zookeeper.connect` (comma-separated list of ZooKeeper servers) (**only required for Kafka 0.8**)
-   - `group.id` (the ID of the consumer group)
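The required keys listed above can be checked before the consumer is constructed. The following is a minimal sketch of such a check. `ConsumerPropertiesCheck` and `missingKeys` are hypothetical names and are not part of Flink. The `kafka08` flag is an assumption that stands in for however your job selects the connector version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper, not part of Flink: reports required consumer keys that are missing. */
final class ConsumerPropertiesCheck {

    private ConsumerPropertiesCheck() {}

    /**
     * Returns the required keys that are absent or blank.
     * "zookeeper.connect" is only required when kafka08 is true.
     */
    static List<String> missingKeys(Properties props, boolean kafka08) {
        List<String> required = new ArrayList<>();
        required.add("bootstrap.servers");
        if (kafka08) {
            required.add("zookeeper.connect");
        }
        required.add("group.id");

        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        // Complete for Kafka 0.9, but Kafka 0.8 also needs "zookeeper.connect".
        System.out.println(missingKeys(props, false)); // []
        System.out.println(missingKeys(props, true));  // [zookeeper.connect]
    }
}
```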
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
-// only required for Kafka 0.8
-properties.setProperty("zookeeper.connect", "localhost:2181");
-properties.setProperty("group.id", "test");
-DataStream<String> stream = env
-	.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
-stream.print();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val properties = new Properties()
-properties.setProperty("bootstrap.servers", "localhost:9092")
-// only required for Kafka 0.8
-properties.setProperty("zookeeper.connect", "localhost:2181")
-properties.setProperty("group.id", "test")
-val stream = env
-    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
-stream.print()
-{% endhighlight %}
-</div>
-</div>
-
-The current FlinkKafkaConsumer implementation establishes a connection from the client (when the constructor is called)
-to query the list of topics and partitions.
-
-For this to work, the machine submitting the job to the Flink cluster must be able to reach the Kafka brokers.
-If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc.
-
-#### The `DeserializationSchema`
-
-The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The
-`DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
-method is called for each Kafka message, with the message value as its argument.
-
-It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
-produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
-to implement the `getProducedType(...)` method themselves.
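The following minimal sketch illustrates the single-record method described above by turning a UTF-8 `word,count` message value into an object. It declares a local stand-in interface, `MessageDeserializer`, only so that it compiles without Flink. `WordCount` and the message format are hypothetical. In a real job you would implement Flink's `DeserializationSchema` (or extend `AbstractDeserializationSchema`), which additionally declares `isEndOfStream` and the produced type.

```java
import java.nio.charset.StandardCharsets;

/**
 * Local stand-in for the single-record method of Flink's DeserializationSchema,
 * declared here only so the sketch compiles without Flink on the classpath.
 */
interface MessageDeserializer<T> {
    T deserialize(byte[] message);
}

/** Hypothetical record type. */
final class WordCount {
    final String word;
    final int count;

    WordCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + "=" + count;
    }
}

/** Turns a UTF-8 "word,count" message value into a WordCount. */
final class WordCountDeserializer implements MessageDeserializer<WordCount> {

    @Override
    public WordCount deserialize(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        // Split on the last comma so the word itself may contain commas.
        int comma = text.lastIndexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("expected 'word,count' but got: " + text);
        }
        String word = text.substring(0, comma).trim();
        int count = Integer.parseInt(text.substring(comma + 1).trim());
        return new WordCount(word, count);
    }

    public static void main(String[] args) {
        byte[] value = "flink,3".getBytes(StandardCharsets.UTF_8);
        System.out.println(new WordCountDeserializer().deserialize(value)); // flink=3
    }
}
```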
-
-To access both the key and the value of a Kafka message, use the `KeyedDeserializationSchema`, whose deserialize method is
-`T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.
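A keyed variant can carry the Kafka coordinates along with the payload. This sketch again uses a local stand-in interface, `KeyedMessageDeserializer`, whose method mirrors the `KeyedDeserializationSchema` signature quoted above. `SourcedRecord` is a hypothetical record type. Kafka message keys are optional, so the key array may be `null`; the sketch keeps such a key as `null` rather than failing.

```java
import java.nio.charset.StandardCharsets;

/** Local stand-in mirroring the deserialize signature of Flink's KeyedDeserializationSchema. */
interface KeyedMessageDeserializer<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

/** Hypothetical record that keeps the Kafka coordinates next to the payload. */
final class SourcedRecord {
    final String key;   // null when the Kafka message had no key
    final String value;
    final String topic;
    final int partition;
    final long offset;

    SourcedRecord(String key, String value, String topic, int partition, long offset) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String toString() {
        return topic + "/" + partition + "@" + offset + " " + key + " -> " + value;
    }
}

final class SourcedRecordDeserializer implements KeyedMessageDeserializer<SourcedRecord> {

    @Override
    public SourcedRecord deserialize(byte[] messageKey, byte[] message,
                                     String topic, int partition, long offset) {
        // Guard against missing keys (and values) instead of throwing a NullPointerException.
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return new SourcedRecord(key, value, topic, partition, offset);
    }

    public static void main(String[] args) {
        SourcedRecordDeserializer d = new SourcedRecordDeserializer();
        System.out.println(d.deserialize("user-7".getBytes(StandardCharsets.UTF_8),
                "clicked".getBytes(StandardCharsets.UTF_8), "events", 2, 41L));
        // events/2@41 user-7 -> clicked
        System.out.println(d.deserialize(null, "ping".getBytes(StandardCharsets.UTF_8), "events", 0, 0L));
        // events/0@0 null -> ping
    }
}
```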
-
-For convenience, Flink provides the following schemas:
-
-1. `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`), which creates
-    a schema based on Flink's `TypeInformation`. This is useful if the data is both written and read by Flink.
-    This schema is a performant Flink-specific alternative to other generic serialization approaches.
-
-2. `JsonDeserializationSchema` (and `JSONKeyValueDeserializationSchema`), which turns the serialized JSON
-    into an `ObjectNode` object, from which fields can be accessed using e.g. `objectNode.get("field").asInt()` or `asText()`.
-    The KeyValue `objectNode` contains a "key" and a "value" field, which contain all fields, as well as
-    an optional "metadata" field that exposes the offset, partition, and topic of the message.
-
-#### Kafka Consumers and Fault Tolerance
-
-With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
-its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
-the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
-stored in the checkpoint.
-
-The checkpoint interval therefore bounds how far back the program may have to go in case of a failure.
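A toy model can illustrate the bound on replay. The sketch below is not how Flink implements checkpoints, which are drawn with checkpoint barriers that flow through the stream. It simply stores the next offset to read together with the operator state every `interval` records, fails at a chosen offset, and restores both from the same checkpoint. `OffsetReplayDemo` and its members are hypothetical names.

```java
/**
 * Toy model, not Flink code: a single Kafka partition, a running-sum operator,
 * and a checkpoint every `interval` records that stores the next offset to read
 * together with the operator state.
 */
final class OffsetReplayDemo {

    /** Outcome of one simulated run. */
    static final class Result {
        final long sum;     // final operator state
        final int replayed; // records processed before the failure and read again afterwards

        Result(long sum, int replayed) {
            this.sum = sum;
            this.replayed = replayed;
        }
    }

    /**
     * Processes log[0, failAt), loses the in-memory state, restores the last
     * checkpoint, then re-consumes from the checkpointed offset to the end.
     * Requires 0 <= failAt <= log.length and interval > 0.
     */
    static Result run(long[] log, int interval, int failAt) {
        long sum = 0;
        int checkpointOffset = 0; // next offset to read, as stored in the checkpoint
        long checkpointSum = 0;   // operator state stored in the same checkpoint

        // First attempt: process records until the failure.
        for (int offset = 0; offset < failAt; offset++) {
            sum += log[offset];
            if ((offset + 1) % interval == 0) {
                checkpointOffset = offset + 1;
                checkpointSum = sum;
            }
        }

        // Failure: restore offset and state from the same checkpoint.
        sum = checkpointSum;
        int replayed = failAt - checkpointOffset;

        // Second attempt: re-consume from the checkpointed offset.
        for (int offset = checkpointOffset; offset < log.length; offset++) {
            sum += log[offset];
        }
        return new Result(sum, replayed);
    }

    public static void main(String[] args) {
        long[] log = new long[100];
        for (int i = 0; i < log.length; i++) {
            log[i] = i + 1; // values 1..100, so the exactly-once sum is 5050
        }
        Result r = run(log, 10, 57);
        System.out.println("sum=" + r.sum + " replayed=" + r.replayed); // sum=5050 replayed=7
    }
}
```

With `interval = 10` and a failure before offset 57, the last checkpoint holds offset 50. Seven records are therefore read a second time, yet the final sum is still 5050 because offsets and state were restored together. In this model, the number of re-read records is always smaller than the checkpoint interval.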
-
-To use fault-tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-</div>
-
-Also note that Flink can only restart the topology if enough processing slots are available.
-So if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
-
-If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to ZooKeeper (Kafka 0.8) or to the Kafka brokers (Kafka 0.9).
-
-#### Kafka Consumers and Timestamp Extraction/Watermark Emission
-
-In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. 
-In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
-special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka 
-Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
-
-You can specify your custom timestamp extractor/watermark emitter as described 
-[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one from the 
-[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you 
-can pass it to your consumer in the following way:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
-// only required for Kafka 0.8
-properties.setProperty("zookeeper.connect", "localhost:2181");
-properties.setProperty("group.id", "test");
-
-FlinkKafkaConsumer08<String> myConsumer = 
-    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
-
-DataStream<String> stream = env
-	.addSource(myConsumer);
-stream.print();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val properties = new Properties()
-properties.setProperty("bootstrap.servers", "localhost:9092")
-// only required for Kafka 0.8
-properties.setProperty("zookeeper.connect", "localhost:2181")
-properties.setProperty("group.id", "test")
-
-val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
-val stream = env
-    .addSource(myConsumer)
-stream.print()
-{% endhighlight %}
-</div>
-</div>
- 
-Internally, an instance of the assigner is executed per Kafka partition.
-When such an assigner is specified, the `extractTimestamp(T element, long previousElementTimestamp)` method is called
-for each record read from Kafka to assign a timestamp to the record. To determine whether a new watermark should be
-emitted and with which timestamp, Flink calls `Watermark getCurrentWatermark()` at regular intervals (for periodic
-assigners) or `Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` after each record (for
-punctuated assigners).
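To make the per-partition behavior concrete, here is a minimal sketch of a periodic assigner, with one instance per partition, whose watermark trails the largest timestamp seen by a fixed bound. The method names mirror `AssignerWithPeriodicWatermarks`. However, the sketch returns plain `long` values instead of Flink's `Watermark` class and does not implement the Flink interface. `Event`, `BoundedLatenessAssigner`, and `PerPartitionWatermarks` are hypothetical names. The sketch assumes the per-partition watermarks are combined by taking their minimum, so a partition that lags behind holds back the watermark of the whole source.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical event type with an embedded event-time timestamp in milliseconds. */
final class Event {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

/** Watermark = largest timestamp seen so far minus a fixed out-of-orderness bound. */
final class BoundedLatenessAssigner {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    long extractTimestamp(Event element, long previousElementTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, element.timestamp);
        return element.timestamp;
    }

    long getCurrentWatermark() {
        // Before any element arrives, report the lowest possible watermark.
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMs;
    }
}

/** One assigner per partition; the source watermark is the minimum over partitions. */
final class PerPartitionWatermarks {
    private final Map<Integer, BoundedLatenessAssigner> assigners = new HashMap<>();
    private final long maxOutOfOrdernessMs;

    PerPartitionWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    void onRecord(int partition, Event event) {
        // Simplification: a partition is registered when its first record arrives.
        // Long.MIN_VALUE is a placeholder, since this toy source attaches no previous timestamp.
        assigners.computeIfAbsent(partition, p -> new BoundedLatenessAssigner(maxOutOfOrdernessMs))
                 .extractTimestamp(event, Long.MIN_VALUE);
    }

    long currentWatermark() {
        if (assigners.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (BoundedLatenessAssigner a : assigners.values()) {
            min = Math.min(min, a.getCurrentWatermark());
        }
        return min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks source = new PerPartitionWatermarks(1_000);
        source.onRecord(0, new Event(10_000, "a"));
        source.onRecord(1, new Event(4_000, "b"));
        source.onRecord(0, new Event(12_000, "c"));
        // Partition 0 is at 11000, partition 1 at 3000: the slower partition wins.
        System.out.println(source.currentWatermark()); // 3000
    }
}
```

With a bound of one second, partition 0 reports 11000 ms and partition 1 reports 3000 ms, so the source reports 3000 ms. A late record on partition 1 does not move its watermark backwards, because only the maximum timestamp is tracked.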
-
-### Kafka Producer
-
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
-records to partitions.
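A common partitioning choice is to hash a record key, so that all records with the same key land in the same partition. The sketch below shows only that mapping. It does not implement Flink's `KafkaPartitioner` class, and `KeyHashPartitioner` is a hypothetical name. `Math.floorMod` is used because `String.hashCode()` can be negative.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical key-hash partitioner: equal keys always map to the same partition. */
final class KeyHashPartitioner {

    private KeyHashPartitioner() {}

    static int partition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Map<String, Integer> assignment = new HashMap<>();
        for (String key : new String[] {"alice", "bob", "carol", "alice"}) {
            assignment.put(key, partition(key, 4));
        }
        System.out.println(assignment);
    }
}
```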
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
-{% endhighlight %}
-</div>
-</div>
-
-You can also pass a custom Kafka producer configuration (a `Properties` object) to the producer's constructor. Please refer to
-the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced serialization schema that
-serializes the key and the value separately. It also allows overriding the target topic, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called `KeyedSerializationSchema`.
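The following minimal sketch of such a schema uses a local stand-in interface, `KeyedRecordSerializer`, that has the three methods of Flink's `KeyedSerializationSchema`: `serializeKey`, `serializeValue`, and `getTargetTopic`. `LogLine` and the `errors` topic are hypothetical. The sketch assumes that returning `null` from `getTargetTopic` makes the producer fall back to its default topic; check the Javadoc of your Flink version.

```java
import java.nio.charset.StandardCharsets;

/** Local stand-in with the three methods of Flink's KeyedSerializationSchema. */
interface KeyedRecordSerializer<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

/** Hypothetical log line type. */
final class LogLine {
    final String host;
    final String level;
    final String message;

    LogLine(String host, String level, String message) {
        this.host = host;
        this.level = level;
        this.message = message;
    }
}

/** Keys by host, writes "level message" as the value, and routes ERROR lines to their own topic. */
final class LogLineSerializer implements KeyedRecordSerializer<LogLine> {

    @Override
    public byte[] serializeKey(LogLine element) {
        return element.host.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(LogLine element) {
        return (element.level + " " + element.message).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(LogLine element) {
        // Assumption: null means "use the producer's default topic".
        return "ERROR".equals(element.level) ? "errors" : null;
    }

    public static void main(String[] args) {
        LogLineSerializer s = new LogLineSerializer();
        LogLine line = new LogLine("web-1", "ERROR", "disk full");
        System.out.println(s.getTargetTopic(line)); // errors
        System.out.println(new String(s.serializeValue(line), StandardCharsets.UTF_8)); // ERROR disk full
    }
}
```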
-
-
-**Note**: By default, the number of retries is set to "0". This means that the producer fails immediately on errors,
-including leader changes. The value is set to "0" by default to avoid duplicate messages in the target topic.
-For most production environments with frequent broker changes, we recommend setting the number of retries to a 
-higher value.
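Raising the retry count only requires an entry in the producer configuration. This sketch builds such a `Properties` object with the JDK alone. `ProducerConfigExample` is a hypothetical name, and the values are examples. `bootstrap.servers` and `retries` are standard Kafka producer settings. The commented line assumes a `FlinkKafkaProducer08(topicId, serializationSchema, producerConfig)` constructor; check the Javadoc of your version. Keep the trade-off described above in mind: retried sends can produce duplicates in the target topic.

```java
import java.util.Properties;

/** Hypothetical helper that builds a producer configuration with a non-zero retry count. */
final class ProducerConfigExample {

    private ProducerConfigExample() {}

    static Properties producerConfig(String brokers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // Retrying survives leader changes, but a retried send may be written twice.
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerConfig("localhost:9092", 3);
        // Assumed usage: new FlinkKafkaProducer08<>("my-topic", new SimpleStringSchema(), props)
        System.out.println(props.getProperty("retries")); // 3
    }
}
```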
-
-There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery
-into a Kafka topic.
-

