Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 65FA618D22 for ; Fri, 15 Jan 2016 15:49:53 +0000 (UTC) Received: (qmail 42070 invoked by uid 500); 15 Jan 2016 15:49:53 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 41946 invoked by uid 500); 15 Jan 2016 15:49:53 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 41787 invoked by uid 99); 15 Jan 2016 15:49:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Jan 2016 15:49:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04529E07D9; Fri, 15 Jan 2016 15:49:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: uce@apache.org To: commits@flink.apache.org Date: Fri, 15 Jan 2016 15:49:54 -0000 Message-Id: <4c2e60fd371045788f186874c86d7e8d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/15] flink git commit: [FLINK-3132] [docs] Initial docs restructure http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md deleted file mode 100644 index eac464f..0000000 --- a/docs/apis/streaming_guide.md +++ /dev/null @@ -1,4038 +0,0 @@ ---- -title: "Flink DataStream API Programming Guide" -is_beta: false ---- - - - - -DataStream programs in Flink are regular programs that implement transformations on data streams -(e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various -sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for -example write the data to files, or to standard output (for example the command line -terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. -The execution can happen in a local JVM, or on clusters of many machines. - -In order to create your own Flink DataStream program, we encourage you to start with the -[program skeleton](#program-skeleton) and gradually add your own -[transformations](#transformations). The remaining sections act as references for additional -operations and advanced features. - - -* This will be replaced by the TOC -{:toc} - - -Example Program ---------------- - -The following program is a complete, working example of streaming window word count application, that counts the -words coming from a web socket in 5 second windows. You can copy & paste the code to run it locally. - -
-
{% highlight java %}
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
{% endhighlight %}
- -
-{% highlight scala %} -import java.util.concurrent.TimeUnit - -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.windowing.time.Time - -object WindowWordCount { - def main(args: Array[String]) { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val text = env.socketTextStream("localhost", 9999) - - val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } - .map { (_, 1) } - .keyBy(0) - .timeWindow(Time.of(5, TimeUnit.SECONDS)) - .sum(1) - - counts.print - - env.execute("Window Stream WordCount") - } -} -{% endhighlight %} -
- -
- -To run the example program, start the input stream with netcat first from a terminal: - -~~~bash -nc -lk 9999 -~~~ - -Just type some words hitting return for a new word. These will be the input to the -word count program. If you want to see counts greater than 1, type the same word again and again within -5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺). - -[Back to top](#top) - - -Linking with Flink ------------------- - -To write programs with Flink, you need to include the Flink DataStream library corresponding to -your programming language in your project. - -The simplest way to do this is to use one of the quickstart scripts: either for -[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They -create a blank project from a template (a Maven Archetype), which sets up everything for you. To -manually create the project, you can use the archetype and create a project by calling: - -
-
{% highlight bash %}
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion={{site.version }}
{% endhighlight %}
-
{% highlight bash %}
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion={{site.version }}
{% endhighlight %}
-
- -The archetypes are working for stable releases and preview versions (`-SNAPSHOT`). - -If you want to add Flink to an existing Maven project, add the following entry to your -*dependencies* section in the *pom.xml* file of your project: - -
-
-{% highlight xml %} - - org.apache.flink - flink-streaming-java - {{site.version }} - - - org.apache.flink - flink-clients - {{site.version }} - -{% endhighlight %} -
-
-{% highlight xml %} - - org.apache.flink - flink-streaming-scala - {{site.version }} - - - org.apache.flink - flink-clients - {{site.version }} - -{% endhighlight %} -
-
- -In order to create your own Flink program, we encourage you to start with the -[program skeleton](#program-skeleton) and gradually add your own -[transformations](#transformations). - -[Back to top](#top) - -Program Skeleton ----------------- - -
-
- -
- -As presented in the [example](#example-program), Flink DataStream programs look like regular Java -programs with a `main()` method. Each program consists of the same basic parts: - -1. Obtaining a `StreamExecutionEnvironment`, -2. Connecting to data stream sources, -3. Specifying transformations on the data streams, -4. Specifying output for the processed data, -5. Executing the program. - -We will now give an overview of each of those steps, please refer to the respective sections for -more details. - -The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can -obtain one using these static methods on class `StreamExecutionEnvironment`: - -{% highlight java %} -getExecutionEnvironment() - -createLocalEnvironment() -createLocalEnvironment(int parallelism) -createLocalEnvironment(int parallelism, Configuration customConfiguration) - -createRemoteEnvironment(String host, int port, String... jarFiles) -createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles) -{% endhighlight %} - -Typically, you only need to use `getExecutionEnvironment()`, since this -will do the right thing depending on the context: if you are executing -your program inside an IDE or as a regular Java program it will create -a local environment that will execute your program on your local machine. If -you created a JAR file from your program, and invoke it through the [command line](cli.html) -or the [web interface](web_client.html), -the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return -an execution environment for executing your program on a cluster. - -For specifying data sources the execution environment has several methods -to read from files, sockets, and external systems using various methods. To just read -data from a socket (useful also for debugging), you can use: - -{% highlight java %} -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - -DataStream lines = env.socketTextStream("localhost", 9999) -{% endhighlight %} - -This will give you a DataStream on which you can then apply transformations. For -more information on data sources and input formats, please refer to -[Data Sources](#data-sources). - -Once you have a DataStream you can apply transformations to create a new -DataStream which you can then write to a socket, transform again, -combine with other DataStreams, or push to an external system (e.g., a message queue, or a file system). -You apply transformations by calling -methods on DataStream with your own custom transformation functions. For example, -a map transformation looks like this: - -{% highlight java %} -DataStream input = ...; - -DataStream intValues = input.map(new MapFunction() { - @Override - public Integer map(String value) { - return Integer.parseInt(value); - } -}); -{% endhighlight %} - -This will create a new DataStream by converting every String in the original -stream to an Integer. For more information and a list of all the transformations, -please refer to [Transformations](#transformations). - -Once you have a DataStream containing your final results, you can push the result -to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, -or print it. - -{% highlight java %} -writeAsText(String path, ...) -writeAsCsv(String path, ...) -writeToSocket(String hostname, int port, ...) - -print() - -addSink(...) 
-{% endhighlight %} - -Once you specified the complete program you need to **trigger the program execution** by -calling `execute()` on `StreamExecutionEnvironment`. This will either execute on -the local machine or submit the program for execution on a cluster, depending on the chosen execution environment. - -{% highlight java %} -env.execute(); -{% endhighlight %} - -
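Putting the five steps together, a minimal skeleton might look like the sketch below (the socket source, port, job name, and the length-counting map are placeholders, and the usual imports are assumed):

{% highlight java %}
public class SkeletonJob {

    public static void main(String[] args) throws Exception {
        // 1. Obtain an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Connect to a data stream source
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // 3. Specify transformations
        DataStream<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        });

        // 4. Specify output for the processed data
        lengths.print();

        // 5. Trigger program execution
        env.execute("Line length skeleton");
    }
}
{% endhighlight %}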
-
- -
- -As presented in the [example](#example-program), Flink DataStream programs look like regular Scala -programs with a `main()` method. Each program consists of the same basic parts: - -1. Obtaining a `StreamExecutionEnvironment`, -2. Connecting to data stream sources, -3. Specifying transformations on the data streams, -4. Specifying output for the processed data, -5. Executing the program. - -We will now give an overview of each of those steps, please refer to the respective sections for -more details. - -The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can -obtain one using these static methods on class `StreamExecutionEnvironment`: - -{% highlight scala %} -def getExecutionEnvironment - -def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()) - -def createRemoteEnvironment(host: String, port: Int, jarFiles: String*) -def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*) -{% endhighlight %} - -Typically, you only need to use `getExecutionEnvironment`, since this -will do the right thing depending on the context: if you are executing -your program inside an IDE or as a regular Java program it will create -a local environment that will execute your program on your local machine. If -you created a JAR file from your program, and invoke it through the [command line](cli.html) -or the [web interface](web_client.html), -the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return -an execution environment for executing your program on a cluster. - -For specifying data sources the execution environment has several methods -to read from files, sockets, and external systems using various methods. To just read -data from a socket (useful also for debugging), you can use: - -{% highlight scala %} -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment - -DataStream lines = env.socketTextStream("localhost", 9999) -{% endhighlight %} - -This will give you a DataStream on which you can then apply transformations. For -more information on data sources and input formats, please refer to -[Data Sources](#data-sources). - -Once you have a DataStream you can apply transformations to create a new -DataStream which you can then write to a file, transform again, -combine with other DataStreams, or push to an external system. -You apply transformations by calling -methods on DataStream with your own custom transformation function. For example, -a map transformation looks like this: - -{% highlight scala %} -val input: DataStream[String] = ... - -val mapped = input.map { x => x.toInt } -{% endhighlight %} - -This will create a new DataStream by converting every String in the original -set to an Integer. For more information and a list of all the transformations, -please refer to [Transformations](#transformations). - -Once you have a DataStream containing your final results, you can push the result -to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, -or print it. - -{% highlight scala %} -writeAsText(path: String, ...) -writeAsCsv(path: String, ...) -writeToSocket(hostname: String, port: Int, ...) - -print() - -addSink(...) -{% endhighlight %} - -Once you specified the complete program you need to **trigger the program execution** by -calling `execute` on `StreamExecutionEnvironment`. 
This will either execute on -the local machine or submit the program for execution on a cluster, depending on the chosen execution environment. - -{% highlight scala %} -env.execute() -{% endhighlight %} - -
-
[Back to top](#top)

DataStream Abstraction
----------------------

A `DataStream` is a possibly unbounded, immutable collection of data items of the same type.

Transformations may return different subtypes of `DataStream` that allow specialized transformations.
For example, the `keyBy(…)` method returns a `KeyedStream`, which is a stream of data that
is logically partitioned by a certain key and can be further windowed.

[Back to top](#top)

Lazy Evaluation
---------------

All Flink DataStream programs are executed lazily: when the program's main method is executed, the data loading
and transformations do not happen directly. Rather, each operation is created and added to the
program's plan. The operations are actually executed when the execution is explicitly triggered by
an `execute()` call on the `StreamExecutionEnvironment` object. Whether the program is executed locally
or on a cluster depends on the type of `StreamExecutionEnvironment`.

Lazy evaluation lets you construct sophisticated programs that Flink executes as one
holistically planned unit.

[Back to top](#top)


Transformations
---------------

Data transformations transform one or more DataStreams into a new DataStream. Programs can combine
multiple transformations into sophisticated topologies.

This section gives a description of all the available transformations.
-
- -
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Map
DataStream → DataStream
-

Takes one element and produces one element. A map function that doubles the values of the input stream:

- {% highlight java %} -DataStream dataStream = //... -dataStream.map(new MapFunction() { - @Override - public Integer map(Integer value) throws Exception { - return 2 * value; - } -}); - {% endhighlight %} -
FlatMap
DataStream → DataStream
-

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

- {% highlight java %} -dataStream.flatMap(new FlatMapFunction() { - @Override - public void flatMap(String value, Collector out) - throws Exception { - for(String word: value.split(" ")){ - out.collect(word); - } - } -}); - {% endhighlight %} -
Filter
DataStream → DataStream
-

Evaluates a boolean function for each element and retains those for which the function returns true. - A filter that filters out zero values: -

- {% highlight java %} -dataStream.filter(new FilterFunction() { - @Override - public boolean filter(Integer value) throws Exception { - return value != 0; - } -}); - {% endhighlight %} -
KeyBy
DataStream → KeyedStream
-

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
Internally, this is implemented with hash partitioning. See keys on how to specify keys.
This transformation returns a KeyedStream.

- {% highlight java %} -dataStream.keyBy("someKey") // Key by field "someKey" -dataStream.keyBy(0) // Key by the first element of a Tuple - {% endhighlight %} -
Reduce
KeyedStream → DataStream
-

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and - emits the new value. -
-
- A reduce function that creates a stream of partial sums:

- {% highlight java %} -keyedStream.reduce(new ReduceFunction() { - @Override - public Integer reduce(Integer value1, Integer value2) - throws Exception { - return value1 + value2; - } -}); - {% endhighlight %} -

-
Fold
KeyedStream → DataStream
-

A "rolling" fold on a keyed data stream with an initial value. - Combines the current element with the last folded value and - emits the new value. -
-
-

A fold function that, when applied on the sequence (1,2,3,4,5), - emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

- {% highlight java %} -DataStream result = - keyedStream.fold("start", new FoldFunction() { - @Override - public String fold(String current, Integer value) { - return current + "-" + value; - } - }); - {% endhighlight %} -

-
Aggregations
KeyedStream → DataStream
-

Rolling aggregations on a keyed data stream. The difference between min
and minBy is that min returns the minimum value, whereas minBy returns
the element that has the minimum value in this field (same for max and maxBy).

- {% highlight java %} -keyedStream.sum(0); -keyedStream.sum("key"); -keyedStream.min(0); -keyedStream.min("key"); -keyedStream.max(0); -keyedStream.max("key"); -keyedStream.minBy(0); -keyedStream.minBy("key"); -keyedStream.maxBy(0); -keyedStream.maxBy("key"); - {% endhighlight %} -
Window
KeyedStream → WindowedStream
-

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each - key according to some characteristic (e.g., the data that arrived within the last 5 seconds). - See windows for a complete description of windows. - {% highlight java %} -dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data - {% endhighlight %} -

-
WindowAll
DataStream → AllWindowedStream
-

Windows can be defined on regular DataStreams. Windows group all the stream events - according to some characteristic (e.g., the data that arrived within the last 5 seconds). - See windows for a complete description of windows.

-

WARNING: This is in many cases a non-parallel transformation. All records will be - gathered in one task for the windowAll operator.

- {% highlight java %} -dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data - {% endhighlight %} -
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
-

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

-

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

{% highlight java %}
windowedStream.apply (new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple key,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
{% endhighlight %}
Window Reduce
WindowedStream → DataStream
-

Applies a functional reduce function to the window and returns the reduced value.

{% highlight java %}
windowedStream.reduce (new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
{% endhighlight %}
Window Fold
WindowedStream → DataStream
-

Applies a functional fold function to the window and returns the folded value. - The example function, when applied on the sequence (1,2,3,4,5), - folds the sequence into the string "start-1-2-3-4-5":

{% highlight java %}
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
{% endhighlight %}
Aggregations on windows
WindowedStream → DataStream
-

Aggregates the contents of a window. The difference between min
and minBy is that min returns the minimum value, whereas minBy returns
the element that has the minimum value in this field (same for max and maxBy).

- {% highlight java %} -windowedStream.sum(0); -windowedStream.sum("key"); -windowedStream.min(0); -windowedStream.min("key"); -windowedStream.max(0); -windowedStream.max("key"); -windowedStream.minBy(0); -windowedStream.minBy("key"); -windowedStream.maxBy(0); -windowedStream.maxBy("key"); - {% endhighlight %} -
Union
DataStream* → DataStream
-

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
with itself you will get each element twice in the resulting stream.

- {% highlight java %} -dataStream.union(otherStream1, otherStream2, ...); - {% endhighlight %} -
Window Join
DataStream,DataStream → DataStream
-

Join two data streams on a given key and a common window.

- {% highlight java %} -dataStream.join(otherStream) - .where(0).equalTo(1) - .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) - .apply (new JoinFunction () {...}); - {% endhighlight %} -
Window CoGroup
DataStream,DataStream → DataStream
-

Cogroups two data streams on a given key and a common window.

- {% highlight java %} -dataStream.coGroup(otherStream) - .where(0).equalTo(1) - .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) - .apply (new CoGroupFunction () {...}); - {% endhighlight %} -
Connect
DataStream,DataStream → ConnectedStreams
-

"Connects" two data streams retaining their types. Connect allowing for shared state between - the two streams.

- {% highlight java %} -DataStream someStream = //... -DataStream otherStream = //... - -ConnectedStreams connectedStreams = someStream.connect(otherStream); - {% endhighlight %} -
CoMap, CoFlatMap
ConnectedStreams → DataStream
-

Similar to map and flatMap on a connected data stream

- {% highlight java %} -connectedStreams.map(new CoMapFunction() { - @Override - public Boolean map1(Integer value) { - return true; - } - - @Override - public Boolean map2(String value) { - return false; - } -}); -connectedStreams.flatMap(new CoFlatMapFunction() { - - @Override - public void flatMap1(Integer value, Collector out) { - out.collect(value.toString()); - } - - @Override - public void flatMap2(String value, Collector out) { - for (String word: value.split(" ")) { - out.collect(word); - } - } -}); - {% endhighlight %} -
Split
DataStream → SplitStream
-

- Split the stream into two or more streams according to some criterion. - {% highlight java %} -SplitStream split = someDataStream.split(new OutputSelector() { - @Override - public Iterable select(Integer value) { - List output = new ArrayList(); - if (value % 2 == 0) { - output.add("even"); - } - else { - output.add("odd"); - } - return output; - } -}); - {% endhighlight %} -

-
Select
SplitStream → DataStream
-

- Select one or more streams from a split stream. - {% highlight java %} -SplitStream split; -DataStream even = split.select("even"); -DataStream odd = split.select("odd"); -DataStream all = split.select("even","odd"); - {% endhighlight %} -

-
Iterate
DataStream → IterativeStream → DataStream
-

- Creates a "feedback" loop in the flow, by redirecting the output of one operator - to some previous operator. This is especially useful for defining algorithms that - continuously update a model. The following code starts with a stream and applies - the iteration body continuously. Elements that are greater than 0 are sent back - to the feedback channel, and the rest of the elements are forwarded downstream. - See iterations for a complete description. - {% highlight java %} -IterativeStream iteration = initialStream.iterate(); -DataStream iterationBody = iteration.map (/*do something*/); -DataStream feedback = iterationBody.filter(new FilterFunction(){ - @Override - public boolean filter(Integer value) throws Exception { - return value > 0; - } -}); -iteration.closeWith(feedback); -DataStream output = iterationBody.filter(new FilterFunction(){ - @Override - public boolean filter(Integer value) throws Exception { - return value <= 0; - } -}); - {% endhighlight %} -

-
Extract Timestamps
DataStream → DataStream
-

- Extracts timestamps from records in order to work with windows - that use event time semantics. See working with time. - {% highlight java %} -stream.assignTimestamps (new TimeStampExtractor() {...}); - {% endhighlight %} -

-
- -
- -
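The transformations above compose freely. As a rough sketch (the types, field indices, and window size are illustrative), a pipeline that filters, keys, windows, and aggregates a stream of word counts could look like this:

{% highlight java %}
DataStream<Tuple2<String, Integer>> counts = //...

DataStream<Tuple2<String, Integer>> windowedCounts = counts
    .filter(new FilterFunction<Tuple2<String, Integer>>() {
        @Override
        public boolean filter(Tuple2<String, Integer> value) {
            return value.f1 > 0;   // drop empty counts
        }
    })
    .keyBy(0)                                      // key by the word
    .timeWindow(Time.of(10, TimeUnit.SECONDS))     // 10 second tumbling windows
    .sum(1);                                       // total count per word and window
{% endhighlight %}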
- -
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Map
DataStream → DataStream
-

Takes one element and produces one element. A map function that doubles the values of the input stream:

- {% highlight scala %} -dataStream.map { x => x * 2 } - {% endhighlight %} -
FlatMap
DataStream → DataStream
-

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

- {% highlight scala %} -dataStream.flatMap { str => str.split(" ") } - {% endhighlight %} -
Filter
DataStream → DataStream
-

Evaluates a boolean function for each element and retains those for which the function returns true. - A filter that filters out zero values: -

- {% highlight scala %} -dataStream.filter { _ != 0 } - {% endhighlight %} -
KeyBy
DataStream → KeyedStream
-

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
Internally, this is implemented with hash partitioning. See keys on how to specify keys.
This transformation returns a KeyedStream.

- {% highlight scala %} -dataStream.keyBy("someKey") // Key by field "someKey" -dataStream.keyBy(0) // Key by the first element of a Tuple - {% endhighlight %} -
Reduce
KeyedStream → DataStream
-

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and - emits the new value. -
-
- A reduce function that creates a stream of partial sums:

- {% highlight scala %} -keyedStream.reduce { _ + _ } - {% endhighlight %} -

-
Fold
KeyedStream → DataStream
-

A "rolling" fold on a keyed data stream with an initial value. - Combines the current element with the last folded value and - emits the new value. -
-
-

A fold function that, when applied on the sequence (1,2,3,4,5), - emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

- {% highlight scala %} -val result: DataStream[String] = - keyedStream.fold("start", (str, i) => { str + "-" + i }) - {% endhighlight %} -

-
Aggregations
KeyedStream → DataStream
-

Rolling aggregations on a keyed data stream. The difference between min
and minBy is that min returns the minimum value, whereas minBy returns
the element that has the minimum value in this field (same for max and maxBy).

- {% highlight scala %} -keyedStream.sum(0) -keyedStream.sum("key") -keyedStream.min(0) -keyedStream.min("key") -keyedStream.max(0) -keyedStream.max("key") -keyedStream.minBy(0) -keyedStream.minBy("key") -keyedStream.maxBy(0) -keyedStream.maxBy("key") - {% endhighlight %} -
Window
KeyedStream → WindowedStream
-

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each - key according to some characteristic (e.g., the data that arrived within the last 5 seconds). - See windows for a description of windows. - {% highlight scala %} -dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data - {% endhighlight %} -

-
WindowAll
DataStream → AllWindowedStream
-

Windows can be defined on regular DataStreams. Windows group all the stream events - according to some characteristic (e.g., the data that arrived within the last 5 seconds). - See windows for a complete description of windows.

-

WARNING: This is in many cases a non-parallel transformation. All records will be - gathered in one task for the windowAll operator.

- {% highlight scala %} -dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data - {% endhighlight %} -
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
-

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

-

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

- {% highlight scala %} -windowedStream.apply { WindowFunction } - -// applying an AllWindowFunction on non-keyed window stream -allWindowedStream.apply { AllWindowFunction } - - {% endhighlight %} -
Window Reduce
WindowedStream → DataStream
-

Applies a functional reduce function to the window and returns the reduced value.

- {% highlight scala %} -windowedStream.reduce { _ + _ } - {% endhighlight %} -
Window Fold
WindowedStream → DataStream
-

Applies a functional fold function to the window and returns the folded value. - The example function, when applied on the sequence (1,2,3,4,5), - folds the sequence into the string "start-1-2-3-4-5":

- {% highlight scala %} -val result: DataStream[String] = - windowedStream.fold("start", (str, i) => { str + "-" + i }) - {% endhighlight %} -
Aggregations on windows
WindowedStream → DataStream
-

Aggregates the contents of a window. The difference between min
and minBy is that min returns the minimum value, whereas minBy returns
the element that has the minimum value in this field (same for max and maxBy).

- {% highlight scala %} -windowedStream.sum(0) -windowedStream.sum("key") -windowedStream.min(0) -windowedStream.min("key") -windowedStream.max(0) -windowedStream.max("key") -windowedStream.minBy(0) -windowedStream.minBy("key") -windowedStream.maxBy(0) -windowedStream.maxBy("key") - {% endhighlight %} -
Union
DataStream* → DataStream
-

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
with itself you will get each element twice in the resulting stream.

- {% highlight scala %} -dataStream.union(otherStream1, otherStream2, ...) - {% endhighlight %} -
Window Join
DataStream,DataStream → DataStream
-

Join two data streams on a given key and a common window.

- {% highlight scala %} -dataStream.join(otherStream) - .where(0).equalTo(1) - .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) - .apply { ... } - {% endhighlight %} -
Window CoGroup
DataStream,DataStream → DataStream
-

Cogroups two data streams on a given key and a common window.

- {% highlight scala %} -dataStream.coGroup(otherStream) - .where(0).equalTo(1) - .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) - .apply {} - {% endhighlight %} -
Connect
DataStream,DataStream → ConnectedStreams
-

"Connects" two data streams retaining their types, allowing for shared state between - the two streams.

- {% highlight scala %} -someStream : DataStream[Int] = ... -otherStream : DataStream[String] = ... - -val connectedStreams = someStream.connect(otherStream) - {% endhighlight %} -
CoMap, CoFlatMap
ConnectedStreams → DataStream
-

Similar to map and flatMap on a connected data stream

{% highlight scala %}
connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (x : Int) => Seq(x.toString),
    (s : String) => s.split(" ").toSeq
)
{% endhighlight %}
Split
DataStream → SplitStream
-

- Split the stream into two or more streams according to some criterion. - {% highlight scala %} -val split = someDataStream.split( - (num: Int) => - (num % 2) match { - case 0 => List("even") - case 1 => List("odd") - } -) - {% endhighlight %} -

-
Select
SplitStream → DataStream
-

- Select one or more streams from a split stream. - {% highlight scala %} - -val even = split select "even" -val odd = split select "odd" -val all = split.select("even","odd") - {% endhighlight %} -

-
Iterate
DataStream → IterativeStream → DataStream
-

Creates a "feedback" loop in the flow, by redirecting the output of one operator
to some previous operator. This is especially useful for defining algorithms that
continuously update a model. The following code starts with a stream and applies
the iteration body continuously. Elements that are greater than 0 are sent back
to the feedback channel, and the rest of the elements are forwarded downstream.
See iterations for a complete description.
{% highlight scala %}
initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}
{% endhighlight %}

-
Extract Timestamps
DataStream → DataStream
-

- Extracts timestamps from records in order to work with windows - that use event time semantics. - See working with time. - {% highlight scala %} -stream.assignTimestamps { timestampExtractor } - {% endhighlight %} -

-
- -
-
- -The following transformations are available on data streams of Tuples: - - -
-
- -
- - - - - - - - - - - - - - -
TransformationDescription
Project
DataStream → DataStream
-

Selects a subset of fields from the tuples -{% highlight java %} -DataStream> in = // [...] -DataStream> out = in.project(2,0); -{% endhighlight %} -

-
- -
- -
- -
- - - - - - - - - - - - - - -
TransformationDescription
Project
DataStream → DataStream
-

Selects a subset of fields from the tuples -{% highlight scala %} -val in : DataStream[(Int,Double,String)] = // [...] -val out = in.project(2,0) -{% endhighlight %} -

-
- -
-
- - -### Physical partitioning - -Flink also gives low-level control (if desired) on the exact stream partitioning after a transformation, -via the following functions. - -
-
- -
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Hash partitioning
DataStream → DataStream
-

- Identical to keyBy but returns a DataStream instead of a KeyedStream. - {% highlight java %} -dataStream.partitionByHash("someKey"); -dataStream.partitionByHash(0); - {% endhighlight %} -

-
Custom partitioning
DataStream → DataStream
-

- Uses a user-defined Partitioner to select the target task for each element. - {% highlight java %} -dataStream.partitionCustom(new Partitioner(){...}, "someKey"); -dataStream.partitionCustom(new Partitioner(){...}, 0); - {% endhighlight %} -

-
Random partitioning
DataStream → DataStream
-

- Partitions elements randomly according to a uniform distribution. - {% highlight java %} -dataStream.partitionRandom(); - {% endhighlight %} -

-
Rebalancing (Round-robin partitioning)
DataStream → DataStream
-

- Partitions elements round-robin, creating equal load per partition. Useful for performance - optimization in the presence of data skew. - {% highlight java %} -dataStream.rebalance(); - {% endhighlight %} -

-
Broadcasting
DataStream → DataStream
-

- Broadcasts elements to every partition. - {% highlight java %} -dataStream.broadcast(); - {% endhighlight %} -

-
- -
- -
- -
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Hash partitioning
DataStream → DataStream
-

- Identical to keyBy but returns a DataStream instead of a KeyedStream. - {% highlight scala %} -dataStream.partitionByHash("someKey") -dataStream.partitionByHash(0) - {% endhighlight %} -

-
Custom partitioning
DataStream → DataStream
-

- Uses a user-defined Partitioner to select the target task for each element. - {% highlight scala %} -dataStream.partitionCustom(partitioner, "someKey") -dataStream.partitionCustom(partitioner, 0) - {% endhighlight %} -

-
Random partitioning
DataStream → DataStream
-

- Partitions elements randomly according to a uniform distribution. - {% highlight scala %} -dataStream.partitionRandom() - {% endhighlight %} -

-
Rebalancing (Round-robin partitioning)
DataStream → DataStream
-

- Partitions elements round-robin, creating equal load per partition. Useful for performance - optimization in the presence of data skew. - {% highlight scala %} -dataStream.rebalance() - {% endhighlight %} -

-
Broadcasting
DataStream → DataStream
-

- Broadcasts elements to every partition. - {% highlight scala %} -dataStream.broadcast() - {% endhighlight %} -

-
- -
-
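For instance, a user-defined `Partitioner` gives you full control over the target channel. The sketch below (the field index and routing logic are purely illustrative) sends negative keys to partition 0 and spreads the remaining keys by value:

{% highlight java %}
DataStream<Tuple2<Integer, String>> stream = //...

DataStream<Tuple2<Integer, String>> partitioned = stream.partitionCustom(
    new Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
            // negative keys go to partition 0, the rest are spread by value
            return key < 0 ? 0 : key % numPartitions;
        }
    }, 0);   // partition on the first tuple field
{% endhighlight %}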
- -### Task chaining and resource groups - -Chaining two subsequent transformations means co-locating them within the same thread for better -performance. Flink by default chains operators if this is possible (e.g., two subsequent map -transformations). The API gives fine-grained control over chaining if desired: - -Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to disable chaining in -the whole job. For more fine grained control, the following functions are available. Note that -these functions can only be used right after a DataStream transformation as they refer to the -previous transformation. For example, you can use `someStream.map(...).startNewChain()`, but -you cannot use `someStream.startNewChain()`. - -A resource group is a slot in Flink, see -[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots). You can -manually isolate operators in separate slots if desired. - -
-
- -
- - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Start new chain -

Begin a new chain, starting with this operator. The two - mappers will be chained, and filter will not be chained to - the first mapper. -{% highlight java %} -someStream.filter(...).map(...).startNewChain().map(...); -{% endhighlight %} -

-
Disable chaining -

Do not chain the map operator -{% highlight java %} -someStream.map(...).disableChaining(); -{% endhighlight %} -

-
Start a new resource group -

Start a new resource group containing the filter and the subsequent operators. -{% highlight java %} -someStream.filter(...).startNewResourceGroup(); -{% endhighlight %} -

-
Isolate resources -

Isolate the operator in its own slot. -{% highlight java %} -someStream.map(...).isolateResources(); -{% endhighlight %} -

-
- -
- -
- -
- - - - - - - - - - - - - - - - - - - - - - - - - - -
TransformationDescription
Start new chain -

Begin a new chain, starting with this operator. The two - mappers will be chained, and filter will not be chained to - the first mapper. -{% highlight scala %} -someStream.filter(...).map(...).startNewChain().map(...) -{% endhighlight %} -

-
Disable chaining -

Do not chain the map operator -{% highlight scala %} -someStream.map(...).disableChaining() -{% endhighlight %} -

-
Start a new resource group -

Start a new resource group containing the map and the subsequent operators. -{% highlight scala %} -someStream.filter(...).startNewResourceGroup() -{% endhighlight %} -

-
Isolate resources -

Isolate the operator in its own slot. -{% highlight scala %} -someStream.map(...).isolateResources() -{% endhighlight %} -

-
- -
-
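As a small illustration, suppose one map does expensive work and you want to keep it out of the chain and in a slot of its own. A sketch (assuming a `DataStream<String> lines` is in scope; the function bodies are placeholders) could look like this:

{% highlight java %}
DataStream<String> processed = lines
    .map(new MapFunction<String, String>() {       // lightweight, chained as usual
        @Override
        public String map(String value) {
            return value.trim();
        }
    })
    .map(new MapFunction<String, String>() {       // heavyweight operator
        @Override
        public String map(String value) {
            // ... expensive enrichment or lookup would go here ...
            return value;
        }
    })
    .disableChaining()       // do not chain the heavyweight map to its neighbors
    .isolateResources();     // and give it a slot of its own
{% endhighlight %}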
- - -[Back to top](#top) - -Specifying Keys ----------------- - -The `keyBy` transformation requires that a key is defined on -its argument DataStream. - -A DataStream is keyed as -{% highlight java %} -DataStream<...> input = // [...] -DataStream<...> windowed = input - .keyBy(/*define key here*/) - .window(/*define window here*/); -{% endhighlight %} - -The data model of Flink is not based on key-value pairs. Therefore, -you do not need to physically pack the data stream types into keys and -values. Keys are "virtual": they are defined as functions over the -actual data to guide the grouping operator. - -See [the relevant section of the DataSet API documentation](programming_guide.html#specifying-keys) on how to specify keys. -Just replace `DataSet` with `DataStream`, and `groupBy` with `keyBy`. - - - -Passing Functions to Flink --------------------------- - -Some transformations take user-defined functions as arguments. - -See [the relevant section of the DataSet API documentation](programming_guide.html#passing-functions-to-flink). - - -[Back to top](#top) - - -Data Types ----------- - -Flink places some restrictions on the type of elements that are used in DataStreams and in results -of transformations. The reason for this is that the system analyzes the types to determine -efficient execution strategies. - -See [the relevant section of the DataSet API documentation](programming_guide.html#data-types). - -[Back to top](#top) - - -Data Sources ------------- - -
-
- -
Sources can be created by using `StreamExecutionEnvironment.addSource(sourceFunction)`.
You can either use one of the source functions that come with Flink or write a custom source
by implementing the `SourceFunction` for non-parallel sources, or by implementing the
`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` for parallel sources.

There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:

File-based:

- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.

- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
  StringValues. StringValues are mutable strings.

- `readFile(path)` / Any input format - Reads files as dictated by the input format.

- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`.

- `readFileStream` - Creates a stream by appending elements when there are changes to a file.

Socket-based:

- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

- `fromCollection(Collection)` - Creates a data stream from the Java `java.util.Collection`. All elements
  in the collection must be of the same type.

- `fromCollection(Iterator, Class)` - Creates a data stream from an iterator. The class specifies the
  data type of the elements returned by the iterator.

- `fromElements(T ...)` - Creates a data stream from the given sequence of objects. All objects must be
  of the same type.

- `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream from an iterator, in
  parallel. The class specifies the data type of the elements returned by the iterator.

- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
  parallel.

Custom:

- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use
  `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors](#connectors) for more details.
  A minimal hand-written source is sketched below.
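A minimal sketch of such a hand-written, non-parallel source (the class name and the emitted values are made up for illustration; `env` is the execution environment):

{% highlight java %}
public class CounterSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (isRunning) {
            ctx.collect(counter++);   // emit the next element
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

// attach it like any other source
DataStream<Long> numbers = env.addSource(new CounterSource());
{% endhighlight %}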
- -
- -
Sources can be created by using `StreamExecutionEnvironment.addSource(sourceFunction)`.
You can either use one of the source functions that come with Flink or write a custom source
by implementing the `SourceFunction` for non-parallel sources, or by implementing the
`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` for parallel sources.

There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:

File-based:

- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.

- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
  StringValues. StringValues are mutable strings.

- `readFile(path)` / Any input format - Reads files as dictated by the input format.

- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`.

- `readFileStream` - Creates a stream by appending elements when there are changes to a file.

Socket-based:

- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

- `fromCollection(Seq)` - Creates a data stream from a Scala `Seq`. All elements
  in the collection must be of the same type.

- `fromCollection(Iterator)` - Creates a data stream from an iterator.

- `fromElements(elements: _*)` - Creates a data stream from the given sequence of objects. All objects must be
  of the same type.

- `fromParallelCollection(SplittableIterator)` - Creates a data stream from an iterator, in
  parallel.

- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
  parallel.

Custom:

- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use
  `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors](#connectors) for more details.
-
- -[Back to top](#top) - - -Execution Configuration ----------- - -The `StreamExecutionEnvironment` also contains the `ExecutionConfig` which allows to set job specific configuration values for the runtime. - -See [the relevant section of the DataSet API documentation](programming_guide.html#execution-configuration). - -Parameters in the `ExecutionConfig` that pertain specifically to the DataStream API are: - -- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each event emitted from a source. - `areTimestampsEnabled()` returns the current value. - -- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for automatic watermark emission. You can - get the current value with `long getAutoWatermarkInterval()` - -[Back to top](#top) - -Data Sinks ----------- - -
-
- -
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
DataStreams:

- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are
  obtained by calling the *toString()* method of each element.

- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
  delimiters are configurable. The value for each field comes from the *toString()* method of the objects.

- `print()` / `printToErr()` - Prints the *toString()* value
  of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is
  prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
  greater than 1, the output will also be prepended with the identifier of the task which produced the output.

- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
  custom object-to-bytes conversion.

- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`

- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
  Apache Kafka) that are implemented as sink functions. A minimal custom sink is sketched below.
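As an illustration, a custom sink can be as simple as an anonymous `SinkFunction` (assuming a `DataStream<String> stream` is in scope; the body is a stand-in for whatever client call your external system needs):

{% highlight java %}
stream.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        // hand the record over to the external system here
        System.out.println("sink received: " + value);
    }
});
{% endhighlight %}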
-
- -
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
DataStreams:

- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are
  obtained by calling the *toString()* method of each element.

- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
  delimiters are configurable. The value for each field comes from the *toString()* method of the objects.

- `print()` / `printToErr()` - Prints the *toString()* value
  of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is
  prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
  greater than 1, the output will also be prepended with the identifier of the task which produced the output.

- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
  custom object-to-bytes conversion.

- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`

- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
  Apache Kafka) that are implemented as sink functions.
-
[Back to top](#top)

Debugging
---------

Before running a streaming program in a distributed cluster, it is a good
idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
programs is usually an incremental process of checking results, debugging, and improving.

Flink provides features to significantly ease the development process of data analysis
programs by supporting local debugging from within an IDE, injection of test data, and collection of
result data. This section gives some hints on how to ease the development of Flink programs.

### Local Execution Environment

A `LocalStreamEnvironment` starts a Flink system within the same JVM process it was created in. If you
start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
program.

A LocalEnvironment is created and used as follows:
-
-{% highlight java %} -final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); - -DataStream lines = env.addSource(/* some source */); -// build your program - -env.execute(); -{% endhighlight %} -
-
- -{% highlight scala %} -val env = StreamExecutionEnvironment.createLocalEnvironment() - -val lines = env.addSource(/* some source */) -// build your program - -env.execute() -{% endhighlight %} -
-
- -### Collection Data Sources - -Flink provides special data sources which are backed -by Java collections to ease testing. Once a program has been tested, the sources and sinks can be -easily replaced by sources and sinks that read from / write to external systems. - -Collection data sources can be used as follows: - -
-
-{% highlight java %} -final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); - -// Create a DataStream from a list of elements -DataStream myInts = env.fromElements(1, 2, 3, 4, 5); - -// Create a DataStream from any Java collection -List> data = ... -DataStream> myTuples = env.fromCollection(data); - -// Create a DataStream from an Iterator -Iterator longIt = ... -DataStream myLongs = env.fromCollection(longIt, Long.class); -{% endhighlight %} -
-
-{% highlight scala %} -val env = StreamExecutionEnvironment.createLocalEnvironment() - -// Create a DataStream from a list of elements -val myInts = env.fromElements(1, 2, 3, 4, 5) - -// Create a DataStream from any Collection -val data: Seq[(String, Int)] = ... -val myTuples = env.fromCollection(data) - -// Create a DataStream from an Iterator -val longIt: Iterator[Long] = ... -val myLongs = env.fromCollection(longIt) -{% endhighlight %} -
-
- -**Note:** Currently, the collection data source requires that data types and iterators implement -`Serializable`. Furthermore, collection data sources can not be executed in parallel ( -parallelism = 1). - -### Iterator Data Sink - -Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows: - -
-
-{% highlight java %} -import org.apache.flink.contrib.streaming.DataStreamUtils - -DataStream> myResult = ... -Iterator> myOutput = DataStreamUtils.collect(myResult) -{% endhighlight %} - -
-
- -{% highlight scala %} -import org.apache.flink.contrib.streaming.DataStreamUtils -import scala.collection.JavaConverters.asScalaIteratorConverter - -val myResult: DataStream[(String, Int)] = ... -val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala -{% endhighlight %} -
-
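Taken together, the collection sources and the iterator sink allow quick end-to-end checks of a transformation without any external systems. A sketch (with the usual imports; expected values shown as comments) might look like this:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<Integer> doubled = env
    .fromElements(1, 2, 3)
    .map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) {
            return 2 * value;
        }
    });

Iterator<Integer> results = DataStreamUtils.collect(doubled);
while (results.hasNext()) {
    System.out.println(results.next());   // 2, 4, 6 (order may vary with parallelism)
}
{% endhighlight %}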
- - -[Back to top](#top) - - -Windows -------- - -### Working with Time - -Windows are typically groups of events within a certain time period. Reasoning about time and windows assumes -a definition of time. Flink has support for three kinds of time: - -- *Processing time:* Processing time is simply the wall clock time of the machine that happens to be - executing the transformation. Processing time is the simplest notion of time and provides the best - performance. However, in distributed and asynchronous environments processing time does not provide - determinism. - -- *Event time:* Event time is the time that each individual event occurred. This time is - typically embedded within the records before they enter Flink or can be extracted from their contents. - When using event time, out-of-order events can be properly handled. For example, an event with a lower - timestamp may arrive after an event with a higher timestamp, but transformations will handle these events - correctly. Event time processing provides predictable results, but incurs more latency, as out-of-order - events need to be buffered - -- *Ingestion time:* Ingestion time is the time that events enter Flink. In particular, the timestamp of - an event is assigned by the source operator as the current wall clock time of the machine that executes - the source task at the time the records enter the Flink source. Ingestion time is more predictable - than processing time, and gives lower latencies than event time as the latency does not depend on - external systems. Ingestion time provides thus a middle ground between processing time and event time. - Ingestion time is a special case of event time (and indeed, it is treated by Flink identically to - event time). - -When dealing with event time, transformations need to avoid indefinite -wait times for events to arrive. *Watermarks* provide the mechanism to control the event time-processing time skew. Watermarks -are emitted by the sources. A watermark with a certain timestamp denotes the knowledge that no event -with timestamp lower than the timestamp of the watermark will ever arrive. - -You can specify the semantics of time in a Flink DataStream program using `StreamExecutionEnviroment`, as - -
-
-{% highlight java %} -env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); -env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); -env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); -{% endhighlight %} -
- -
{% highlight scala %}
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
{% endhighlight %}
-
The default value is `TimeCharacteristic.ProcessingTime`, so in order to write a program with processing
time semantics nothing needs to be specified (e.g., the first [example](#example-program) in this guide follows processing
time semantics).

In order to work with event time semantics, you need to follow these steps:

- Set `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`

- Use `DataStream.assignTimestamps(...)` in order to tell Flink how timestamps relate to events (e.g., which
  record field is the timestamp)

- Set `enableTimestamps()`, as well as the interval for watermark emission (`setAutoWatermarkInterval(long milliseconds)`)
  in `ExecutionConfig`.

For example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned
by the system that generates these data streams), and we know that the lag between the current processing
time and the timestamp of an event is never more than 1 second:
-
{% highlight java %}
DataStream<Tuple4<Long, Integer, Double, String>> stream = //...
stream.assignTimestamps(new TimestampExtractor<Tuple4<Long, Integer, Double, String>>() {
    @Override
    public long extractTimestamp(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0;
    }

    @Override
    public long extractWatermark(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0 - 1000;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
});
{% endhighlight %}
- -
{% highlight scala %}
val stream: DataStream[(Long, Int, Double, String)] = null
stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1

  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000

  override def getCurrentWatermark: Long = Long.MinValue
})
{% endhighlight %}
-
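The third step from the list above is configured on the `ExecutionConfig`. A minimal sketch of those settings
(the one-second watermark interval is an illustrative assumption, not a recommended value):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Enable timestamps and emit a watermark every 1000 milliseconds (assumed interval).
env.getConfig().enableTimestamps();
env.getConfig().setAutoWatermarkInterval(1000);
{% endhighlight %}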
- -If you know that timestamps of events are always ascending, i.e., elements arrive in order, you can use -the `AscendingTimestampExtractor`, and the system generates watermarks automatically: - -
-
{% highlight java %}
DataStream<Tuple4<Long, Integer, Double, String>> stream = //...
stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long, Integer, Double, String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0;
    }
});
{% endhighlight %}
- -
{% highlight scala %}
stream.assignAscendingTimestamps(record => record._1)
{% endhighlight %}
-
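Putting these pieces together, the following is a minimal event-time sketch. The tuple layout (timestamp, key,
value), the source class, and the 5 second window size are assumptions made for illustration only:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple3<Long, String, Integer>> events = env.addSource(new MyTupleSource()); // hypothetical source

events
    // The first field carries the event timestamp and is assumed to be ascending in this sketch.
    .assignTimestamps(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
        @Override
        public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
            return element.f0;
        }
    })
    .keyBy(1)                                      // group by the second field (the key)
    .timeWindow(Time.of(5, TimeUnit.SECONDS))      // 5 second event-time windows
    .sum(2)                                        // sum the third field per key and window
    .print();

env.execute("Event-time window sum");
{% endhighlight %}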
- -In order to write a program with ingestion time semantics, you need to -set `env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)`. You can think of this setting as a -shortcut for writing a `TimestampExtractor` that assigns timestamps to events at the sources -based on the current source wall-clock time. Flink injects this timestamp extractor automatically. - - -### Windows on Keyed Data Streams - -Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*, -i.e., each window will contain elements with the same key value. - -#### Basic Window Constructs - -Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows -for common use cases. Check first whether your use case can be served by the pre-defined windows below before moving -on to defining your own windows.
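For instance, a minimal sketch of one of the pre-defined constructs listed below, using a keyed tumbling count
window (the field positions are assumptions for this sketch):

{% highlight java %}
DataStream<Tuple2<String, Integer>> input = //...
input
    .keyBy(0)          // group by the first field; each window only contains elements of a single key
    .countWindow(100)  // tumbling count window of 100 elements per key
    .sum(1);           // sum the second field within each window
{% endhighlight %}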
<div class="codetabs" markdown="1">

<div data-lang="java" markdown="1">

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left">Transformation</th>
      <th class="text-center">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 5 seconds, that "tumbles". This means that elements are
        grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).</p>
{% highlight java %}
keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
        grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
        one window (since windows overlap by at most 4 seconds).
        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).</p>
{% highlight java %}
keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 1000 elements, that "tumbles". This means that elements are
        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
        and every element belongs to exactly one window.</p>
{% highlight java %}
keyedStream.countWindow(1000);
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
        and every element can belong to more than one window (as windows overlap by at most 900 elements).</p>
{% highlight java %}
keyedStream.countWindow(1000, 100);
{% endhighlight %}
      </td>
    </tr>
  </tbody>
</table>

</div>

<div data-lang="scala" markdown="1">

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left">Transformation</th>
      <th class="text-center">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 5 seconds, that "tumbles". This means that elements are
        grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).</p>
{% highlight scala %}
keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS))
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
        grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
        one window (since windows overlap by at most 4 seconds).
        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).</p>
{% highlight scala %}
keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 1000 elements, that "tumbles". This means that elements are
        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
        and every element belongs to exactly one window.</p>
{% highlight scala %}
keyedStream.countWindow(1000)
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
        and every element can belong to more than one window (as windows overlap by at most 900 elements).</p>
{% highlight scala %}
keyedStream.countWindow(1000, 100)
{% endhighlight %}
      </td>
    </tr>
  </tbody>
</table>

</div>

</div>
- -#### Advanced Window Constructs - -The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, -below is a window definition where windows hold the elements of the last 5 seconds and slide every 1 second, -but the execution of the window function is triggered when 100 elements have been added to the -window, and every time execution is triggered, 10 elements are retained in the window:
-
{% highlight java %}
keyedStream
    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));
{% endhighlight %}
- -
{% highlight scala %}
keyedStream
  .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
  .trigger(CountTrigger.of(100))
  .evictor(CountEvictor.of(10))
{% endhighlight %}
-
- -The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally), -and (3) an `Evictor` (optionally). - -The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements -that has a begin-value and an end-value corresponding to a begin-time and end-time. Elements with a timestamp (according -to some notion of time described above) within these values are part of the window. - -For example, the `SlidingTimeWindows` -assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that -time starts from 0 and is measured in milliseconds. Then, the first 6 overlapping windows are -[0,5000], [1000,6000], [2000,7000], [3000,8000], [4000,9000], and [5000,10000]. Each incoming -element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be -assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your -own window types by extending the `WindowAssigner` class.
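As a plain-Java illustration of this assignment arithmetic (not a Flink API, just a sketch under the
assumptions above: size 5000 ms, slide 1000 ms, time starting at 0):

{% highlight java %}
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowMath {
    // Returns the start timestamps of all sliding windows that contain the given element timestamp.
    // For timestamp 2000, size 5000 and slide 1000 this yields 2000, 1000 and 0, i.e. the element
    // belongs to the windows [2000,7000], [1000,6000] and [0,5000] described in the text above.
    public static List<Long> coveringWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window starting at or before the element
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            if (start >= 0) { // time is assumed to start at 0
                starts.add(start);
            }
        }
        return starts;
    }
}
{% endhighlight %}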
<div class="codetabs" markdown="1">

<div data-lang="java" markdown="1">

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left">Transformation</th>
      <th class="text-center">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><strong>Global window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>All incoming elements of a given key are assigned to the same window.
        The window does not contain a default trigger, hence it will never be triggered
        if a trigger is not explicitly specified.</p>
{% highlight java %}
stream.window(GlobalWindows.create());
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Incoming elements are assigned to a window of a certain size (1 second below) based on
        their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
        The notion of time is picked from the specified TimeCharacteristic (see <a href="#working-with-time">time</a>).
        The window comes with a default trigger. For event/ingestion time, a window is triggered when a
        watermark with a value higher than its end-value is received, whereas for processing time it is triggered
        when the current processing time exceeds its end-value.</p>
{% highlight java %}
stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)));
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Incoming elements are assigned to a window of a certain size (5 seconds below) based on
        their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
        overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a
        watermark with a value higher than its end-value is received, whereas for processing time it is triggered
        when the current processing time exceeds its end-value.</p>
{% highlight java %}
stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)));
{% endhighlight %}
      </td>
    </tr>
  </tbody>
</table>

</div>

<div data-lang="scala" markdown="1">

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left">Transformation</th>
      <th class="text-center">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><strong>Global window</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>All incoming elements of a given key are assigned to the same window.
        The window does not contain a default trigger, hence it will never be triggered
        if a trigger is not explicitly specified.</p>
{% highlight scala %}
stream.window(GlobalWindows.create)
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Incoming elements are assigned to a window of a certain size (1 second below) based on
        their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
        The window comes with a default trigger. For event/ingestion time, a window is triggered when a
        watermark with a value higher than its end-value is received, whereas for processing time it is triggered
        when the current processing time exceeds its end-value.</p>
{% highlight scala %}
stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
      <td>
        <p>Incoming elements are assigned to a window of a certain size (5 seconds below) based on
        their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
        overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a
        watermark with a value higher than its end-value is received, whereas for processing time it is triggered
        when the current processing time exceeds its end-value.</p>
{% highlight scala %}
stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
{% endhighlight %}
      </td>
    </tr>
  </tbody>
</table>

</div>

</div>
- -The `Trigger` specifies when the function that comes after the window clause (e.g., `sum`, `count`) is evaluated ("fires") -for each window. If a trigger is not specified, a default trigger for each window type is used (the default trigger is part of the -definition of the `WindowAssigner`). Flink comes bundled with a set of triggers that can be used if the triggers that windows use by -default do not fit the application. You can write your own trigger by implementing the `Trigger` interface. Note that -specifying a trigger will override the default trigger of the window assigner, as in the sketch below.
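A minimal sketch of such an override, combining constructs already shown in this section (the window size and
element count are illustrative assumptions): the count trigger replaces the default watermark-based trigger of
the tumbling time window.

{% highlight java %}
keyedStream
    .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
    .trigger(CountTrigger.of(100)) // overrides the assigner's default trigger
    .sum(1);
{% endhighlight %}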
<div class="codetabs" markdown="1">

<div data-lang="java" markdown="1">

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left">Transformation</th>
      <th class="text-center">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><strong>Processing time trigger</strong></td>
      <td>
        <p>A window is fired when the current processing time exceeds its end-value.
        The elements on the triggered window are henceforth discarded.</p>
{% highlight java %}
windowedStream.trigger(ProcessingTimeTrigger.create());
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Watermark trigger</strong></td>
      <td>
        <p>A window is fired when a watermark with a value that exceeds the window's end-value has been received.
        The elements on the triggered window are henceforth discarded.</p>
{% highlight java %}
windowedStream.trigger(EventTimeTrigger.create());
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Continuous processing time trigger</strong></td>
      <td>
        <p>A window is periodically considered for being fired (every 5 seconds in the example).
        The window is actually fired only when the current processing time exceeds its end-value.
        The elements on the triggered window are retained.</p>
{% highlight java %}
windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));
{% endhighlight %}
      </td>
    </tr>
    <tr>
      <td><strong>Continuous watermark time trigger</strong></td>
      <td>
        <p>A window is periodically considered for being fired (every 5 seconds in the example).
        A window is actually fired wh