flink-commits mailing list archives

From se...@apache.org
Subject [17/89] [abbrv] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs
Date Thu, 25 Aug 2016 18:48:20 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
new file mode 100644
index 0000000..2dd7842
--- /dev/null
+++ b/docs/dev/datastream_api.md
@@ -0,0 +1,1779 @@
+---
+title: "Flink DataStream API Programming Guide"
+nav-title: Streaming (DataStream API)
+nav-parent_id: apis
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+DataStream programs in Flink are regular programs that implement transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may, for
+example, write the data to files or to standard output (for example, the command line
+terminal). Flink programs run in a variety of contexts: standalone or embedded in other programs.
+The execution can happen in a local JVM or on clusters of many machines.
+
+Please see [basic concepts]({{ site.baseurl }}/dev/api_concepts.html) for an introduction
+to the basic concepts of the Flink API.
+
+In order to create your own Flink DataStream program, we encourage you to start with
+[anatomy of a Flink Program]({{ site.baseurl }}/dev/api_concepts.html#anatomy-of-a-flink-program)
+and gradually add your own
+[transformations](#datastream-transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Example Program
+---------------
+
+The following program is a complete, working example of a streaming window word count application that counts the
+words coming from a socket in 5-second windows. You can copy &amp; paste the code to run it locally.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+public class WindowWordCount {
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Tuple2<String, Integer>> dataStream = env
+                .socketTextStream("localhost", 9999)
+                .flatMap(new Splitter())
+                .keyBy(0)
+                .timeWindow(Time.seconds(5))
+                .sum(1);
+
+        dataStream.print();
+
+        env.execute("Window WordCount");
+    }
+
+    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+        @Override
+        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
+            for (String word: sentence.split(" ")) {
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+
+}
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object WindowWordCount {
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val text = env.socketTextStream("localhost", 9999)
+
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .keyBy(0)
+      .timeWindow(Time.seconds(5))
+      .sum(1)
+
+    counts.print
+
+    env.execute("Window Stream WordCount")
+  }
+}
+{% endhighlight %}
+</div>
+
+</div>
+
+To run the example program, start the input stream with netcat first from a terminal:
+
+~~~bash
+nc -lk 9999
+~~~
+
+Then type some words, pressing return to send each line. These lines are the input to the
+word count program. If you want to see counts greater than 1, type the same word again and again within
+5 seconds (increase the window size from 5 seconds if you cannot type that fast &#9786;).
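As a rough illustration of what the program above computes, the following plain-Java sketch mimics `keyBy` on the word, a 5-second tumbling window, and `sum(1)` without any Flink dependency. It is not Flink's implementation. It assumes every word carries a timestamp in milliseconds and that windows are aligned to multiples of the window size, so [0 s, 5 s) and [5 s, 10 s) are separate windows; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink dependency) of keyBy(word) + 5-second tumbling window + sum.
final class TumblingWordCountSketch {

    static final long WINDOW_SIZE_MS = 5_000L;

    // A word together with the time (in ms) at which it arrived.
    static final class TimedWord {
        final long timestampMs;
        final String word;

        TimedWord(long timestampMs, String word) {
            this.timestampMs = timestampMs;
            this.word = word;
        }
    }

    // Returns window start -> (word -> count). Assumption: a timestamp t belongs
    // to the window starting at t - (t % WINDOW_SIZE_MS).
    static Map<Long, Map<String, Integer>> countPerWindow(List<TimedWord> input) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (TimedWord tw : input) {
            long windowStart = tw.timestampMs - (tw.timestampMs % WINDOW_SIZE_MS);
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                  .merge(tw.word, 1, Integer::sum); // per-key, per-window count
        }
        return result;
    }

    public static void main(String[] args) {
        List<TimedWord> input = new ArrayList<>();
        input.add(new TimedWord(1_000, "flink"));
        input.add(new TimedWord(3_000, "flink"));
        input.add(new TimedWord(6_000, "flink"));
        input.add(new TimedWord(7_000, "stream"));
        System.out.println(countPerWindow(input));
        // prints {0={flink=2}, 5000={flink=1, stream=1}}
    }
}
```

A second "flink" typed within the same window raises that window's count to 2, while a "flink" typed after the window boundary starts a new count at 1.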
+
+{% top %}
+
+DataStream Transformations
+--------------------------
+
+Data transformations transform one or more DataStreams into a new DataStream. Programs can combine
+multiple transformations into sophisticated topologies.
+
+This section gives a description of all the available transformations.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that doubles the values of the input stream:</p>
+    {% highlight java %}
+DataStream<Integer> dataStream = //...
+dataStream.map(new MapFunction<Integer, Integer>() {
+    @Override
+    public Integer map(Integer value) throws Exception {
+        return 2 * value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:</p>
+    {% highlight java %}
+dataStream.flatMap(new FlatMapFunction<String, String>() {
+    @Override
+    public void flatMap(String value, Collector<String> out)
+        throws Exception {
+        for(String word: value.split(" ")){
+            out.collect(word);
+        }
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight java %}
+dataStream.filter(new FilterFunction<Integer>() {
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value != 0;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a href="#specifying-keys">keys</a> on how to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight java %}
+dataStream.keyBy("someKey"); // Key by field "someKey"
+dataStream.keyBy(0); // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
+            emits the new value.
+            <br/>
+            <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight java %}
+keyedStream.reduce(new ReduceFunction<Integer>() {
+    @Override
+    public Integer reduce(Integer value1, Integer value2)
+    throws Exception {
+        return value1 + value2;
+    }
+});
+            {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.</p>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight java %}
+DataStream<String> result =
+  keyedStream.fold("start", new FoldFunction<Integer, String>() {
+    @Override
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+  });
+          {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference between min
+	    and minBy is that min returns the minimum value, whereas minBy returns
+	    the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight java %}
+keyedStream.sum(0);
+keyedStream.sum("key");
+keyedStream.min(0);
+keyedStream.min("key");
+keyedStream.max(0);
+keyedStream.max("key");
+keyedStream.minBy(0);
+keyedStream.minBy("key");
+keyedStream.maxBy(0);
+keyedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+            See <a href="windows.html">windows</a> for a complete description of windows.
+    {% highlight java %}
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group all the stream events
+              according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+              See <a href="windows.html">windows</a> for a complete description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight java %}
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight java %}
+windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
+    public void apply (Tuple tuple,
+            Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t: values) {
+            sum += t.f1;
+        }
+        out.collect(sum);
+    }
+});
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
+    public void apply (Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t: values) {
+            sum += t.f1;
+        }
+        out.collect(sum);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a reduce function to the window and returns the reduced value.</p>
+    {% highlight java %}
+windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a fold function to the window and returns the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+    {% highlight java %}
+windowedStream.fold("start", new FoldFunction<Integer, String>() {
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+	    and minBy is that min returns the minimum value, whereas minBy returns
+	    the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight java %}
+windowedStream.sum(0);
+windowedStream.sum("key");
+windowedStream.min(0);
+windowedStream.min("key");
+windowedStream.max(0);
+windowedStream.max("key");
+windowedStream.minBy(0);
+windowedStream.minBy("key");
+windowedStream.maxBy(0);
+windowedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+            with itself you will get each element twice in the resulting stream.</p>
+    {% highlight java %}
+dataStream.union(otherStream1, otherStream2, ...);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight java %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply (new JoinFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common window.</p>
+    {% highlight java %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply (new CoGroupFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types, allowing for shared state between
+            the two streams.</p>
+    {% highlight java %}
+DataStream<Integer> someStream = //...
+DataStream<String> otherStream = //...
+
+ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream.</p>
+    {% highlight java %}
+connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
+    @Override
+    public Boolean map1(Integer value) {
+        return true;
+    }
+
+    @Override
+    public Boolean map2(String value) {
+        return false;
+    }
+});
+connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+
+   @Override
+   public void flatMap1(Integer value, Collector<String> out) {
+       out.collect(value.toString());
+   }
+
+   @Override
+   public void flatMap2(String value, Collector<String> out) {
+       for (String word: value.split(" ")) {
+         out.collect(word);
+       }
+   }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some criterion.
+                {% highlight java %}
+SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
+    @Override
+    public Iterable<String> select(Integer value) {
+        List<String> output = new ArrayList<String>();
+        if (value % 2 == 0) {
+            output.add("even");
+        }
+        else {
+            output.add("odd");
+        }
+        return output;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight java %}
+SplitStream<Integer> split;
+DataStream<Integer> even = split.select("even");
+DataStream<Integer> odd = split.select("odd");
+DataStream<Integer> all = split.select("even","odd");
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the output of one operator
+                to some previous operator. This is especially useful for defining algorithms that
+                continuously update a model. The following code starts with a stream and applies
+		the iteration body continuously. Elements that are greater than 0 are sent back
+		to the feedback channel, and the rest of the elements are forwarded downstream.
+		See <a href="#iterations">iterations</a> for a complete description.
+                {% highlight java %}
+IterativeStream<Long> iteration = initialStream.iterate();
+DataStream<Long> iterationBody = iteration.map (/*do something*/);
+DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value > 0;
+    }
+});
+iteration.closeWith(feedback);
+DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value <= 0;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics. See <a href="{{ site.baseurl }}/dev/event_time.html">Event Time</a>.
+                {% highlight java %}
+stream.assignTimestamps (new TimestampExtractor() {...});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
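The rolling Reduce and Fold transformations above emit one result for every incoming element, computed from state kept per key. The plain-Java sketch below (no Flink dependency; `RollingKeyedSketch` and its methods are hypothetical names) imitates that behavior, with a `HashMap` standing in for Flink's keyed state. It illustrates the semantics only, not how Flink stores or checkpoints state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of a rolling reduce and a rolling
// fold on a keyed stream. Element i has key keys.get(i) and value values.get(i).
final class RollingKeyedSketch {

    // Like keyedStream.reduce(value1 + value2): emits the running sum of the element's key.
    static List<Integer> rollingSum(List<String> keys, List<Integer> values) {
        Map<String, Integer> lastReduced = new HashMap<>(); // stands in for per-key state
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            // merge() stores and returns the new running sum for this key
            emitted.add(lastReduced.merge(keys.get(i), values.get(i), Integer::sum));
        }
        return emitted;
    }

    // Like keyedStream.fold(initial, current + "-" + value): emits the running fold per key.
    static List<String> rollingFold(List<String> keys, List<Integer> values, String initial) {
        Map<String, String> lastFolded = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            String folded = lastFolded.getOrDefault(keys.get(i), initial) + "-" + values.get(i);
            lastFolded.put(keys.get(i), folded);
            emitted.add(folded);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Two interleaved keys: each key keeps its own running sum.
        List<String> keys = Arrays.asList("a", "b", "a", "b", "a");
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(rollingSum(keys, values)); // prints [1, 2, 4, 6, 9]

        // A single key, as in the Fold example.
        List<String> sameKey = Arrays.asList("a", "a", "a", "a", "a");
        System.out.println(rollingFold(sameKey, values, "start"));
        // prints [start-1, start-1-2, start-1-2-3, start-1-2-3-4, start-1-2-3-4-5]
    }
}
```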
+
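The KeyBy description states that partitioning uses hashing. The following sketch is a generic hash-partitioning illustration in plain Java, not Flink's actual hash function or routing code (`HashPartitionSketch` and `partitionFor` are hypothetical names). It shows the property that matters: records with equal keys always go to the same downstream instance.

```java
import java.util.ArrayList;
import java.util.List;

// Generic hash-partitioning illustration (not Flink's exact hash function):
// records with equal keys always map to the same downstream instance.
final class HashPartitionSketch {

    static int partitionFor(Object key, int parallelism) {
        // Math.floorMod keeps the index in [0, parallelism) even for negative hash codes.
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String word : "to be or not to be".split(" ")) {
            partitions.get(partitionFor(word, parallelism)).add(word);
        }
        // Both occurrences of "to" (and of "be") land in the same inner list.
        System.out.println(partitions);
    }
}
```

Which partition a given word lands in depends on `String.hashCode()` and the chosen parallelism; only the grouping of equal keys is guaranteed.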
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that doubles the values of the input stream:</p>
+    {% highlight scala %}
+dataStream.map { x => x * 2 }
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:</p>
+    {% highlight scala %}
+dataStream.flatMap { str => str.split(" ") }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight scala %}
+dataStream.filter { _ != 0 }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a href="#specifying-keys">keys</a> on how to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight scala %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
+            emits the new value.
+            <br/>
+            <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight scala %}
+keyedStream.reduce { _ + _ }
+            {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.</p>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    keyedStream.fold("start")((str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference between min
+	    and minBy is that min returns the minimum value, whereas minBy returns
+	    the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight scala %}
+keyedStream.sum(0)
+keyedStream.sum("key")
+keyedStream.min(0)
+keyedStream.min("key")
+keyedStream.max(0)
+keyedStream.max("key")
+keyedStream.minBy(0)
+keyedStream.minBy("key")
+keyedStream.maxBy(0)
+keyedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+            See <a href="windows.html">windows</a> for a description of windows.
+    {% highlight scala %}
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group all the stream events
+              according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+              See <a href="windows.html">windows</a> for a complete description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight scala %}
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight scala %}
+windowedStream.apply { WindowFunction }
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply { AllWindowFunction }
+
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a reduce function to the window and returns the reduced value.</p>
+    {% highlight scala %}
+windowedStream.reduce { _ + _ }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a fold function to the window and returns the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    windowedStream.fold("start", (str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </td>
+	</tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+	    and minBy is that min returns the minimum value, whereas minBy returns
+	    the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight scala %}
+windowedStream.sum(0)
+windowedStream.sum("key")
+windowedStream.min(0)
+windowedStream.min("key")
+windowedStream.max(0)
+windowedStream.max("key")
+windowedStream.minBy(0)
+windowedStream.minBy("key")
+windowedStream.maxBy(0)
+windowedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+            with itself you will get each element twice in the resulting stream.</p>
+    {% highlight scala %}
+dataStream.union(otherStream1, otherStream2, ...)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight scala %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply { ... }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common window.</p>
+    {% highlight scala %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
+    .apply {}
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types, allowing for shared state between
+            the two streams.</p>
+    {% highlight scala %}
+val someStream: DataStream[Int] = ...
+val otherStream: DataStream[String] = ...
+
+val connectedStreams = someStream.connect(otherStream)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream.</p>
+    {% highlight scala %}
+connectedStreams.map(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+connectedStreams.flatMap(
+    (i : Int) => Seq(i.toString),
+    (s : String) => s.split(" ").toSeq
+)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some criterion.
+                {% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => List("even")
+      case _ => List("odd")
+    }
+)
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight scala %}
+
+val even = split select "even"
+val odd = split select "odd"
+val all = split.select("even","odd")
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  &rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow by redirecting the output of one operator
+                to some previous operator. This is especially useful for defining algorithms that
+                continuously update a model. The following code starts with a stream and applies
+                the iteration body continuously. Elements that are greater than 0 are sent back
+                to the feedback channel, and the rest of the elements are forwarded downstream.
+                See <a href="#iterations">iterations</a> for a complete description.
+                {% highlight scala %}
+initialStream.iterate {
+  iteration => {
+    val iterationBody = iteration.map {/*do something*/}
+    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
+  }
+}
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics.
+                See <a href="{{ site.baseurl }}/apis/streaming/event_time.html">Event Time</a>.
+                {% highlight scala %}
+stream.assignTimestamps { timestampExtractor }
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
+
+Extracting fields from tuples, case classes and collections via anonymous pattern matching, like the following:
+{% highlight scala %}
+val data: DataStream[(Int, String, Double)] = // [...]
+data.map {
+  case (id, name, temperature) => // [...]
+}
+{% endhighlight %}
+is not supported by the API out-of-the-box. To use this feature, use a <a href="../scala_api_extensions.html">Scala API extension</a>.
+
+
+</div>
+</div>
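Connected streams let two inputs share state inside one operator. The following stdlib-only Java sketch imitates the shape of a co-flat-map function: one method per input, both reading and writing the same state. `ControlledFilter` is a hypothetical class; its method names `flatMap1`/`flatMap2` echo Flink's `CoFlatMapFunction`, but output goes to a plain `List` instead of a Flink `Collector`, so this is an illustration rather than Flink API usage.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a co-flat-map over two connected inputs:
// input 1 carries control words, input 2 carries data words.
class ControlledFilter {
    // State shared by both inputs (in Flink it would live in one operator instance).
    private final Set<String> blocked = new HashSet<>();

    // Elements of the first input update the shared state and emit nothing.
    void flatMap1(String controlWord, List<String> out) {
        blocked.add(controlWord);
    }

    // Elements of the second input are emitted unless the shared state blocks them.
    void flatMap2(String dataWord, List<String> out) {
        if (!blocked.contains(dataWord)) {
            out.add(dataWord);
        }
    }

    static List<String> demo() {
        ControlledFilter filter = new ControlledFilter();
        List<String> out = new ArrayList<>();
        filter.flatMap2("apache", out); // nothing blocked yet: emitted
        filter.flatMap1("flink", out);  // control element: block "flink"
        filter.flatMap2("flink", out);  // blocked: dropped
        filter.flatMap2("stream", out); // emitted
        return out;                     // [apache, stream]
    }
}
```

In a real job the arrival order of elements from the two inputs is not deterministic, so the output of such a function depends on how the inputs interleave.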
+
+The following transformations are available on data streams of Tuples:
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples
+{% highlight java %}
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples
+{% highlight scala %}
+val in : DataStream[(Int,Double,String)] = // [...]
+val out = in.project(2,0)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
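Projection only drops and reorders fields; it computes nothing new. This minimal stdlib sketch shows what `project(2, 0)` does to a single record, using a plain `Object[]` as a hypothetical stand-in for a Flink tuple (`TupleProjection` is not a Flink class).

```java
import java.util.Arrays;

class TupleProjection {
    // Builds a new record holding the fields at the given positions,
    // in the order the positions are listed.
    static Object[] project(Object[] record, int... fieldIndexes) {
        Object[] result = new Object[fieldIndexes.length];
        for (int i = 0; i < fieldIndexes.length; i++) {
            result[i] = record[fieldIndexes[i]];
        }
        return result;
    }

    public static void main(String[] args) {
        Object[] in = {42, 3.14, "flink"};        // like Tuple3<Integer, Double, String>
        Object[] out = project(in, 2, 0);         // like Tuple2<String, Integer>
        System.out.println(Arrays.toString(out)); // prints [flink, 42]
    }
}
```

The output field order follows the argument order, which is why the Java example's result type is `Tuple2<String, Integer>`.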
+
+
+### Physical partitioning
+
+Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation,
+via the following functions.
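The partitioning functions differ mainly in how they choose the downstream channel(s) for each element. The stdlib-only Java sketch below models that choice for random, round-robin, broadcast and custom partitioning. `ChannelChoice` and its methods are hypothetical, and the nested `KeyPartitioner` interface only imitates the shape of a user-defined partitioner (a key and the number of partitions in, a partition index out); it is not Flink's class. Rescaling is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class ChannelChoice {
    // Hypothetical stand-in for a user-defined partitioner.
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    private final int numChannels;
    private final Random random;
    private int nextChannel = 0; // round-robin cursor

    ChannelChoice(int numChannels, long seed) {
        this.numChannels = numChannels;
        this.random = new Random(seed);
    }

    // Like shuffle(): a uniformly random channel per element.
    int randomChannel() {
        return random.nextInt(numChannels);
    }

    // Like rebalance(): cycle through all channels so each gets an equal share.
    int roundRobinChannel() {
        int channel = nextChannel;
        nextChannel = (nextChannel + 1) % numChannels;
        return channel;
    }

    // Like broadcast(): every channel receives every element.
    List<Integer> broadcastChannels() {
        List<Integer> all = new ArrayList<>();
        for (int c = 0; c < numChannels; c++) {
            all.add(c);
        }
        return all;
    }

    // Like partitionCustom(): the user function decides, based on the element's key.
    <K> int customChannel(K key, KeyPartitioner<K> partitioner) {
        return partitioner.partition(key, numChannels);
    }
}
```

With three channels, ten round-robin choices land as 4, 3 and 3 elements per channel, which is the "equal load per partition" property that makes rebalancing useful under data skew.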
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each element.
+            {% highlight java %}
+dataStream.partitionCustom(partitioner, "someKey");
+dataStream.partitionCustom(partitioner, 0);
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight java %}
+dataStream.shuffle();
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight java %}
+dataStream.rebalance();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream operations. This is
+            useful if you want to have pipelines where you, for example, fan out from
+            each parallel instance of a source to a subset of several mappers to distribute load
+            but don't want the full rebalance that rebalance() would incur. Depending on
+            other configuration values, such as the number of slots of the TaskManagers, this
+            can require only local data transfers instead of transferring data over the network.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream operation sends
+            elements depends on the degree of parallelism of both the upstream and downstream operations.
+            For example, if the upstream operation has parallelism 2 and the downstream operation
+            has parallelism 4, then one upstream operation would distribute elements to two
+            downstream operations while the other upstream operation would distribute to the other
+            two downstream operations. If, on the other hand, the downstream operation has parallelism
+            2 while the upstream operation has parallelism 4, then two upstream operations would
+            distribute to one downstream operation while the other two upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of each other, one or several
+            downstream operations will have a differing number of inputs from upstream operations.
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection pattern in the above
+            example:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Rescaling connection pattern between upstream and downstream operations" />
+        </div>
+
+
+        <p>
+            {% highlight java %}
+dataStream.rescale();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight java %}
+dataStream.broadcast();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each element.
+            {% highlight scala %}
+dataStream.partitionCustom(partitioner, "someKey")
+dataStream.partitionCustom(partitioner, 0)
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight scala %}
+dataStream.shuffle()
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight scala %}
+dataStream.rebalance()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements, round-robin, to a subset of downstream operations. This is
+            useful if you want to have pipelines where you, for example, fan out from
+            each parallel instance of a source to a subset of several mappers to distribute load
+            but don't want the full rebalance that rebalance() would incur. Depending on
+            other configuration values, such as the number of slots of the TaskManagers, this
+            can require only local data transfers instead of transferring data over the network.
+        </p>
+        <p>
+            The subset of downstream operations to which the upstream operation sends
+            elements depends on the degree of parallelism of both the upstream and downstream operations.
+            For example, if the upstream operation has parallelism 2 and the downstream operation
+            has parallelism 4, then one upstream operation would distribute elements to two
+            downstream operations while the other upstream operation would distribute to the other
+            two downstream operations. If, on the other hand, the downstream operation has parallelism
+            2 while the upstream operation has parallelism 4, then two upstream operations would
+            distribute to one downstream operation while the other two upstream operations would
+            distribute to the other downstream operation.
+        </p>
+        <p>
+            In cases where the different parallelisms are not multiples of each other, one or several
+            downstream operations will have a differing number of inputs from upstream operations.
+        </p>
+        <p>
+            Please see this figure for a visualization of the connection pattern in the above
+            example:
+        </p>
+
+        <div style="text-align: center">
+            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Rescaling connection pattern between upstream and downstream operations" />
+        </div>
+
+
+        <p>
+            {% highlight scala %}
+dataStream.rescale()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight scala %}
+dataStream.broadcast()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
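The rescaling connection pattern described above can be reproduced with integer arithmetic. This stdlib-only Java sketch assumes that each upstream instance feeds a contiguous block of downstream instances (or, when scaling down, that contiguous blocks of upstream instances share one downstream instance) and computes the blocks with integer division. It reproduces the 2-to-4 and 4-to-2 examples in the text, but `RescalePattern` is a hypothetical illustration, not Flink's scheduling code, which may place the remainder differently when the parallelisms are not multiples of each other.

```java
import java.util.ArrayList;
import java.util.List;

class RescalePattern {
    // For each upstream instance, the downstream instances it sends to.
    static List<List<Integer>> targets(int upstream, int downstream) {
        List<List<Integer>> result = new ArrayList<>();
        for (int u = 0; u < upstream; u++) {
            result.add(new ArrayList<Integer>());
        }
        if (upstream <= downstream) {
            // Fan-out: downstream instance d is fed by upstream d * upstream / downstream.
            for (int d = 0; d < downstream; d++) {
                result.get(d * upstream / downstream).add(d);
            }
        } else {
            // Fan-in: upstream instance u sends to downstream u * downstream / upstream.
            for (int u = 0; u < upstream; u++) {
                result.get(u).add(u * downstream / upstream);
            }
        }
        return result;
    }

    // How many upstream instances feed each downstream instance.
    static int[] inputsPerDownstream(int upstream, int downstream) {
        int[] counts = new int[downstream];
        for (List<Integer> ts : targets(upstream, downstream)) {
            for (int d : ts) {
                counts[d]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(targets(2, 4)); // [[0, 1], [2, 3]]
        System.out.println(targets(4, 2)); // [[0], [0], [1], [1]]
        System.out.println(java.util.Arrays.toString(inputsPerDownstream(3, 2))); // [2, 1]
    }
}
```

Within its block, each upstream instance would then hand out elements round-robin, as `rescale()` is described above. The 3-to-2 case shows the uneven number of inputs that arises when the parallelisms are not multiples of each other.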
+
+### Task chaining and resource groups
+
+Chaining two subsequent transformations means co-locating them within the same thread for better
+performance. Flink by default chains operators if this is possible (e.g., two subsequent map
+transformations). The API gives fine-grained control over chaining if desired:
+
+Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to disable chaining in
+the whole job. For more fine-grained control, the following functions are available. Note that
+these functions can only be used right after a DataStream transformation as they refer to the
+previous transformation. For example, you can use `someStream.map(...).startNewChain()`, but
+you cannot use `someStream.startNewChain()`.
+
+A resource group is a slot in Flink; see
+[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots). You can
+manually isolate operators in separate slots if desired.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+        mappers will be chained, and the filter will not be chained to
+        the first mapper.
+{% highlight java %}
+someStream.filter(...).map(...).startNewChain().map(...);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight java %}
+someStream.map(...).disableChaining();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Set slot sharing group</td>
+      <td>
+        <p>Set the slot sharing group of an operation. Flink will put operations with the same
+        slot sharing group into the same slot while placing operations with a different
+        slot sharing group in other slots. This can be used to isolate slots. The slot sharing
+        group is inherited from input operations if all input operations are in the same slot
+        sharing group.
+        The name of the default slot sharing group is "default"; operations can explicitly
+        be put into this group by calling slotSharingGroup("default").
+{% highlight java %}
+someStream.filter(...).slotSharingGroup("name");
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
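The effect of `startNewChain()` and `disableChaining()` on a simple linear pipeline can be modelled in a few lines. This stdlib-only sketch assumes every operator could otherwise be chained to its predecessor (same parallelism, no repartitioning in between); Flink's real chaining decision checks more conditions than these two flags. `ChainingSketch` and `Op` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ChainingSketch {
    static class Op {
        final String name;
        final boolean startNewChain;   // like calling startNewChain() on this operator
        final boolean disableChaining; // like calling disableChaining() on this operator

        Op(String name, boolean startNewChain, boolean disableChaining) {
            this.name = name;
            this.startNewChain = startNewChain;
            this.disableChaining = disableChaining;
        }
    }

    // Groups a linear pipeline into chains; each chain would run in one thread.
    static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            boolean chainWithPrevious = previous != null
                    && !op.startNewChain       // a new chain starts here
                    && !op.disableChaining     // never chained to its predecessor
                    && !previous.disableChaining; // nor to its successor
            if (!chainWithPrevious) {
                result.add(new ArrayList<String>());
            }
            result.get(result.size() - 1).add(op.name);
            previous = op;
        }
        return result;
    }
}
```

For the table's example, a `filter` followed by two maps with `startNewChain()` on the first map yields the chains `[filter]` and `[map, map]`, while `disableChaining()` on a middle operator isolates it from both neighbours.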
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+        mappers will be chained, and the filter will not be chained to
+        the first mapper.
+{% highlight scala %}
+someStream.filter(...).map(...).startNewChain().map(...)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight scala %}
+someStream.map(...).disableChaining()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  <tr>
+      <td>Set slot sharing group</td>
+      <td>
+        <p>Set the slot sharing group of an operation. Flink will put operations with the same
+        slot sharing group into the same slot while placing operations with a different
+        slot sharing group in other slots. This can be used to isolate slots. The slot sharing
+        group is inherited from input operations if all input operations are in the same slot
+        sharing group.
+        The name of the default slot sharing group is "default"; operations can explicitly
+        be put into this group by calling slotSharingGroup("default").
+{% highlight scala %}
+someStream.filter(...).slotSharingGroup("name")
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
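The inheritance rule for slot sharing groups can be written as a small function. This stdlib-only sketch is an illustration: `SlotGroups.resolve` is a hypothetical name, and falling back to `"default"` when the inputs disagree (or when there are no inputs) is an assumption about behaviour the text above does not spell out.

```java
import java.util.List;

class SlotGroups {
    static final String DEFAULT_GROUP = "default";

    // explicitGroup: the name passed to slotSharingGroup(...), or null if not set.
    // inputGroups: the already resolved groups of the operation's inputs.
    static String resolve(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup; // an explicit setting always wins
        }
        if (inputGroups.isEmpty()) {
            return DEFAULT_GROUP; // assumed: e.g. a source without an explicit group
        }
        String first = inputGroups.get(0);
        for (String group : inputGroups) {
            if (!group.equals(first)) {
                return DEFAULT_GROUP; // assumed fallback when the inputs disagree
            }
        }
        return first; // all inputs agree: inherit their group
    }
}
```

Because the rule is applied operation by operation, setting a group on one operation also moves every downstream operation that inherits it, until another explicit `slotSharingGroup(...)` call or a join with differently grouped inputs.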
+
+
+{% top %}
+
+Data Sources
+------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+Sources are where your program reads its input from. You can attach a source to your program by
+using `StreamExecutionEnvironment.addSource(sourceFunction)`. Flink comes with a number of pre-implemented
+source functions, but you can always write your own custom sources by implementing the `SourceFunction`
+interface for non-parallel sources, or by implementing the `ParallelSourceFunction` interface or extending the
+`RichParallelSourceFunction` for parallel sources.
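A custom source typically loops, emitting elements, until it is cancelled. The stdlib-only sketch below imitates that shape: `run` emits while a `volatile` flag is set, and `cancel` clears the flag, possibly from another thread. `CountingSource` is hypothetical; it emits into a `java.util.function.Consumer` instead of Flink's source context, and the `maxCount` bound exists only so the example terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CountingSource {
    private final long maxCount;
    // volatile so that a cancel() from another thread is seen by the run() loop
    private volatile boolean running = true;

    CountingSource(long maxCount) {
        this.maxCount = maxCount;
    }

    // Emits 0, 1, 2, ... until cancelled or maxCount elements were produced.
    void run(Consumer<Long> emit) {
        long next = 0;
        while (running && next < maxCount) {
            emit.accept(next++);
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        new CountingSource(5).run(out::add);
        System.out.println(out); // [0, 1, 2, 3, 4]
    }
}
```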
+
+There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
+
+File-based:
+
+- `readTextFile(path)` - Reads text files, i.e. files that respect the `TextInputFormat` specification, line-by-line and returns them as Strings.
+
+- `readFile(fileInputFormat, path)` - Reads (once) files as dictated by the specified file input format.
+
+- `readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)` -  This is the method called internally by the two previous ones. It reads files in the `path` based on the given `fileInputFormat`. Depending on the provided `watchType`, this source may periodically monitor (every `interval` ms) the path for new data (`FileProcessingMode.PROCESS_CONTINUOUSLY`), or process once the data currently in the path and exit (`FileProcessingMode.PROCESS_ONCE`). Using the `pathFilter`, the user can further exclude files from being processed.
+
+    *IMPLEMENTATION:*
+
+    Under the hood, Flink splits the file reading process into two sub-tasks, namely *directory monitoring* and *data reading*. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, **non-parallel** (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the `watchType`), find the files to be processed, divide them into *splits*, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one.
+
+    *IMPORTANT NOTES:*
+
+    1. If the `watchType` is set to `FileProcessingMode.PROCESS_CONTINUOUSLY`, when a file is modified, its contents are re-processed entirely. This can break the "exactly-once" semantics, as appending data at the end of a file will lead to **all** its contents being re-processed.
+
+    2. If the `watchType` is set to `FileProcessingMode.PROCESS_ONCE`, the source scans the path **once** and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.
+
+Socket-based:
+
+- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter.
+
+Collection-based:
+
+- `fromCollection(Collection)` - Creates a data stream from a `java.util.Collection`. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator, Class)` - Creates a data stream from an iterator. The class specifies the
+  data type of the elements returned by the iterator.
+
+- `fromElements(T ...)` - Creates a data stream from the given sequence of objects. All objects must be
+  of the same type.
+
+- `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream from an iterator, in
+  parallel. The class specifies the data type of the elements returned by the iterator.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
+
+Custom:
+
+- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use
+    `addSource(new FlinkKafkaConsumer08<>(...))`. See [connectors]({{ site.baseurl }}/dev/connectors/index.html) for more details.
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+Sources are where your program reads its input from. You can attach a source to your program by
+using `StreamExecutionEnvironment.addSource(sourceFunction)`. Flink comes with a number of pre-implemented
+source functions, but you can always write your own custom sources by implementing the `SourceFunction`
+interface for non-parallel sources, or by implementing the `ParallelSourceFunction` interface or extending the
+`RichParallelSourceFunction` for parallel sources.
+
+There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
+
+File-based:
+
+- `readTextFile(path)` - Reads text files, i.e. files that respect the `TextInputFormat` specification, line-by-line and returns them as Strings.
+
+- `readFile(fileInputFormat, path)` - Reads (once) files as dictated by the specified file input format.
+
+- `readFile(fileInputFormat, path, watchType, interval, pathFilter)` -  This is the method called internally by the two previous ones. It reads files in the `path` based on the given `fileInputFormat`. Depending on the provided `watchType`, this source may periodically monitor (every `interval` ms) the path for new data (`FileProcessingMode.PROCESS_CONTINUOUSLY`), or process once the data currently in the path and exit (`FileProcessingMode.PROCESS_ONCE`). Using the `pathFilter`, the user can further exclude files from being processed.
+
+    *IMPLEMENTATION:*
+
+    Under the hood, Flink splits the file reading process into two sub-tasks, namely *directory monitoring* and *data reading*. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, **non-parallel** (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the `watchType`), find the files to be processed, divide them into *splits*, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one.
+
+    *IMPORTANT NOTES:*
+
+    1. If the `watchType` is set to `FileProcessingMode.PROCESS_CONTINUOUSLY`, when a file is modified, its contents are re-processed entirely. This can break the "exactly-once" semantics, as appending data at the end of a file will lead to **all** its contents being re-processed.
+
+    2. If the `watchType` is set to `FileProcessingMode.PROCESS_ONCE`, the source scans the path **once** and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.
+
+Socket-based:
+
+- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter.
+
+Collection-based:
+
+- `fromCollection(Seq)` - Creates a data stream from a Scala `Seq`. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator)` - Creates a data stream from an iterator. The data type of the
+  elements is derived from the iterator's element type.
+
+- `fromElements(elements: _*)` - Creates a data stream from the given sequence of objects. All objects must be
+  of the same type.
+
+- `fromParallelCollection(SplittableIterator)` - Creates a data stream from an iterator, in
+  parallel. The data type of the elements is derived from the iterator's element type.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
+
+Custom:
+
+- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use
+    `addSource(new FlinkKafkaConsumer08[String](...))`. See [connectors]({{ site.baseurl }}/dev/connectors/index.html) for more details.
+
+</div>
+</div>
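The split-and-assign step of the file-reading implementation described above can be sketched as follows. This stdlib-only Java model rests on stated assumptions: a fixed split size, round-robin assignment of splits to readers, and change detection by modification time. `FileMonitorSketch` and its methods are hypothetical, and Flink's actual split sizes and assignment may differ. The model also shows why a modified file is re-processed entirely under `PROCESS_CONTINUOUSLY`: a newer modification time causes the whole file to be split again from byte 0, not just the appended bytes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FileMonitorSketch {
    // A byte range [start, start + length) of one file.
    static class Split {
        final String path;
        final long start;
        final long length;

        Split(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    private final long splitSize;
    private final Map<String, Long> lastSeenModTime = new HashMap<>();

    FileMonitorSketch(long splitSize) {
        this.splitSize = splitSize;
    }

    // One scan. files maps path -> {modificationTime, fileLength}.
    // Every new or modified file is split from byte 0, i.e. in full.
    List<Split> scan(Map<String, long[]> files) {
        List<Split> splits = new ArrayList<>();
        for (Map.Entry<String, long[]> e : files.entrySet()) {
            long modTime = e.getValue()[0];
            long fileLength = e.getValue()[1];
            Long seen = lastSeenModTime.get(e.getKey());
            if (seen == null || modTime > seen) {
                lastSeenModTime.put(e.getKey(), modTime);
                for (long start = 0; start < fileLength; start += splitSize) {
                    splits.add(new Split(e.getKey(), start, Math.min(splitSize, fileLength - start)));
                }
            }
        }
        return splits;
    }

    // Each split goes to exactly one reader; a reader may receive several splits.
    static List<List<Split>> assign(List<Split> splits, int numReaders) {
        List<List<Split>> perReader = new ArrayList<>();
        for (int r = 0; r < numReaders; r++) {
            perReader.add(new ArrayList<Split>());
        }
        for (int i = 0; i < splits.size(); i++) {
            perReader.get(i % numReaders).add(splits.get(i));
        }
        return perReader;
    }
}
```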
+
+{% top %}
+
+Data Sinks
+----------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
+Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
+DataStreams:
+
+- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *toString()* method of each element.
+
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
+  delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
+
+- `print()` / `printToErr()` - Prints the *toString()* value of each element on the standard out /
+  standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output.
+  This can help to distinguish between different calls to *print*. If the parallelism is greater
+  than 1, the output will also be prepended with the identifier of the task which produced the output.
+
+- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
+  custom object-to-bytes conversion.
+
+- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`
+
+- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
+    Apache Kafka) that are implemented as sink functions.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+<br />
+
+Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
+Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
+DataStreams:
+
+- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *toString()* method of each element.
+
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
+  delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
+
+- `print()` / `printToErr()` - Prints the *toString()* value of each element on the standard out /
+  standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output.
+  This can help to distinguish between different calls to *print*. If the parallelism is greater
+  than 1, the output will also be prepended with the identifier of the task which produced the output.
+
+- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
+  custom object-to-bytes conversion.
+
+- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`
+
+- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
+    Apache Kafka) that are implemented as sink functions.
+
+</div>
+</div>
+
+Note that the `write*()` methods on `DataStream` are mainly intended for debugging purposes.
+They do not participate in Flink's checkpointing, which means these functions usually have
+at-least-once semantics. The data flushing to the target system depends on the implementation of the
+OutputFormat. This means that not all elements sent to the OutputFormat immediately show up
+in the target system. Also, in failure cases, those records might be lost.
+
+For reliable, exactly-once delivery of a stream into a file system, use the `flink-connector-filesystem`.
+Also, custom implementations through the `.addSink(...)` method can participate in Flink's checkpointing
+for exactly-once semantics.
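To see how buffered records can be lost, consider a writer that only flushes when its buffer is full. The stdlib-only sketch below is hypothetical (`BufferedWriterSketch` is not a Flink class): it models an output that does not take part in checkpointing, so anything still buffered when the process fails never reaches the target system.

```java
import java.util.ArrayList;
import java.util.List;

class BufferedWriterSketch {
    private final int capacity;
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the external target system (a file, a socket, ...).
    final List<String> target = new ArrayList<>();

    BufferedWriterSketch(int capacity) {
        this.capacity = capacity;
    }

    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush(); // records become visible in the target only here
        }
    }

    void flush() {
        target.addAll(buffer);
        buffer.clear();
    }

    // Simulates a failure: buffered records vanish without being flushed.
    void crash() {
        buffer.clear();
    }
}
```

With a capacity of three, writing `a` to `e` and then crashing leaves only `a`, `b` and `c` in `target`; a checkpoint-aware sink would instead tie flushing to checkpoints so that the unflushed records can be replayed.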
+
+{% top %}
+
+Iterations
+----------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+Iterative streaming programs implement a step function and embed it into an `IterativeStream`. As a DataStream
+program may never finish, there is no maximum number of iterations. Instead, you need to specify which part
+of the stream is fed back to the iteration and which part is forwarded downstream using a `split` transformation
+or a `filter`. Here, we show an example using filters. First, we define an `IterativeStream`:
+
+{% highlight java %}
+IterativeStream<Integer> iteration = input.iterate();
+{% endhighlight %}
+
+Then, we specify the logic that will be executed inside the loop using a series of transformations (here
+a simple `map` transformation):
+
+{% highlight java %}
+DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
+{% endhighlight %}
+
+To close an iteration and define the iteration tail, call the `closeWith(feedbackStream)` method of the `IterativeStream`.
+The DataStream given to the `closeWith` function will be fed back to the iteration head.
+A common pattern is to use a filter to separate the part of the stream that is fed back,
+and the part of the stream which is propagated forward. These filters can, e.g., define
+the "termination" logic, where an element is allowed to propagate downstream rather
+than being fed back.
+
+{% highlight java %}
+iteration.closeWith(iterationBody.filter(/* one part of the stream */));
+DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
+{% endhighlight %}
+
+By default the partitioning of the feedback stream will be automatically set to be the same as the input of the
+iteration head. To override this the user can set an optional boolean flag in the `closeWith` method.
+
+For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:
+
+{% highlight java %}
+DataStream<Long> someIntegers = env.generateSequence(0, 1000);
+
+IterativeStream<Long> iteration = someIntegers.iterate();
+
+DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
+  @Override
+  public Long map(Long value) throws Exception {
+    return value - 1;
+  }
+});
+
+DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
+  @Override
+  public boolean filter(Long value) throws Exception {
+    return (value > 0);
+  }
+});
+
+iteration.closeWith(stillGreaterThanZero);
+
+DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
+  @Override
+  public boolean filter(Long value) throws Exception {
+    return (value <= 0);
+  }
+});
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+<br />
+
+Iterative streaming programs implement a step function and embed it into an `IterativeStream`. As a DataStream
+program may never finish, there is no maximum number of iterations. Instead, you need to specify which part
+of the stream is fed back to the iteration and which part is forwarded downstream using a `split` transformation
+or a `filter`. Here, we show an example iteration where the body (the part of the computation that is repeated)
+is a simple map transformation, and the elements that are fed back are distinguished from the elements that
+are forwarded downstream using filters.
+
+{% highlight scala %}
+val iteratedStream = someDataStream.iterate(
+  iteration => {
+    val iterationBody = iteration.map(/* this is executed many times */)
+    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
+})
+{% endhighlight %}
+
+
+By default the partitioning of the feedback stream will be automatically set to be the same as the input of the
+iteration head. To override this, the user can set the optional `keepPartitioning` flag of the `iterate` method.
+
+For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:
+
+{% highlight scala %}
+val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
+
+val iteratedStream = someIntegers.iterate(
+  iteration => {
+    val minusOne = iteration.map( v => v - 1)
+    val stillGreaterThanZero = minusOne.filter (_ > 0)
+    val lessThanZero = minusOne.filter(_ <= 0)
+    (stillGreaterThanZero, lessThanZero)
+  }
+)
+{% endhighlight %}
+
+</div>
+</div>
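The feedback loop of the subtract-one example can be traced without Flink. This stdlib-only Java sketch replaces the feedback edge with a queue: each element passes through the body (`value - 1`), elements still greater than zero go back into the queue, and the rest are forwarded downstream. `IterationSketch` is hypothetical and processes one element at a time, whereas a real streaming iteration runs the body in parallel and interleaves fed-back elements with new input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class IterationSketch {
    // Returns the elements that leave the loop (the stream filtered with value <= 0).
    static List<Long> run(long from, long to) {
        Deque<Long> feedback = new ArrayDeque<>();
        for (long v = from; v <= to; v++) {
            feedback.add(v); // initial input, like generateSequence(from, to)
        }
        List<Long> downstream = new ArrayList<>();
        while (!feedback.isEmpty()) {
            long minusOne = feedback.poll() - 1; // iteration body
            if (minusOne > 0) {
                feedback.add(minusOne);          // fed back to the iteration head
            } else {
                downstream.add(minusOne);        // forwarded downstream
            }
        }
        return downstream;
    }
}
```

With the input 0 to 1000, every element leaves the loop exactly once: 0 leaves as -1 after a single pass and every other element leaves as 0, so the downstream filter (`value <= 0`) mostly sees zeros rather than strictly negative values.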
+
+{% top %}
+
+Execution Parameters
+--------------------
+
+The `StreamExecutionEnvironment` contains the `ExecutionConfig`, which allows setting job-specific configuration values for the runtime.
+
+Please refer to [execution configuration]({{ site.baseurl }}/dev/api_concepts.html#execution-configuration)
+for an explanation of most parameters. These parameters pertain specifically to the DataStream API:
+
+- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each event emitted from a source.
+    `areTimestampsEnabled()` returns the current value.
+
+- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for automatic watermark emission. You can
+    get the current value with `long getAutoWatermarkInterval()`.
+
+{% top %}
+
+### Fault Tolerance
+
+The [Fault Tolerance Documentation]({{ site.baseurl }}/setup/fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
+
+### Controlling Latency
+
+By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic)
+but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files.
+While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough to fill the buffers quickly.
+To control throughput and latency, you can use `env.setBufferTimeout(timeoutMillis)` on the execution environment
+(or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the
+buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.
+
+Usage:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+env.setBufferTimeout(timeoutMillis);
+
+env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+env.setBufferTimeout(timeoutMillis)
+
+env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
+{% endhighlight %}
+</div>
+</div>
+
+To maximize throughput, call `setBufferTimeout(-1)`, which removes the timeout so that buffers are only
+flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms).
+A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.
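The trade-off behind `setBufferTimeout` can be illustrated with a small, hypothetical model (`BufferTimeoutSim`, not part of Flink) of an output buffer. The buffer is shipped either when it is full or when its oldest record has waited for the timeout. The model counts capacity in records instead of bytes and passes logical time in explicitly. Flink's real network stack differs in detail, for example in how and when the timeout is checked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "flush when full or when the timeout expires"; not Flink's network stack.
final class BufferTimeoutSim {
    private final int capacity;        // records per buffer (Flink sizes its buffers in bytes)
    private final long timeoutMillis;  // -1 disables time-based flushing, like setBufferTimeout(-1)
    private final List<String> buffer = new ArrayList<>();
    private long oldestArrival = -1;   // arrival time of the oldest record still in the buffer

    final List<List<String>> shipped = new ArrayList<>(); // buffers handed to the "network"
    final List<Long> waitOfOldest = new ArrayList<>();    // how long the oldest record of each buffer waited

    BufferTimeoutSim(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    void add(String record, long nowMillis) {
        if (buffer.isEmpty()) {
            oldestArrival = nowMillis;
        }
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush(nowMillis);          // a full buffer is always shipped immediately
        }
    }

    /** Called periodically; ships a partially filled buffer once its oldest record hit the timeout. */
    void tick(long nowMillis) {
        if (timeoutMillis >= 0 && !buffer.isEmpty() && nowMillis - oldestArrival >= timeoutMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        shipped.add(new ArrayList<>(buffer));
        waitOfOldest.add(nowMillis - oldestArrival);
        buffer.clear();
    }
}
```

With records arriving every 50 ms into a buffer that holds 100 records, a 100 ms timeout ships a partially filled buffer about every 100 ms. A timeout of `-1` ships nothing until 100 records have accumulated, which is why `-1` only suits streams that fill buffers quickly.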
+
+{% top %}
+
+Debugging
+---------
+
+Before running a streaming program in a distributed cluster, it is a good
+idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
+programs is usually an incremental process of checking results, debugging, and improving.
+
+Flink provides features to significantly ease the development process of data analysis
+programs by supporting local debugging from within an IDE, injection of test data, and collection of
+result data. This section gives some hints on how to ease the development of Flink programs.
+
+### Local Execution Environment
+
+A `LocalStreamEnvironment` starts a Flink system within the same JVM process it was created in. If you
+start the `LocalStreamEnvironment` from an IDE, you can set breakpoints in your code and easily debug your
+program.
+
+A `LocalStreamEnvironment` is created and used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+DataStream<String> lines = env.addSource(/* some source */);
+// build your program
+
+env.execute();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+
+val lines = env.addSource(/* some source */)
+// build your program
+
+env.execute()
+{% endhighlight %}
+</div>
+</div>
+
+### Collection Data Sources
+
+Flink provides special data sources which are backed
+by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
+easily replaced by sources and sinks that read from / write to external systems.
+
+Collection data sources can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+// Create a DataStream from a list of elements
+DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
+
+// Create a DataStream from any Java collection
+List<Tuple2<String, Integer>> data = ...
+DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
+
+// Create a DataStream from an Iterator
+Iterator<Long> longIt = ...
+DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+
+// Create a DataStream from a list of elements
+val myInts = env.fromElements(1, 2, 3, 4, 5)
+
+// Create a DataStream from any Collection
+val data: Seq[(String, Int)] = ...
+val myTuples = env.fromCollection(data)
+
+// Create a DataStream from an Iterator
+val longIt: Iterator[Long] = ...
+val myLongs = env.fromCollection(longIt)
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** Currently, the collection data source requires that data types and iterators implement
+`Serializable`. Furthermore, collection data sources cannot be executed in parallel
+(parallelism = 1).
+
+### Iterator Data Sink
+
+Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import java.util.Iterator;
+import org.apache.flink.contrib.streaming.DataStreamUtils;
+
+DataStream<Tuple2<String, Integer>> myResult = ...
+Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.flink.contrib.streaming.DataStreamUtils
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+val myResult: DataStream[(String, Int)] = ...
+val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/event_time.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_time.md b/docs/dev/event_time.md
new file mode 100644
index 0000000..7375a0f
--- /dev/null
+++ b/docs/dev/event_time.md
@@ -0,0 +1,206 @@
+---
+title: "Event Time"
+nav-id: event_time
+nav-show_overview: true
+nav-parent_id: dev
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* toc
+{:toc}
+
+# Event Time / Processing Time / Ingestion Time
+
+Flink supports different notions of *time* in streaming programs.
+
+- **Processing time:** Processing time refers to the system time of the machine that is executing the
+    respective operation.
+
+    When a streaming program runs on processing time, all time-based operations (like time windows) will
+    use the system clock of the machines that run the respective operator. For example, an hourly
+    processing time window will include all records that arrived at a specific operator between the
+    times when the system clock indicated the full hour.
+
+    Processing time is the simplest notion of time and requires no coordination between streams and machines.
+    It provides the best performance and the lowest latency. However, in distributed and asynchronous
+    environments processing time does not provide determinism, because it is susceptible to the speed at which
+    records arrive in the system (for example from the message queue), and to the speed at which the
+    records flow between operators inside the system.
+
+- **Event time:** Event time is the time that each individual event occurred on its producing device.
+    This time is typically embedded within the records before they enter Flink and that *event timestamp*
+    can be extracted from the record. An hourly event time window will contain all records that carry an
+    event timestamp that falls into that hour, regardless of when the records arrive, and in what order
+    they arrive.
+
+    Event time gives correct results even on out-of-order events, late events, or on replays
+    of data from backups or persistent logs. In event time, the progress of time depends on the data,
+    not on any wall clocks. Event time programs must specify how to generate *Event Time Watermarks*,
+    the mechanism that signals progress in event time. This mechanism is
+    described below.
+
+    Event time processing often incurs a certain latency, due to its nature of waiting a certain time for
+    late events and out-of-order events. Because of that, event time programs are often combined with
+    *processing time* operations.
+
+- **Ingestion time:** Ingestion time is the time that events enter Flink. At the source operator, each
+    record gets the source's current time as a timestamp, and time-based operations (like time windows)
+    refer to that timestamp.
+
+    *Ingestion Time* sits conceptually in between *Event Time* and *Processing Time*. Compared to
+    *Processing Time*, it is slightly more expensive, but gives more predictable results: Because
+    *Ingestion Time* uses stable timestamps (assigned once at the source), different window operations
+    over the records will refer to the same timestamp, whereas in *Processing Time* each window operator
+    may assign the record to a different window (based on the local system clock and any transport delay).
+
+    Compared to *Event Time*, *Ingestion Time* programs cannot handle any out-of-order events or late data,
+    but the programs don't have to specify how to generate *Watermarks*.
+
+    Internally, *Ingestion Time* is treated much like event time, with automatic timestamp assignment and
+    automatic Watermark generation.
+
+<img src="{{ site.baseurl }}/fig/times_clocks.svg" class="center" width="80%" />
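To make the difference between event time and processing time concrete, the following plain-Java sketch assigns records to hourly tumbling windows. It uses either the timestamp embedded in the record or the time the record arrived. `TimedRecord` and `TimeNotionsSim` are hypothetical helpers, not Flink classes, and the window arithmetic assumes non-negative timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical record carrying both notions of time discussed above.
final class TimedRecord {
    final String id;
    final long eventTime;    // when the event happened on the producing device
    final long arrivalTime;  // operator's system clock when the record arrived (processing time)

    TimedRecord(String id, long eventTime, long arrivalTime) {
        this.id = id;
        this.eventTime = eventTime;
        this.arrivalTime = arrivalTime;
    }
}

final class TimeNotionsSim {
    static final long HOUR = 60 * 60 * 1000L;

    /** Start of the tumbling window of length {@code size} containing {@code ts}; assumes ts >= 0. */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Groups record ids into hourly windows by event time or by processing (arrival) time. */
    static Map<Long, List<String>> assign(List<TimedRecord> records, boolean useEventTime) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (TimedRecord r : records) {
            long ts = useEventTime ? r.eventTime : r.arrivalTime;
            windows.computeIfAbsent(windowStart(ts, HOUR), k -> new ArrayList<>()).add(r.id);
        }
        return windows;
    }
}
```

A record that happens at minute 50 but arrives at minute 70 lands in the first hour under event time and in the second hour under processing time.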
+
+
+### Setting a Time Characteristic
+
+The first part of a Flink DataStream program is usually to set the base *time characteristic*. That setting
+defines how data stream sources behave (for example whether to assign timestamps), and what notion of
+time the window operations like `KeyedStream.timeWindow(Time.seconds(30))` refer to.
+
+The following example shows a Flink program that aggregates events in hourly time windows. The behavior of the
+windows adapts to the time characteristic.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+// alternatively:
+// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
+
+stream
+    .keyBy( (event) -> event.getUser() )
+    .timeWindow(Time.hours(1))
+    .reduce( (a, b) -> a.add(b) )
+    .addSink(...);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+// alternatively:
+// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
+
+stream
+    .keyBy( _.getUser )
+    .timeWindow(Time.hours(1))
+    .reduce( (a, b) => a.add(b) )
+    .addSink(...)
+{% endhighlight %}
+</div>
+</div>
+
+
+Note that in order to run this example in *Event Time*, the program needs to either use an event time
+source or inject a *Timestamp Assigner & Watermark Generator*. Those functions describe how to access
+the event timestamps, and what degree of out-of-orderness the event stream exhibits.
+
+The section below describes the general mechanism behind *Timestamps* and *Watermarks*. For a guide on how
+to use timestamp assignment and watermark generation in the Flink DataStream API, please refer to
+[Generating Timestamps / Watermarks]({{ site.baseurl }}/dev/event_timestamps_watermarks.html).
+
+
+# Event Time and Watermarks
+
+*Note: Flink implements many techniques from the Dataflow Model. For a good introduction to event time and watermarks, have a look at these articles:*
+
+  - [Streaming 101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by Tyler Akidau
+  - The [Dataflow Model paper](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf)
+
+
+A stream processor that supports *event time* needs a way to measure the progress of event time.
+For example, a window operator that builds hourly windows needs to be notified when event time has reached the
+next full hour, so that the operator can close the window that ends at that hour.
+
+*Event Time* can progress independently of *Processing Time* (measured by wall clocks).
+For example, in one program, the current *event time* of an operator can trail slightly behind the processing time
+(accounting for a delay in receiving the latest elements) and both proceed at the same speed. In another streaming
+program, which fast-forwards through some data already buffered in a Kafka topic (or another message queue), event time
+can progress by weeks in seconds.
+
+------
+
+The mechanism in Flink to measure progress in event time is **Watermarks**.
+Watermarks flow as part of the data stream and carry a timestamp *t*. A *Watermark(t)* declares that event time has reached time
+*t* in that stream, meaning that all events with a timestamp *t' < t* have occurred.
+
+The figure below shows a stream of events with (logical) timestamps, and watermarks flowing inline. The events are in order
+(with respect to their timestamp), meaning that watermarks are simply periodic markers in the stream with an in-order timestamp.
+
+<img src="{{ site.baseurl }}/fig/stream_watermark_in_order.svg" alt="A data stream with events (in order) and watermarks" class="center" width="65%" />
+
+Watermarks are crucial for *out-of-order* streams, as shown in the figure below, where events are not ordered by their timestamps.
+Watermarks establish points in the stream where all events up to a certain timestamp have occurred. Once these watermarks reach an
+operator, the operator can advance its internal *event time clock* to the value of the watermark.
+
+<img src="{{ site.baseurl }}/fig/stream_watermark_out_of_order.svg" alt="A data stream with events (out of order) and watermarks" class="center" width="65%" />
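The following hypothetical sketch (`WatermarkClockSim`, not Flink's window operator) shows how an operator can buffer out-of-order elements in tumbling windows and advance its event time clock only when a watermark arrives. It follows the definition above, under which *Watermark(t)* means all events with timestamps below *t* have occurred. A window `[start, end)` is therefore emitted once the clock reaches `end`. Flink's own firing condition may differ at the boundary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tumbling-window operator driven by watermarks; not Flink's WindowOperator.
final class WatermarkClockSim {
    private final long windowSize;
    private long currentEventTime = Long.MIN_VALUE;                      // the operator's event time clock
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();  // window start -> element count
    final List<String> emitted = new ArrayList<>();

    WatermarkClockSim(long windowSize) {
        this.windowSize = windowSize;
    }

    void processElement(long timestamp) {
        long start = timestamp - (timestamp % windowSize);  // assumes non-negative timestamps
        openWindows.merge(start, 1, Integer::sum);
    }

    void processWatermark(long watermark) {
        if (watermark <= currentEventTime) {
            return;                                          // the clock never moves backwards
        }
        currentEventTime = watermark;
        // Watermark(t): all events with timestamps < t have occurred, so the window
        // [start, start + size) is complete once start + size <= t.
        while (!openWindows.isEmpty() && openWindows.firstKey() + windowSize <= currentEventTime) {
            Map.Entry<Long, Integer> w = openWindows.pollFirstEntry();
            emitted.add("[" + w.getKey() + "," + (w.getKey() + windowSize) + "):" + w.getValue());
        }
    }

    long currentEventTime() {
        return currentEventTime;
    }
}
```

Elements may arrive in any order before the watermark that covers them. Each window is emitted once, when the watermark proves that no earlier events are still outstanding.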
+
+
+## Watermarks in Parallel Streams
+
+Watermarks are generated at source functions, or directly after source functions. Each parallel subtask of a source function usually
+generates its watermarks independently. These watermarks define the event time at that particular parallel source.
+
+As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an
+operator advances its event time, it generates a new watermark downstream for its successor operators.
+
+Operators that consume multiple input streams (e.g., after a *keyBy(...)* or *partition(...)* function, or a union) track the event time
+on each of their input streams. The operator's current event time is the minimum of its input streams' event times. As the input streams
+update their event time, so does the operator.
+
+The figure below shows an example of events and watermarks flowing through parallel streams, and operators tracking event time.
+
+<img src="{{ site.baseurl }}/fig/parallel_streams_watermarks.svg" alt="Parallel data streams and operators with events and watermarks" class="center" width="80%" />
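The minimum rule for operators with several inputs can be sketched as follows. `InputWatermarkTracker` is a hypothetical class, not Flink's internal implementation. It stores the latest watermark per input channel and reports a new watermark to forward downstream only when the minimum over all channels advances.

```java
import java.util.Arrays;

// Hypothetical tracker for an operator with several input channels; not Flink's internal class.
final class InputWatermarkTracker {
    private final long[] channelWatermarks;          // latest watermark seen on each input channel
    private long currentEventTime = Long.MIN_VALUE;  // operator event time = minimum over channels

    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);  // no watermark received yet
    }

    /**
     * Records a watermark from one channel. Returns the new operator event time if it advanced
     * (a watermark to emit to successor operators), or null if it did not.
     */
    Long onWatermark(int channel, long watermark) {
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
        }
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > currentEventTime) {
            currentEventTime = min;
            return min;
        }
        return null;
    }
}
```

A single slow input therefore holds back the event time of the whole operator until it reports a higher watermark.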
+
+
+## Late Elements
+
+It is possible that certain elements violate the watermark condition, meaning that even after the *Watermark(t)* has occurred,
+more elements with timestamp *t' < t* will occur. In fact, in many real-world setups, certain elements can be arbitrarily
+delayed, making it impossible to define a time when all elements of a certain event timestamp have occurred.
+Furthermore, even if the lateness can be bounded, delaying the watermarks too much is often not desirable, because it delays
+the evaluation of the event time windows too much.
+
+For this reason, some streaming programs will explicitly expect a number of *late* elements. Late elements are elements that
+arrive after the system's event time clock (as signaled by the watermarks) has already passed the time of the late element's
+timestamp.
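A minimal sketch of this definition, assuming the operator already knows its current watermark, classifies an element as late when its timestamp is below that watermark. `LatenessSim` is a hypothetical name, and what a real program does with late elements is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: splitting elements into on-time and late relative to the current watermark.
final class LatenessSim {
    private long currentWatermark = Long.MIN_VALUE;  // event time clock as signaled by watermarks
    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);  // watermarks only move forward
    }

    void onElement(long timestamp) {
        // Late: the clock has already passed this element's timestamp.
        if (timestamp < currentWatermark) {
            late.add(timestamp);
        } else {
            onTime.add(timestamp);
        }
    }
}
```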

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/event_timestamp_extractors.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_timestamp_extractors.md b/docs/dev/event_timestamp_extractors.md
new file mode 100644
index 0000000..a9ec6e5
--- /dev/null
+++ b/docs/dev/event_timestamp_extractors.md
@@ -0,0 +1,106 @@
+---
+title: "Pre-defined Timestamp Extractors / Watermark Emitters"
+nav-parent_id: event_time
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* toc
+{:toc}
+
+As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
+Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
+one can do so by implementing either the `AssignerWithPeriodicWatermarks` or the `AssignerWithPunctuatedWatermarks` interface, depending
+on the use case. In a nutshell, the first emits watermarks periodically, while the second does so based on some property of
+the incoming records, e.g. whenever a special element is encountered in the stream.
+
+In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
+This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
+for custom assigner implementations.
+
+#### **Assigner with Ascending Timestamps**
+
+The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
+occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will
+arrive.
+
+Note that it is only necessary that timestamps are ascending *per parallel data source task*. For example, if
+in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that
+timestamps are ascending within each Kafka partition. Flink's Watermark merging mechanism will generate correct
+watermarks whenever parallel streams are shuffled, unioned, connected, or merged.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<MyEvent> stream = ...
+
+DataStream<MyEvent> withTimestampsAndWatermarks =
+    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
+
+        @Override
+        public long extractAscendingTimestamp(MyEvent element) {
+            return element.getCreationTime();
+        }
+});
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream: DataStream[MyEvent] = ...
+
+val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
+{% endhighlight %}
+</div>
+</div>
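The behavior described above can be modeled in a few lines of plain Java. `AscendingWatermarkSim` is a hypothetical stand-in for `AscendingTimestampExtractor`, not its actual source. It tracks the latest timestamp, uses it as the periodic watermark, and merely counts elements that break the ascending assumption. The shipped extractor may compute a slightly different watermark value and handles violations in its own way.

```java
// Hypothetical model of a periodic assigner for ascending timestamps; not Flink's own class.
final class AscendingWatermarkSim {
    private long currentTimestamp = Long.MIN_VALUE;  // latest (and therefore largest) timestamp seen
    private int violations = 0;

    /** Called per element of one parallel source task. */
    long extractTimestamp(long elementTimestamp) {
        if (elementTimestamp < currentTimestamp) {
            violations++;                            // input broke the ascending assumption
        } else {
            currentTimestamp = elementTimestamp;
        }
        return elementTimestamp;
    }

    /** Polled periodically: with ascending input, the current timestamp can act as the watermark. */
    long currentWatermark() {
        return currentTimestamp;
    }

    int violations() {
        return violations;
    }
}
```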
+
+#### **Assigner which allows a fixed amount of record lateness**
+
+Another example of periodic watermark generation is when the watermark lags behind the maximum (event-time) timestamp
+seen in the stream by a fixed amount of time. This case covers scenarios where the maximum lateness that can be encountered in a
+stream is known in advance, e.g. when creating a custom source containing elements with timestamps spread within a fixed period of
+time for testing. For these cases, Flink provides the `BoundedOutOfOrdernessTimestampExtractor` which takes as an argument
+the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed to be late before being ignored when computing the
+final result for the given window. Lateness corresponds to the result of `t_w - t`, where `t` is the (event-time) timestamp of an
+element, and `t_w` that of the previous watermark. If `lateness > 0`, then the element is considered late and is ignored when computing
+the result of the job for its corresponding window.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<MyEvent> stream = ...
+
+DataStream<MyEvent> withTimestampsAndWatermarks =
+    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
+
+        @Override
+        public long extractTimestamp(MyEvent element) {
+            return element.getCreationTime();
+        }
+});
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream: DataStream[MyEvent] = ...
+
+val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
+  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
+    override def extractTimestamp(element: MyEvent): Long = element.getCreationTime
+  })
+{% endhighlight %}
+</div>
+</div>
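The watermark and lateness rules described above can be sketched without Flink. `BoundedOutOfOrdernessSim` is a hypothetical class, not the source of `BoundedOutOfOrdernessTimestampExtractor`. It assumes millisecond timestamps, emits the watermark `maxTimestamp - maxOutOfOrderness` whenever it is polled, and flags an element as late when `t_w - t > 0`.

```java
// Hypothetical model of bounded-out-of-orderness watermarking; not Flink's own class.
final class BoundedOutOfOrdernessSim {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSim(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start so that currentMaxTimestamp - maxOutOfOrderness cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per element: remembers the largest event timestamp seen so far. */
    long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    /** Called periodically: the watermark trails the largest timestamp by the fixed bound. */
    long getCurrentWatermark() {
        lastEmittedWatermark = Math.max(lastEmittedWatermark, currentMaxTimestamp - maxOutOfOrderness);
        return lastEmittedWatermark;
    }

    /** lateness = t_w - t > 0, written as a comparison to avoid overflow near Long.MIN_VALUE. */
    boolean isLate(long timestamp) {
        return timestamp < lastEmittedWatermark;
    }
}
```

With a bound of 10 seconds (`Time.seconds(10)` in the examples above), an element at 12 s is late once an element at 30 s has moved the emitted watermark to 20 s.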

