flink-commits mailing list archives

From u..@apache.org
Subject [41/51] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs
Date Wed, 24 Aug 2016 09:27:01 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
deleted file mode 100644
index 54a75db..0000000
--- a/docs/apis/streaming/connectors/kinesis.md
+++ /dev/null
@@ -1,322 +0,0 @@
----
-title: "Amazon AWS Kinesis Streams Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 5
-sub-nav-title: Amazon Kinesis Streams
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/). 
-
-To use the connector, add the following Maven dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-**The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact depends on code licensed under the [Amazon Software License](https://aws.amazon.com/asl/) (ASL).
-Linking against flink-connector-kinesis will include ASL-licensed code in your application.**
-
-The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact is not deployed to Maven central as part of
-Flink releases because of this licensing issue. Therefore, you need to build the connector yourself from source.
-
-Download the Flink source or check it out from the git repository. Then, use the following Maven command to build the module:
-{% highlight bash %}
-mvn clean install -Pinclude-kinesis -DskipTests
-# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again. 
-cd flink-dist
-mvn clean install -Pinclude-kinesis -DskipTests
-{% endhighlight %}
-
-
-The streaming connectors are not part of the binary distribution. See how to link with them for cluster 
-execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-### Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to set up Kinesis streams. Make sure to create the appropriate IAM policy and user with permission to read from and write to the Kinesis streams.
-
-### Kinesis Consumer
-
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
-
-Before consuming data from Kinesis streams, make sure that all streams are shown with the status "ACTIVE" in the AWS dashboard.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
-
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
-
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-{% endhighlight %}
-</div>
-</div>
-
-The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
-instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example
-demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
-the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
-`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, or `AUTO`). Also, data is consumed
-from the newest position in the Kinesis stream (the other option is to set `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
-to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
-
-Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
-
-#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
-
-With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
-periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
-state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that
-was stored in the checkpoint.
-
-The checkpoint interval therefore defines how far back the program may have to be replayed at most, in case of a failure.
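As a rough illustration of that trade-off (a simplification invented here; actual replay also depends on in-flight data and checkpoint duration):

```java
// Back-of-envelope bound on re-consumed records after a failure, assuming a
// steady ingestion rate: everything read since the last completed checkpoint
// may be replayed. This is an invented sketch, not Flink connector code.
public class ReplayBound {
    public static long maxReplayedRecords(long checkpointIntervalMillis, long recordsPerSecond) {
        return checkpointIntervalMillis / 1000 * recordsPerSecond;
    }

    public static void main(String[] args) {
        // 5 s checkpoint interval at 1000 records/sec -> up to ~5000 records replayed
        System.out.println(maxReplayedRecords(5_000, 1_000)); // prints 5000
    }
}
```

A shorter checkpoint interval shrinks this replay window at the cost of more frequent checkpointing overhead.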
-
-To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-</div>
-
-Also note that Flink can only restart the topology if enough processing slots are available.
-Therefore, if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
-
-#### Event Time for Consumed Records
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
-timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
-have been successfully received and stored by the stream. Note that this timestamp is typically referred to as a Kinesis server-side
-timestamp, and there are no guarantees about its accuracy or ordering (i.e., the timestamps may not always be
-ascending).
-
-Users can override this default with a custom timestamp assigner, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
-or use one of the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). The assigner can then
-be applied to the consumer's stream in the following way:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
-kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
-  .assignTimestampsAndWatermarks(new CustomTimestampAssigner)
-{% endhighlight %}
-</div>
-</div>
-
-#### Threading Model
-
-The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
-
-For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard
-information even if the subtask initially did not have shards to read from when the consumer was started. In other words, if
-the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless
-of the total number of shards in the subscribed streams.
-
-For data consumption, a single thread is created to consume each discovered shard. Each thread terminates when the
-shard it is responsible for consuming is closed as a result of stream resharding. In other words, there is always
-one thread per open shard.
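The thread counts described above can be summarized with a back-of-envelope sketch (plain Java invented for illustration, not connector code):

```java
// Estimate of the threads spawned by the Flink Kinesis Consumer under the
// threading model described above: one shard-discovery thread per parallel
// subtask (even for subtasks that own no shards), plus one consuming thread
// per open shard across the whole job.
public class KinesisThreadEstimate {
    public static int totalThreads(int parallelism, int openShards) {
        int discoveryThreads = parallelism; // one discovery poller per subtask
        int consumingThreads = openShards;  // one consumer per open shard
        return discoveryThreads + consumingThreads;
    }

    public static void main(String[] args) {
        // e.g. parallelism 10 against a stream with 4 open shards
        System.out.println(totalThreads(10, 4)); // prints 14
    }
}
```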
-
-#### Internally Used Kinesis APIs
-
-The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
-for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
-on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running.
-Below is a list of APIs called by the consumer with description of how the consumer uses the API, as well as information
-on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits.
-
-- *[DescribeStream](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)*: this is constantly called
-by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default,
-the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result
-from Kinesis. If this interferes with other non-Flink consuming applications, users can slow down the rate at which the
-consumer calls this API by setting a larger value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied
-configuration properties. Note that this setting directly impacts
-the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval.
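To see how this interval interacts with the service limits, a rough calculation of the aggregate call rate (assuming, as described above, one DescribeStream call per subtask per interval):

```java
// Aggregate DescribeStream calls per second issued by the whole job, assuming
// each parallel subtask polls Kinesis once per discovery interval. An invented
// back-of-envelope model, not connector code.
public class DescribeStreamRate {
    public static double callsPerSecond(int parallelism, long discoveryIntervalMillis) {
        return parallelism * (1000.0 / discoveryIntervalMillis);
    }

    public static void main(String[] args) {
        // 10 subtasks with the default 10 s interval -> 1 call/sec in aggregate
        System.out.println(callsPerSecond(10, 10_000)); // prints 1.0
    }
}
```

Doubling the interval halves the aggregate call rate, at the cost of up to that much extra delay before a new shard is picked up.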
-
-- *[GetShardIterator](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*: this is called
-only once when each per-shard consuming thread is started, and will be retried if Kinesis complains that the transaction limit for the
-API has been exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream),
-the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down the rate at which other
-non-Flink consuming applications call this API, or modify the retry behaviour of this API call in the consumer by
-setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties.
-
-- *[GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called
-by per shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there
-are any other non-Flink consuming applications running), the per shard rate limit may be exceeded. By default, on each call
-of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has been exceeded,
-up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput
-of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and
-`ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former
-adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while
-the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
-consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
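The effect of these two keys on per-shard throughput can be sketched as follows (a simplified model invented here; real throughput is also bounded by API latency and Kinesis per-shard read limits):

```java
// Upper bound on records per second fetched from one shard, assuming each
// GetRecords call returns a full SHARD_GETRECORDS_MAX batch and the consumer
// sleeps for the configured interval between calls. With a zero interval the
// per-call round-trip time dominates instead, which this simplified model
// reports as unbounded (-1).
public class GetRecordsThroughput {
    public static double maxRecordsPerSecond(int getRecordsMax, long intervalMillis) {
        if (intervalMillis <= 0) {
            return -1; // no sleep: limited only by API latency and service limits
        }
        return getRecordsMax * (1000.0 / intervalMillis);
    }

    public static void main(String[] args) {
        // e.g. batches of 100 records with a 200 ms fetch interval
        System.out.println(maxRecordsPerSecond(100, 200)); // prints 500.0
    }
}
```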
-
-### Kinesis Producer
-
-The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer does not
-participate in Flink's checkpointing and therefore does not provide exactly-once processing guarantees.
-Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). 
-
-In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
-
-To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
-
-For the monitoring to work, the user accessing the stream needs access to the Amazon CloudWatch service.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
-
-DataStream<String> simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-
-val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
-
-val simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
-{% endhighlight %}
-</div>
-</div>
-
-The above is a simple example of using the producer. Configuration for the producer, including the mandatory configuration values, is supplied with a `java.util.Properties`
-instance, as described above for the consumer. The example demonstrates producing to a single Kinesis stream in the AWS region "us-east-1".
-
-Instead of a `SerializationSchema`, the producer also supports a `KinesisSerializationSchema`, which allows sending data to multiple streams. This is
-done using the `KinesisSerializationSchema.getTargetStream(T element)` method: returning `null` instructs the producer to write the element to the default stream;
-otherwise, the returned stream name is used.
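The routing contract can be sketched in plain Java; the `error-` routing rule and stream names below are invented for illustration, and only the null-means-default behavior comes from the connector:

```java
// Mirrors the contract of KinesisSerializationSchema.getTargetStream: return
// null to use the producer's default stream, or a stream name to route the
// element elsewhere. The "error-" prefix rule is purely illustrative.
public class StreamRouter {
    public static String getTargetStream(String element) {
        if (element.startsWith("error-")) {
            return "error_stream"; // hypothetical secondary stream
        }
        return null; // null -> producer writes to its configured default stream
    }

    public static void main(String[] args) {
        System.out.println(getTargetStream("error-oom")); // prints error_stream
        System.out.println(getTargetStream("ok"));        // prints null
    }
}
```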
-
-Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
-
-### Using Non-AWS Kinesis Endpoints for Testing
-
-It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
-[Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink
-application. The AWS endpoint that would normally be inferred from the AWS region set in the Flink configuration must be overridden via a configuration property.
-
-To override the AWS endpoint, taking the producer as an example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the
-Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is
-required, it will not be used to determine the AWS endpoint URL.
-
-The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
-{% endhighlight %}
-</div>
-</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/nifi.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/nifi.md b/docs/apis/streaming/connectors/nifi.md
deleted file mode 100644
index a47b8f0..0000000
--- a/docs/apis/streaming/connectors/nifi.md
+++ /dev/null
@@ -1,141 +0,0 @@
----
-title: "Apache NiFi Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 7
-sub-nav-title: Apache NiFi
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Source and Sink that can read from and write to 
-[Apache NiFi](https://nifi.apache.org/). To use this connector, add the
-following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-nifi{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
-for information about how to package the program with the libraries for
-cluster execution.
-
-#### Installing Apache NiFi
-
-Instructions for setting up an Apache NiFi cluster can be found
-[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
-
-#### Apache NiFi Source
-
-The connector provides a Source for reading data from Apache NiFi to Apache Flink.
-
-The class `NiFiSource(…)` provides two constructors for reading data from NiFi.
-
-- `NiFiSource(SiteToSiteClientConfig config)` - Constructs a `NiFiSource(…)` given the client's `SiteToSiteClientConfig` and a
-     default wait time of 1000 ms.
-
-- `NiFiSource(SiteToSiteClientConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's
-     `SiteToSiteClientConfig` and the specified wait time (in milliseconds).
-     
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data for Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data for Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-       
-val nifiSource = new NiFiSource(clientConfig)       
-{% endhighlight %}       
-</div>
-</div>
-
-Here, data is read from the Apache NiFi output port called "Data for Flink", which is part of the Apache NiFi
-Site-to-Site protocol configuration.
- 
-#### Apache NiFi Sink
-
-The connector provides a Sink for writing data from Apache Flink to Apache NiFi.
-
-The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`.
-
-- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteClientConfig` and a `NiFiDataPacketBuilder` that converts data from Flink into a `NiFiDataPacket` to be ingested by NiFi.
-      
-Example:
-      
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-        .url("http://localhost:8080/nifi")
-        .portName("Data from Flink")
-        .requestBatchCount(5)
-        .buildConfig();
-
-SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
-
-streamExecEnv.addSink(nifiSink);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
-
-val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
-       .url("http://localhost:8080/nifi")
-       .portName("Data from Flink")
-       .requestBatchCount(5)
-       .buildConfig()
-       
-val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder[NiFiDataPacket] {...})
-
-streamExecEnv.addSink(nifiSink)
-{% endhighlight %}       
-</div>
-</div>      
-
-More information about the [Apache NiFi](https://nifi.apache.org) Site-to-Site protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/rabbitmq.md b/docs/apis/streaming/connectors/rabbitmq.md
deleted file mode 100644
index 0e186e0..0000000
--- a/docs/apis/streaming/connectors/rabbitmq.md
+++ /dev/null
@@ -1,132 +0,0 @@
----
-title: "RabbitMQ Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 4
-sub-nav-title: RabbitMQ
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-rabbitmq{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution. See [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) for how to link with them for cluster execution.
-
-#### Installing RabbitMQ
-Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.
-
-#### RabbitMQ Source
-
-The `RMQSource` class provides an interface for receiving data from RabbitMQ.
-
-The following have to be provided to the `RMQSource(…)` constructor, in order:
-
-- connectionConfig: The RabbitMQ connection configuration (`RMQConnectionConfig`).
-- queueName: The RabbitMQ queue name.
-- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
-- deserializationSchema: Deserialization schema to turn messages into Java objects.
-
-This source can be operated in three different modes:
-
-1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
-    unique correlation IDs.
-2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
-    (correlation id is not set).
-3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
-
-Correlation ids are a RabbitMQ application feature. You have to set them in the message properties
-when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
-unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
-messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
-have to supply correlation ids.
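The deduplication idea behind the exactly-once mode can be sketched in plain Java (a simplification: the real source tracks correlation ids in Flink's checkpointed state, not an in-memory set):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of correlation-id based deduplication: a message is
// emitted only the first time its correlation id is seen; a null id is an
// error when correlation ids are expected, matching the rule described above.
public class CorrelationIdDedup {
    private final Set<String> seenIds = new HashSet<>();

    /** Returns true if the message should be emitted, false if it is a duplicate. */
    public boolean accept(String correlationId) {
        if (correlationId == null) {
            throw new IllegalStateException(
                "usesCorrelationId is set but the message has no correlation id");
        }
        return seenIds.add(correlationId); // add() returns false for ids seen before
    }

    public static void main(String[] args) {
        CorrelationIdDedup dedup = new CorrelationIdDedup();
        System.out.println(dedup.accept("msg-1")); // prints true
        System.out.println(dedup.accept("msg-1")); // prints false (duplicate ignored)
    }
}
```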
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-    .setHost("localhost").setPort(5000).setUserName(..)
-    .setPassword(..).setVirtualHost("/").build();
-
-DataStream<String> streamWithoutCorrelationIds = env
-    .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()));
-streamWithoutCorrelationIds.print();
-
-DataStream<String> streamWithCorrelationIds = env
-    .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()));
-streamWithCorrelationIds.print();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val connectionConfig = new RMQConnectionConfig.Builder()
-    .setHost("localhost").setPort(5000).setUserName(..)
-    .setPassword(..).setVirtualHost("/").build()
-
-val streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
-streamWithoutCorrelationIds.print()
-
-val streamWithCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
-streamWithCorrelationIds.print()
-{% endhighlight %}
-</div>
-</div>
-
-#### RabbitMQ Sink
-The `RMQSink` class provides an interface for sending data to RabbitMQ.
-
-The following have to be provided to the `RMQSink(…)` constructor, in order:
-
-1. RMQConnectionConfig
-2. The queue name
-3. Serialization schema
-
-Example:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-    .setHost("localhost").setPort(5000).setUserName(..)
-    .setPassword(..).setVirtualHost("/").build();
-stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val connectionConfig = new RMQConnectionConfig.Builder()
-    .setHost("localhost").setPort(5000).setUserName(..)
-    .setPassword(..).setVirtualHost("/").build()
-stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
-{% endhighlight %}
-</div>
-</div>
-
-More about RabbitMQ can be found [here](http://www.rabbitmq.com/).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/redis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/redis.md b/docs/apis/streaming/connectors/redis.md
deleted file mode 100644
index dfa5296..0000000
--- a/docs/apis/streaming/connectors/redis.md
+++ /dev/null
@@ -1,177 +0,0 @@
----
-title: "Redis Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 6
-sub-nav-title: Redis
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to
-[Redis](http://redis.io/) and can also publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
-following dependency to your project:
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-Version Compatibility: This module is compatible with Redis 2.8.5.
-
-Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution [explicitly]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-#### Installing Redis
-Follow the instructions from the [Redis download page](http://redis.io/download).
-
-#### Redis Sink
-The `RedisSink` class provides an interface for sending data to Redis.
-The sink can use three different methods for communicating with different types of Redis environments:
-
-1. Single Redis Server
-2. Redis Cluster
-3. Redis Sentinel
-
-This code shows how to create a sink that communicates with a single Redis server:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
-
-    @Override
-    public RedisCommandDescription getCommandDescription() {
-        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
-    }
-
-    @Override
-    public String getKeyFromData(Tuple2<String, String> data) {
-        return data.f0;
-    }
-
-    @Override
-    public String getValueFromData(Tuple2<String, String> data) {
-        return data.f1;
-    }
-}
-FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class RedisExampleMapper extends RedisMapper[(String, String)]{
-  override def getCommandDescription: RedisCommandDescription = {
-    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
-  }
-
-  override def getKeyFromData(data: (String, String)): String = data._1
-
-  override def getValueFromData(data: (String, String)): String = data._2
-}
-val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example code does the same, but for Redis Cluster:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
-    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example shows how to configure the sink when the Redis environment uses Sentinels:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
-    .setMasterName("master").setSentinels(...).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This section describes all the available data types and the Redis command used for each.
-
-<table class="table table-bordered" style="width: 75%">
-    <thead>
-        <tr>
-          <th class="text-center" style="width: 20%">Data Type</th>
-          <th class="text-center" style="width: 25%">Redis Command [Sink]</th>
-          <th class="text-center" style="width: 25%">Redis Command [Source]</th>
-        </tr>
-      </thead>
-      <tbody>
-        <tr>
-            <td>HASH</td><td><a href="http://redis.io/commands/hset">HSET</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>LIST</td><td>
-                <a href="http://redis.io/commands/rpush">RPUSH</a>, 
-                <a href="http://redis.io/commands/lpush">LPUSH</a>
-            </td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>STRING</td><td><a href="http://redis.io/commands/set">SET</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>HYPER_LOG_LOG</td><td><a href="http://redis.io/commands/pfadd">PFADD</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td><td>--NA--</td>
-        </tr>                
-      </tbody>
-</table>
-More about Redis can be found [here](http://redis.io/).

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/connectors/twitter.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/twitter.md b/docs/apis/streaming/connectors/twitter.md
deleted file mode 100644
index 9e84481..0000000
--- a/docs/apis/streaming/connectors/twitter.md
+++ /dev/null
@@ -1,89 +0,0 @@
----
-title: "Twitter Connector"
-
-# Sub-level navigation
-sub-nav-group: streaming
-sub-nav-parent: connectors
-sub-nav-pos: 5
-sub-nav-title: Twitter
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-The Twitter Streaming API provides access to the stream of tweets made available by Twitter. 
-Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream. 
-To use this connector, add the following dependency to your project:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-twitter{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution. 
-See how to link with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
-
-#### Authentication
-In order to connect to the Twitter stream, the user has to register their program and acquire the necessary authentication information. The process is described below.
-
-#### Acquiring the authentication information
-First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) 
-or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by 
-clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
-After selecting the application, the API key and API secret (called `twitter-source.consumerKey` and `twitter-source.consumerSecret` in `TwitterSource` respectively) are located on the "API Keys" tab. 
-The necessary OAuth Access Token data (`twitter-source.token` and `twitter-source.tokenSecret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab.
-Remember to keep these pieces of information secret and do not push them to public repositories.
- 
- 
-
-#### Usage
-In contrast to other connectors, the `TwitterSource` does not depend on any additional services. For example, the following code should run gracefully:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Properties props = new Properties();
-props.setProperty(TwitterSource.CONSUMER_KEY, "");
-props.setProperty(TwitterSource.CONSUMER_SECRET, "");
-props.setProperty(TwitterSource.TOKEN, "");
-props.setProperty(TwitterSource.TOKEN_SECRET, "");
-DataStream<String> streamSource = env.addSource(new TwitterSource(props));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val props = new Properties()
-props.setProperty(TwitterSource.CONSUMER_KEY, "")
-props.setProperty(TwitterSource.CONSUMER_SECRET, "")
-props.setProperty(TwitterSource.TOKEN, "")
-props.setProperty(TwitterSource.TOKEN_SECRET, "")
-val streamSource = env.addSource(new TwitterSource(props))
-{% endhighlight %}
-</div>
-</div>
-
-The `TwitterSource` emits strings containing a JSON object, representing a Tweet.
-
-The `TwitterExample` class in the `flink-examples-streaming` package shows a full example of how to use the `TwitterSource`.
-
-By default, the `TwitterSource` uses the `StatusesSampleEndpoint`. This endpoint returns a random sample of Tweets.
-There is a `TwitterSource.EndpointInitializer` interface allowing users to provide a custom endpoint.
-

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/event_time.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_time.md b/docs/apis/streaming/event_time.md
deleted file mode 100644
index 7f94d68..0000000
--- a/docs/apis/streaming/event_time.md
+++ /dev/null
@@ -1,208 +0,0 @@
----
-title: "Event Time"
-
-sub-nav-id: eventtime
-sub-nav-group: streaming
-sub-nav-pos: 3
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* toc
-{:toc}
-
-# Event Time / Processing Time / Ingestion Time
-
-Flink supports different notions of *time* in streaming programs.
-
-- **Processing time:** Processing time refers to the system time of the machine that is executing the
-    respective operation.
-
-    When a streaming program runs on processing time, all time-based operations (like time windows) will
-    use the system clock of the machines that run the respective operator. For example, an hourly
-    processing time window will include all records that arrived at a specific operator between the
-    times when the system clock indicated the full hour.
-
-    Processing time is the simplest notion of time and requires no coordination between streams and machines.
-    It provides the best performance and the lowest latency. However, in distributed and asynchronous
-    environments processing time does not provide determinism, because it is susceptible to the speed at which
-    records arrive in the system (for example from the message queue), and to the speed at which the
-    records flow between operators inside the system.
-
-- **Event time:** Event time is the time that each individual event occurred on its producing device.
-    This time is typically embedded within the records before they enter Flink and that *event timestamp*
-    can be extracted from the record. An hourly event time window will contain all records that carry an
-    event timestamp that falls into that hour, regardless of when the records arrive, and in what order
-    they arrive.
-
-    Event time gives correct results even on out-of-order events, late events, or on replays
-    of data from backups or persistent logs. In event time, the progress of time depends on the data,
-    not on any wall clocks. Event time programs must specify how to generate *Event Time Watermarks*,
-    which is the mechanism that signals time progress in event time. The mechanism is
-    described below.
-
-    Event time processing often incurs a certain latency, due to its inherent waiting for late events
-    and out-of-order events. Because of that, event time programs are often combined with
-    *processing time* operations.
-
-- **Ingestion time:** Ingestion time is the time that events enter Flink. At the source operator, each
-    record gets the source's current time as a timestamp, and time-based operations (like time windows)
-    refer to that timestamp.
-
-    *Ingestion Time* sits conceptually in between *Event Time* and *Processing Time*. Compared to
-    *Processing Time*, it is slightly more expensive, but gives more predictable results: Because
-    *Ingestion Time* uses stable timestamps (assigned once at the source), different window operations
-    over the records will refer to the same timestamp, whereas in *Processing Time* each window operator
-    may assign the record to a different window (based on the local system clock and any transport delay).
-
-    Compared to *Event Time*, *Ingestion Time* programs cannot handle any out-of-order events or late data,
-    but the programs don't have to specify how to generate *Watermarks*.
-
-    Internally, *Ingestion Time* is treated much like event time, with automatic timestamp assignment and
-    automatic Watermark generation.
-
-<img src="fig/times_clocks.svg" class="center" width="80%" />
-
-
-### Setting a Time Characteristic
-
-The first part of a Flink DataStream program is usually to set the base *time characteristic*. That setting
-defines how data stream sources behave (for example whether to assign timestamps), and what notion of
-time the window operations like `KeyedStream.timeWindow(Time.seconds(30))` refer to.
-
-The following example shows a Flink program that aggregates events in hourly time windows. The behavior of the
-windows adapts with the time characteristic.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
-
-stream
-    .keyBy( (event) -> event.getUser() )
-    .timeWindow(Time.hours(1))
-    .reduce( (a, b) -> a.add(b) )
-    .addSink(...);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
-
-stream
-    .keyBy( _.getUser )
-    .timeWindow(Time.hours(1))
-    .reduce( (a, b) => a.add(b) )
-    .addSink(...)
-{% endhighlight %}
-</div>
-</div>
-
-
-Note that in order to run this example in *Event Time*, the program needs to use either an event time
-source, or inject a *Timestamp Assigner & Watermark Generator*. Those functions describe how to access
-the event timestamps, and what degree of out-of-orderness the event stream exhibits.
-
-The section below describes the general mechanism behind *Timestamps* and *Watermarks*. For a guide on how
-to use timestamp assignment and watermark generation in the Flink DataStream API, please refer to
-[Generating Timestamps / Watermarks]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html).
-
-
-# Event Time and Watermarks
-
-*Note: Flink implements many techniques from the Dataflow Model. For a good introduction to event time and watermarks, also have a look at these articles:*
-
-  - [Streaming 101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by Tyler Akidau
-  - The [Dataflow Model paper](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf)
-
-
-A stream processor that supports *event time* needs a way to measure the progress of event time. 
-For example, a window operator that builds hourly windows needs to be notified when event time has reached the
-next full hour, such that the operator can close the next window.
-
-*Event Time* can progress independently of *Processing Time* (measured by wall clocks).
-For example, in one program, the current *event time* of an operator can trail slightly behind the processing time
-(accounting for a delay in receiving the latest elements) and both proceed at the same speed. In another streaming
-program, which reads fast-forward through some data already buffered in a Kafka topic (or another message queue), event time
-can progress by weeks in seconds.
-
-------
-
-The mechanism in Flink to measure progress in event time is **Watermarks**.
-Watermarks flow as part of the data stream and carry a timestamp *t*. A *Watermark(t)* declares that event time has reached time
-*t* in that stream, meaning that all events with a timestamp *t' < t* have occurred.
-
-The figure below shows a stream of events with (logical) timestamps, and watermarks flowing inline. The events are in order
-(with respect to their timestamp), meaning that watermarks are simply periodic markers in the stream with an in-order timestamp.
-
-<img src="fig/stream_watermark_in_order.svg" alt="A data stream with events (in order) and watermarks" class="center" width="65%" />
-
-Watermarks are crucial for *out-of-order* streams, as shown in the figure below, where events do not occur ordered by their timestamp.
-Watermarks establish points in the stream where all events up to a certain timestamp have occurred. Once these watermarks reach an
-operator, the operator can advance its internal *event time clock* to the value of the watermark.
-
-<img src="fig/stream_watermark_out_of_order.svg" alt="A data stream with events (out of order) and watermarks" class="center" width="65%" />
-
-
-## Watermarks in Parallel Streams
-
-Watermarks are generated at source functions, or directly after source functions. Each parallel subtask of a source function usually
-generates its watermarks independently. These watermarks define the event time at that particular parallel source.
-
-As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an
-operator advances its event time, it generates a new watermark downstream for its successor operators.
-
-Operators that consume multiple input streams (e.g., after a *keyBy(...)* or *partition(...)* function, or a union) track the event time
-on each of their input streams. The operator's current event time is the minimum of the input streams' event time. As the input streams
-update their event time, so does the operator.
-
-The figure below shows an example of events and watermarks flowing through parallel streams, and operators tracking event time.
-
-<img src="fig/parallel_streams_watermarks.svg" alt="Parallel data streams and operators with events and watermarks" class="center" width="80%" />
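The min-of-inputs rule above can be sketched in plain Java. This is a hypothetical helper for illustration only, not part of Flink's API; the class and method names are invented:

```java
import java.util.Arrays;

// Sketch of how a multi-input operator could track event time: it remembers
// the latest watermark seen per input, and its own event-time clock is the
// minimum over all inputs, so it never runs ahead of the slowest input stream.
public class MinWatermarkTracker {
    private final long[] inputWatermarks;

    public MinWatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns the operator's current event time. */
    public long onWatermark(int input, long timestamp) {
        // Watermarks never move backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }
}
```

Only once every input has reported a watermark past time *t* does the operator's event time reach *t*, which mirrors the propagation shown in the figure.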
-
-
-## Late Elements
-
-It is possible that certain elements violate the watermark condition, meaning that even after the *Watermark(t)* has occurred,
-more elements with timestamp *t' < t* will occur. In fact, in many real world setups, certain elements can be arbitrarily
-delayed, making it impossible to define a time when all elements of a certain event timestamp have occurred.
-Furthermore, even if the lateness can be bounded, delaying the watermarks by too much is often not desirable,
-because it delays the evaluation of the event time windows by too much.
-
-Due to that, some streaming programs will explicitly expect a number of *late* elements. Late elements are elements that
-arrive after the system's event time clock (as signaled by the watermarks) has already passed the time of the late element's
-timestamp.
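Under this definition, deciding whether an element is late is a simple comparison of its timestamp against the current watermark. A minimal sketch in plain Java (a hypothetical helper, not a Flink API):

```java
public class LatenessCheck {
    /**
     * An element is late if the event-time clock, as signaled by the last
     * watermark, has already passed the element's timestamp.
     * Hypothetical helper for illustration only.
     */
    public static boolean isLate(long elementTimestamp, long currentWatermark) {
        return elementTimestamp < currentWatermark;
    }
}
```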
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/event_timestamp_extractors.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamp_extractors.md b/docs/apis/streaming/event_timestamp_extractors.md
deleted file mode 100644
index 83a90d2..0000000
--- a/docs/apis/streaming/event_timestamp_extractors.md
+++ /dev/null
@@ -1,108 +0,0 @@
----
-title: "Pre-defined Timestamp Extractors / Watermark Emitters"
-
-sub-nav-group: streaming
-sub-nav-pos: 2
-sub-nav-parent: eventtime
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* toc
-{:toc}
-
-As described in [timestamps and watermark handling]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically, 
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending 
-on their use-case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of 
-the incoming records, e.g. whenever a special element is encountered in the stream.
-
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners. 
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example 
-for custom assigner implementations.
-
-#### **Assigner with Ascending Timestamps**
-
-The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task 
-occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will 
-arrive.
-
-Note that it is only necessary that timestamps are ascending *per parallel data source task*. For example, if
-in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that
-timestamps are ascending within each Kafka partition. Flink's Watermark merging mechanism will generate correct
-watermarks whenever parallel streams are shuffled, unioned, connected, or merged.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks = 
-    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
-
-        @Override
-        public long extractAscendingTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
-{% endhighlight %}
-</div>
-</div>
-
-#### **Assigner which allows a fixed amount of record lateness**
-
-Another example of periodic watermark generation is when the watermark lags behind the maximum (event-time) timestamp
-seen in the stream by a fixed amount of time. This case covers scenarios where the maximum lateness that can be encountered in a 
-stream is known in advance, e.g. when creating a custom source containing elements with timestamps spread within a fixed period of 
-time for testing. For these cases, Flink provides the `BoundedOutOfOrdernessTimestampExtractor` which takes as an argument 
-the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed to be late before being ignored when computing the 
-final result for the given window. Lateness corresponds to the result of `t - t_w`, where `t` is the (event-time) timestamp of an 
-element, and `t_w` that of the previous watermark. If `lateness > 0` then the element is considered late and is ignored when computing 
-the result of the job for its corresponding window.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks = 
-    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
-
-        @Override
-        public long extractTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
-{% endhighlight %}
-</div>
-</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md b/docs/apis/streaming/event_timestamps_watermarks.md
deleted file mode 100644
index 05c9f51..0000000
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ /dev/null
@@ -1,332 +0,0 @@
----
-title: "Generating Timestamps / Watermarks"
-
-sub-nav-group: streaming
-sub-nav-pos: 1
-sub-nav-parent: eventtime
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* toc
-{:toc}
-
-
-This section is relevant for programs running on **Event Time**. For an introduction to *Event Time*,
-*Processing Time*, and *Ingestion Time*, please refer to the [event time introduction]({{ site.baseurl }}/apis/streaming/event_time.html).
-
-To work with *Event Time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *Event Time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to get its event timestamp *assigned*. This usually happens by accessing/extracting the
-timestamp from some field in the element.
-
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-the progress in event time.
-
-There are two ways to assign timestamps and generate Watermarks:
-
-  1. Directly in the data stream source
-  2. Via a timestamp assigner / watermark generator: in Flink timestamp assigners also define the watermarks to be emitted
-
-<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
-milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
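For example, a wall-clock instant can be converted into such a timestamp with plain Java (no Flink API involved; the helper name is illustrative):

```java
import java.time.Instant;

public class EpochMillis {
    // Timestamps and watermarks are plain longs: milliseconds since
    // 1970-01-01T00:00:00Z, independent of the producer's time zone.
    public static long toTimestamp(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }
}
```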
-
-### Source Functions with Timestamps and Watermarks
-
-Stream sources can also directly assign timestamps to the elements they produce and emit Watermarks. In that case,
-no Timestamp Assigner is needed.
-
-To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)`
-method on the `SourceContext`. To generate Watermarks, the source must call the `emitWatermark(Watermark)` function.
-
-Below is a simple example of a source *(non-checkpointed)* that assigns timestamps and generates Watermarks
-depending on special events:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-@Override
-public void run(SourceContext<MyType> ctx) throws Exception {
-	while (/* condition */) {
-		MyType next = getNext();
-		ctx.collectWithTimestamp(next, next.getEventTimestamp());
-
-		if (next.hasWatermarkTime()) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
-		}
-	}
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-override def run(ctx: SourceContext[MyType]): Unit = {
-	while (/* condition */) {
-		val next: MyType = getNext()
-		ctx.collectWithTimestamp(next, next.eventTimestamp)
-
-		if (next.hasWatermarkTime) {
-			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
-		}
-	}
-}
-{% endhighlight %}
-</div>
-</div>
-
-*Note:* If the streaming program uses a TimestampAssigner on a stream where elements have a timestamp already,
-those timestamps will be overwritten by the TimestampAssigner. Similarly, Watermarks will be overwritten as well.
-
-
-### Timestamp Assigners / Watermark Generators
-
-Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
-original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
-
-Timestamp assigners are usually specified immediately after the data source, but this is not strictly required.
-A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
-In any case, the timestamp assigner needs to be specified before the first operation on event time
-(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
-Flink allows the specification of a timestamp assigner / watermark emitter inside
-the source (or consumer) itself. More information on how to do so can be found in the
-[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
-
-
-**NOTE:** The remainder of this section presents the main interfaces a programmer has
-to implement in order to create her own timestamp extractors/watermark emitters.
-To see the pre-implemented extractors that ship with Flink, please refer to the
-[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html) page.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-DataStream<MyEvent> stream = env.readFile(
-        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
-        FilePathFilter.createDefaultFilter(), typeInfo);
-
-DataStream<MyEvent> withTimestampsAndWatermarks = stream
-        .filter( event -> event.severity() == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
-
-withTimestampsAndWatermarks
-        .keyBy( (event) -> event.getGroup() )
-        .timeWindow(Time.seconds(10))
-        .reduce( (a, b) -> a.add(b) )
-        .addSink(...);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-val stream: DataStream[MyEvent] = env.readFile(
-         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
-         FilePathFilter.createDefaultFilter());
-
-val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
-        .filter( _.severity == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
-
-withTimestampsAndWatermarks
-        .keyBy( _.getGroup )
-        .timeWindow(Time.seconds(10))
-        .reduce( (a, b) => a.add(b) )
-        .addSink(...)
-{% endhighlight %}
-</div>
-</div>
-
-
-#### **With Periodic Watermarks**
-
-The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
-
-The interval (every *n* milliseconds) at which the watermark is generated is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. Each time the interval fires, the assigner's
-`getCurrentWatermark()` method is called, and a new Watermark is emitted if the returned Watermark
-is non-null and larger than the previous Watermark.
-
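-The emission rule itself can be sketched in a few lines of plain Java (no Flink
-dependencies; `emitAll` is an illustrative helper, not part of the Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkEmissionSketch {

    // Returns the watermarks that would actually be forwarded for a sequence of
    // candidate watermarks produced by getCurrentWatermark() (null = "none yet").
    static List<Long> emitAll(Long... candidates) {
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (Long candidate : candidates) {
            // a candidate is emitted only if it is non-null and larger than the previous one
            if (candidate != null && candidate > last) {
                last = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 3 is suppressed (not larger than 5), and null is suppressed
        System.out.println(emitAll(5L, 3L, null, 7L)); // prints [5, 7]
    }
}
```

-This also shows why watermarks never move backwards: stale or duplicate candidates are simply dropped.
-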
-Two simple examples of timestamp assigners with periodic watermark generation are below.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-/**
- * This generator generates watermarks assuming that elements come out of order to a certain degree only.
- * The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
- * elements for timestamp t.
- */
-public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
-
-    private final long maxOutOfOrderness = 3500; // 3.5 seconds
-
-    private long currentMaxTimestamp;
-
-    @Override
-    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-        long timestamp = element.getCreationTime();
-        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
-        return timestamp;
-    }
-
-    @Override
-    public Watermark getCurrentWatermark() {
-        // return the watermark as current highest timestamp minus the out-of-orderness bound
-        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
-    }
-}
-
-/**
- * This generator generates watermarks that are lagging behind processing time by a certain amount.
- * It assumes that elements arrive in Flink after at most a certain time.
- */
-public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
-
-	private final long maxTimeLag = 5000; // 5 seconds
-
-	@Override
-	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-		return element.getCreationTime();
-	}
-
-	@Override
-	public Watermark getCurrentWatermark() {
-		// return the watermark as current time minus the maximum time lag
-		return new Watermark(System.currentTimeMillis() - maxTimeLag);
-	}
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-/**
- * This generator generates watermarks assuming that elements come out of order to a certain degree only.
- * The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
- * elements for timestamp t.
- */
-class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
-
-    val maxOutOfOrderness = 3500L // 3.5 seconds
-
-    var currentMaxTimestamp: Long = _
-
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        val timestamp = element.getCreationTime()
-        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
-        timestamp
-    }
-
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current highest timestamp minus the out-of-orderness bound
-        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
-    }
-}
-
-/**
- * This generator generates watermarks that are lagging behind processing time by a certain amount.
- * It assumes that elements arrive in Flink after at most a certain time.
- */
-class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
-
-    val maxTimeLag = 5000L // 5 seconds
-
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        element.getCreationTime
-    }
-
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag
-        new Watermark(System.currentTimeMillis() - maxTimeLag)
-    }
-}
-{% endhighlight %}
-</div>
-</div>
-
-#### **With Punctuated Watermarks**
-
-To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use
-`AssignerWithPunctuatedWatermarks`. For this class, Flink first calls the `extractTimestamp(...)` method
-to assign the element a timestamp, and then immediately calls the
-`checkAndGetNextWatermark(...)` method for that element.
-
-The `checkAndGetNextWatermark(...)` method gets the timestamp that was assigned in the `extractTimestamp(...)`
-method, and can decide whether it wants to generate a Watermark. Whenever the `checkAndGetNextWatermark(...)`
-method returns a non-null Watermark, and that Watermark is larger than the latest previous Watermark, that
-new Watermark will be emitted.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {
-
-	@Override
-	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-		return element.getCreationTime();
-	}
-
-	@Override
-	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
-		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
-	}
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
-
-	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-		element.getCreationTime
-	}
-
-	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
-		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
-	}
-}
-{% endhighlight %}
-</div>
-</div>
-
-*Note:* It is possible to generate a watermark on every single event. However, because each watermark causes some
-computation downstream, an excessive number of watermarks slows down performance.
-

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
deleted file mode 100644
index 99221e5..0000000
--- a/docs/apis/streaming/fault_tolerance.md
+++ /dev/null
@@ -1,462 +0,0 @@
----
-title: "Fault Tolerance"
-is_beta: false
-
-sub-nav-group: streaming
-sub-nav-id: fault_tolerance
-sub-nav-pos: 5
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink's fault tolerance mechanism recovers programs in the presence of failures and
-continues to execute them. Such failures include machine hardware failures, network failures,
-transient program failures, etc.
-
-* This will be replaced by the TOC
-{:toc}
-
-
-Streaming Fault Tolerance
--------------------------
-
-Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a *persistent* (or *durable*) source that
-can be asked for prior records again (Apache Kafka is a good example of such a source).
-
-The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see [Working with State](state.html)) consistently to provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured [state backend](state_backends.html).
-
-The [docs on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
-
-To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-- *Number of retries*: The `setNumberOfExecutionRetries()` method defines how many times the job is restarted after a failure.
-  When checkpointing is activated but this value is not explicitly set, the job is restarted infinitely often.
-
-- *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
-  Exactly-once is preferable for most applications. At-least-once may be relevant for certain ultra-low-latency (consistently a few milliseconds) applications.
-
-- *number of concurrent checkpoints*: By default, the system does not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints while making no progress processing the streams. It is possible to allow multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example, because the functions call external services that need some time to respond) but that still want to checkpoint very frequently (every few hundred milliseconds) to re-process very little upon failure.
-
-- *checkpoint timeout*: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000);
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig().setCheckpointTimeout(60000);
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000)
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig.setCheckpointTimeout(60000)
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### Fault Tolerance Guarantees of Data Sources and Sinks
-
-Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
-snapshotting mechanism. The following table lists the state update guarantees of Flink coupled with the bundled connectors.
-
-Please read the documentation of each connector to understand the details of the fault tolerance guarantees.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Source</th>
-      <th class="text-left" style="width: 25%">Guarantees</th>
-      <th class="text-left">Notes</th>
-    </tr>
-   </thead>
-   <tbody>
-        <tr>
-            <td>Apache Kafka</td>
-            <td>exactly once</td>
-            <td>Use the appropriate Kafka connector for your version</td>
-        </tr>
-        <tr>
-            <td>AWS Kinesis Streams</td>
-            <td>exactly once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>RabbitMQ</td>
-            <td>at most once (v 0.10) / exactly once (v 1.0) </td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Twitter Streaming API</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Collections</td>
-            <td>exactly once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Files</td>
-            <td>exactly once</td>
-            <td></td>
-        </tr>
-        <tr>
-            <td>Sockets</td>
-            <td>at most once</td>
-            <td></td>
-        </tr>
-  </tbody>
-</table>
-
-To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs
-to take part in the checkpointing mechanism. The following table lists the delivery guarantees (assuming exactly-once
-state updates) of Flink coupled with bundled sinks:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Sink</th>
-      <th class="text-left" style="width: 25%">Guarantees</th>
-      <th class="text-left">Notes</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>HDFS rolling sink</td>
-        <td>exactly once</td>
-        <td>Implementation depends on Hadoop version</td>
-    </tr>
-    <tr>
-        <td>Elasticsearch</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Kafka producer</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Cassandra sink</td>
-        <td>at least once / exactly once</td>
-        <td>exactly once only for idempotent updates</td>
-    </tr>
-    <tr>
-        <td>AWS Kinesis Streams</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>File sinks</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Socket sinks</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Standard output</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-    <tr>
-        <td>Redis sink</td>
-        <td>at least once</td>
-        <td></td>
-    </tr>
-  </tbody>
-</table>
-
-{% top %}
-
-## Restart Strategies
-
-Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
-The cluster can be started with a default restart strategy, which is always used when no job-specific restart strategy has been defined.
-If the job is submitted with a restart strategy, it overrides the cluster's default setting.
-
-The default restart strategy is set via Flink's configuration file `flink-conf.yaml`.
-The configuration parameter *restart-strategy* defines which strategy is used.
-By default, the no-restart strategy is used.
-See the following list of available restart strategies to learn what values are supported.
-
-Each restart strategy comes with its own set of parameters which control its behaviour.
-These values are also set in the configuration file.
-The description of each restart strategy contains more information about the respective configuration values.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 50%">Restart Strategy</th>
-      <th class="text-left">Value for restart-strategy</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Fixed delay</td>
-        <td>fixed-delay</td>
-    </tr>
-    <tr>
-        <td>Failure rate</td>
-        <td>failure-rate</td>
-    </tr>
-    <tr>
-        <td>No restart</td>
-        <td>none</td>
-    </tr>
-  </tbody>
-</table>
-
-Apart from defining a default restart strategy, it is possible to define for each Flink job a specific restart strategy.
-This restart strategy is set programmatically by calling the `setRestartStrategy` method on the `ExecutionEnvironment`.
-Note that this also works for the `StreamExecutionEnvironment`.
-
-The following example shows how we can set a fixed delay restart strategy for our job.
-In case of a failure the system tries to restart the job 3 times and waits 10 seconds in-between successive restart attempts.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### Fixed Delay Restart Strategy
-
-The fixed delay restart strategy attempts a given number of times to restart the job.
-If the maximum number of attempts is exceeded, the job eventually fails.
-In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
-
-This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
-
-~~~
-restart-strategy: fixed-delay
-~~~
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td><i>restart-strategy.fixed-delay.attempts</i></td>
-        <td>Number of restart attempts</td>
-        <td>1</td>
-    </tr>
-    <tr>
-        <td><i>restart-strategy.fixed-delay.delay</i></td>
-        <td>Delay between two consecutive restart attempts</td>
-        <td><i>akka.ask.timeout</i></td>
-    </tr>
-  </tbody>
-</table>
-
-For example:
-
-~~~
-restart-strategy.fixed-delay.attempts: 3
-restart-strategy.fixed-delay.delay: 10 s
-~~~
-
-The fixed delay restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts 
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-#### Restart Attempts
-
-The number of times that Flink retries the execution before the job is declared as failed is configurable via the *restart-strategy.fixed-delay.attempts* parameter.
-
-The default value is **1**.
-
-#### Retry Delays
-
-Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay.
-
-Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted.
-
-The default value is the value of *akka.ask.timeout*.
-
-{% top %}
-
-### Failure Rate Restart Strategy
-
-The failure rate restart strategy restarts the job after a failure; when the failure rate (failures per time interval) is exceeded, the job eventually fails.
-In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
-
-This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
-
-~~~
-restart-strategy: failure-rate
-~~~
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td><i>restart-strategy.failure-rate.max-failures-per-interval</i></td>
-        <td>Maximum number of restarts in given time interval before failing a job</td>
-        <td>1</td>
-    </tr>
-    <tr>
-        <td><i>restart-strategy.failure-rate.failure-rate-interval</i></td>
-        <td>Time interval for measuring failure rate.</td>
-        <td>1 minute</td>
-    </tr>
-    <tr>
-        <td><i>restart-strategy.failure-rate.delay</i></td>
-        <td>Delay between two consecutive restart attempts</td>
-        <td><i>akka.ask.timeout</i></td>
-    </tr>
-  </tbody>
-</table>
-
-For example:
-
-~~~
-restart-strategy.failure-rate.max-failures-per-interval: 3
-restart-strategy.failure-rate.failure-rate-interval: 5 min
-restart-strategy.failure-rate.delay: 10 s
-~~~
-
-The failure rate restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per interval
-  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per interval
-  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### No Restart Strategy
-
-The job fails directly and no restart is attempted.
-
-~~~
-restart-strategy: none
-~~~
-
-The no restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.noRestart());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.noRestart())
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}

