flink-commits mailing list archives

From u..@apache.org
Subject [16/51] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs
Date Wed, 24 Aug 2016 09:26:36 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md
new file mode 100644
index 0000000..8d152df
--- /dev/null
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -0,0 +1,329 @@
+---
+title: "Generating Timestamps / Watermarks"
+nav-parent_id: event_time
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* toc
+{:toc}
+
+
+This section is relevant for programs running on **Event Time**. For an introduction to *Event Time*,
+*Processing Time*, and *Ingestion Time*, please refer to the [event time introduction]({{ site.baseurl }}/dev/event_time.html).
+
+To work with *Event Time*, streaming programs need to set the *time characteristic* accordingly.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
+</div>
+
+## Assigning Timestamps
+
+In order to work with *Event Time*, Flink needs to know the events' *timestamps*, meaning each element in the
+stream needs to get its event timestamp *assigned*. That usually happens by accessing/extracting the
+timestamp from some field in the element.
+
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
+the progress in event time.
+
+There are two ways to assign timestamps and generate Watermarks:
+
+  1. Directly in the data stream source
+  2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
+
+<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as
+milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
+
+### Source Functions with Timestamps and Watermarks
+
+Stream sources can also directly assign timestamps to the elements they produce and emit Watermarks. In that case,
+no Timestamp Assigner is needed.
+
+To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)`
+method on the `SourceContext`. To generate Watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a source *(non-checkpointed)* that assigns timestamps and generates Watermarks
+depending on special events:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+	while (/* condition */) {
+		MyType next = getNext();
+		ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+		if (next.hasWatermarkTime()) {
+			ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+		}
+	}
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def run(ctx: SourceContext[MyType]): Unit = {
+	while (/* condition */) {
+		val next: MyType = getNext()
+		ctx.collectWithTimestamp(next, next.eventTimestamp)
+
+		if (next.hasWatermarkTime) {
+			ctx.emitWatermark(new Watermark(next.getWatermarkTime))
+		}
+	}
+}
+{% endhighlight %}
+</div>
+</div>
+
+*Note:* If the streaming program uses a TimestampAssigner on a stream where elements have a timestamp already,
+those timestamps will be overwritten by the TimestampAssigner. Similarly, Watermarks will be overwritten as well.
+
+
+### Timestamp Assigners / Watermark Generators
+
+Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
+original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
+
+The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so.
+A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
+In any case, the timestamp assigner needs to be specified before the first operation on event time
+(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
+Flink allows the specification of a timestamp assigner / watermark emitter inside
+the source (or consumer) itself. More information on how to do so can be found in the
+[Kafka Connector documentation]({{ site.baseurl }}/dev/connectors/kafka.html).
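+
+As a hedged sketch of this pattern (the topic name, deserialization schema, and consumer properties below are placeholders):
+
+{% highlight java %}
+// "schema" (a DeserializationSchema<MyEvent>) and the consumer properties are placeholders
+Properties props = new Properties();
+
+FlinkKafkaConsumer09<MyEvent> kafkaSource =
+    new FlinkKafkaConsumer09<>("myTopic", schema, props);
+
+// the timestamp assigner / watermark emitter is set on the consumer itself
+kafkaSource.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
+
+DataStream<MyEvent> stream = env.addSource(kafkaSource);
+{% endhighlight %}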
+
+
+**NOTE:** The remainder of this section presents the main interfaces a programmer has
+to implement in order to create her own timestamp extractors/watermark emitters.
+To see the pre-implemented extractors that ship with Flink, please refer to the
+[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/dev/event_timestamp_extractors.html) page.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream<MyEvent> stream = env.readFile(
+        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+        FilePathFilter.createDefaultFilter(), typeInfo);
+
+DataStream<MyEvent> withTimestampsAndWatermarks = stream
+        .filter( event -> event.severity() == WARNING )
+        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
+
+withTimestampsAndWatermarks
+        .keyBy( (event) -> event.getGroup() )
+        .timeWindow(Time.seconds(10))
+        .reduce( (a, b) -> a.add(b) )
+        .addSink(...);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val stream: DataStream[MyEvent] = env.readFile(
+         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+         FilePathFilter.createDefaultFilter())
+
+val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
+        .filter( _.severity == WARNING )
+        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
+
+withTimestampsAndWatermarks
+        .keyBy( _.getGroup )
+        .timeWindow(Time.seconds(10))
+        .reduce( (a, b) => a.add(b) )
+        .addSink(...)
+{% endhighlight %}
+</div>
+</div>
+
+
+#### **With Periodic Watermarks**
+
+The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
+on the stream elements, or purely based on processing time).
+
+The interval (every *n* milliseconds) in which the watermark will be generated is defined via
+`ExecutionConfig.setAutoWatermarkInterval(...)`. Each time, the assigner's `getCurrentWatermark()` method will be
+called, and a new Watermark will be emitted if the returned Watermark is non-null and larger than the previous
+Watermark.
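+
+A minimal sketch of configuring this interval (the 100 ms value is illustrative):
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+// generate and emit watermarks every 100 milliseconds
+env.getConfig().setAutoWatermarkInterval(100);
+{% endhighlight %}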
+
+Two simple examples of timestamp assigners with periodic watermark generation are below.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * This generator generates watermarks assuming that elements arrive out of order, but only to a
+ * certain degree. The latest elements for a certain timestamp t will arrive at most n milliseconds
+ * after the earliest elements for timestamp t.
+ */
+public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
+
+    private final long maxOutOfOrderness = 3500; // 3.5 seconds
+
+    private long currentMaxTimestamp;
+
+    @Override
+    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
+        long timestamp = element.getCreationTime();
+        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
+        return timestamp;
+    }
+
+    @Override
+    public Watermark getCurrentWatermark() {
+        // return the watermark as current highest timestamp minus the out-of-orderness bound
+        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
+    }
+}
+
+/**
+ * This generator generates watermarks that lag behind processing time by a fixed amount.
+ * It assumes that elements arrive in Flink after a bounded delay.
+ */
+public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
+
+	private final long maxTimeLag = 5000; // 5 seconds
+
+	@Override
+	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
+		return element.getCreationTime();
+	}
+
+	@Override
+	public Watermark getCurrentWatermark() {
+		// return the watermark as current time minus the maximum time lag
+		return new Watermark(System.currentTimeMillis() - maxTimeLag);
+	}
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+ * This generator generates watermarks assuming that elements arrive out of order, but only to a
+ * certain degree. The latest elements for a certain timestamp t will arrive at most n milliseconds
+ * after the earliest elements for timestamp t.
+ */
+class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
+
+    val maxOutOfOrderness = 3500L // 3.5 seconds
+
+    var currentMaxTimestamp: Long = _
+
+    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
+        val timestamp = element.getCreationTime()
+        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
+        timestamp
+    }
+
+    override def getCurrentWatermark(): Watermark = {
+        // return the watermark as current highest timestamp minus the out-of-orderness bound
+        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
+    }
+}
+
+/**
+ * This generator generates watermarks that lag behind processing time by a fixed amount.
+ * It assumes that elements arrive in Flink after a bounded delay.
+ */
+class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
+
+    val maxTimeLag = 5000L // 5 seconds
+
+    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
+        element.getCreationTime
+    }
+
+    override def getCurrentWatermark(): Watermark = {
+        // return the watermark as current time minus the maximum time lag
+        new Watermark(System.currentTimeMillis() - maxTimeLag)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+#### **With Punctuated Watermarks**
+
+To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use the
+`AssignerWithPunctuatedWatermarks`. For this class, Flink will first call the `extractTimestamp(...)` method
+to assign the element a timestamp, and then immediately call the `checkAndGetNextWatermark(...)` method
+for that element.
+
+The `checkAndGetNextWatermark(...)` method gets the timestamp that was assigned in the `extractTimestamp(...)`
+method, and can decide whether it wants to generate a Watermark. Whenever the `checkAndGetNextWatermark(...)`
+method returns a non-null Watermark, and that Watermark is larger than the latest previous Watermark, that
+new Watermark will be emitted.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {
+
+	@Override
+	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
+		return element.getCreationTime();
+	}
+
+	@Override
+	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
+		return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
+	}
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
+
+	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
+		element.getCreationTime
+	}
+
+	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
+		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
+	}
+}
+{% endhighlight %}
+</div>
+</div>
+
+*Note:* It is possible to generate a watermark on every single event. However, because each watermark causes some
+computation downstream, an excessive number of watermarks slows down performance.

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/index.md b/docs/dev/index.md
new file mode 100644
index 0000000..67916c1
--- /dev/null
+++ b/docs/dev/index.md
@@ -0,0 +1,25 @@
+---
+title: "Application Development"
+nav-id: dev
+nav-title: '<i class="fa fa-code" aria-hidden="true"></i> Application Development'
+nav-parent_id: root
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/java8.md
----------------------------------------------------------------------
diff --git a/docs/dev/java8.md b/docs/dev/java8.md
new file mode 100644
index 0000000..3792e27
--- /dev/null
+++ b/docs/dev/java8.md
@@ -0,0 +1,196 @@
+---
+title: "Java 8"
+nav-parent_id: apis
+nav-pos: 105
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Java 8 introduces several new language features designed for faster and clearer coding. With the most important feature,
+the so-called "Lambda Expressions", Java 8 opens the door to functional programming. Lambda Expressions allow for implementing and
+passing functions in a straightforward way without having to declare additional (anonymous) classes.
+
+The newest version of Flink supports the usage of Lambda Expressions for all operators of the Java API.
+This document shows how to use Lambda Expressions and describes current limitations. For a general introduction to the
+Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_concepts.html).
+
+* TOC
+{:toc}
+
+### Examples
+
+The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression.
+The types of the input `i` and output parameters of the `map()` function need not be declared, as they are inferred by the Java 8 compiler.
+
+~~~java
+env.fromElements(1, 2, 3)
+// returns the squared i
+.map(i -> i*i)
+.print();
+~~~
+
+The next two examples show different implementations of a function that uses a `Collector` for output.
+Functions such as `flatMap()` require an output type (in this case `String`) to be defined for the `Collector` in order to be type-safe.
+If the `Collector` type cannot be inferred from the surrounding context, it needs to be declared manually in the Lambda Expression's parameter list.
+Otherwise, the output will be treated as type `Object`, which can lead to undesired behaviour.
+
+~~~java
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// collector type must be declared
+input.flatMap((Integer number, Collector<String> out) -> {
+    StringBuilder builder = new StringBuilder();
+    for(int i = 0; i < number; i++) {
+        builder.append("a");
+        out.collect(builder.toString());
+    }
+})
+// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
+.print();
+~~~
+
+~~~java
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// collector type need not be declared, it is inferred from the type of the dataset
+DataSet<String> manyALetters = input.flatMap((number, out) -> {
+    StringBuilder builder = new StringBuilder();
+    for(int i = 0; i < number; i++) {
+       builder.append("a");
+       out.collect(builder.toString());
+    }
+});
+
+// returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
+manyALetters.print();
+~~~
+
+The following code demonstrates a word count which makes extensive use of Lambda Expressions.
+
+~~~java
+DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
+
+// filter out strings that contain "not"
+input.filter(line -> !line.contains("not"))
+// split each line by space
+.map(line -> line.split(" "))
+// emit a pair <word,1> for each array element
+.flatMap((String[] wordArray, Collector<Tuple2<String, Integer>> out)
+    -> Arrays.stream(wordArray).forEach(t -> out.collect(new Tuple2<>(t, 1)))
+    )
+// group and sum up
+.groupBy(0).sum(1)
+// print
+.print();
+~~~
+
+### Compiler Limitations
+Currently, Flink only fully supports jobs containing Lambda Expressions if they are **compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above)**.
+
+Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely.
+Other compilers such as OpenJDK's and Oracle JDK's `javac` throw away all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String, Integer>` or `Collector<String>` declared as a Lambda function input or output parameter will be pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for the Flink compiler.
+
+How to compile a Flink job that contains Lambda Expressions with the JDT compiler will be covered in the next section.
+
+However, it is possible to implement functions such as `map()` or `filter()` with Lambda Expressions using Java 8 compilers other than the Eclipse JDT compiler, as long as the function has no `Collector`s or `Iterable`s *and* only if the function handles unparameterized types such as `Integer`, `Long`, `String`, or `MyOwnClass` (types without generics!).
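+
+For example, a lambda of the following kind works with plain `javac`, since no `Collector` is involved and all types are non-generic (a minimal sketch):
+
+~~~java
+DataSet<Integer> input = env.fromElements(1, 2, 3);
+
+// Integer in, Integer out: no generic parameters for javac to prune
+input.map(i -> i * 2)
+     .filter(i -> i > 2)
+     .print();
+~~~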
+
+#### Compile Flink jobs with the Eclipse JDT compiler and Maven
+
+If you are using the Eclipse IDE, you can run and debug your Flink code within the IDE without any problems after some configuration steps. The Eclipse IDE by default compiles its Java sources with the Eclipse JDT compiler. The next section describes how to configure the Eclipse IDE.
+
+If you are using a different IDE such as IntelliJ IDEA or you want to package your JAR file with Maven to run your job on a cluster, you need to modify your project's `pom.xml` file and build your program with Maven. The [quickstart]({{site.baseurl}}/quickstart/setup_quickstart.html) contains preconfigured Maven projects which can be used for new projects or as a reference. Uncomment the mentioned lines in your generated quickstart `pom.xml` file if you want to use Java 8 with Lambda Expressions.
+
+Alternatively, you can manually insert the following lines into your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
+
+~~~xml
+<!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
+
+<plugin>
+    <!-- Use compiler plugin with tycho as the adapter to the JDT compiler. -->
+    <artifactId>maven-compiler-plugin</artifactId>
+    <configuration>
+        <source>1.8</source>
+        <target>1.8</target>
+        <compilerId>jdt</compilerId>
+    </configuration>
+    <dependencies>
+        <!-- This dependency provides the implementation of compiler "jdt": -->
+        <dependency>
+            <groupId>org.eclipse.tycho</groupId>
+            <artifactId>tycho-compiler-jdt</artifactId>
+            <version>0.21.0</version>
+        </dependency>
+    </dependencies>
+</plugin>
+~~~
+
+If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and mark your `pom.xml` as invalid. If so, insert the following lines into your `pom.xml`.
+
+~~~xml
+<!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
+
+<pluginExecution>
+    <pluginExecutionFilter>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <versionRange>[3.1,)</versionRange>
+        <goals>
+            <goal>testCompile</goal>
+            <goal>compile</goal>
+        </goals>
+    </pluginExecutionFilter>
+    <action>
+        <ignore></ignore>
+    </action>
+</pluginExecution>
+~~~
+
+#### Run and debug Flink jobs within the Eclipse IDE
+
+First of all, make sure you are running a current version of Eclipse IDE (4.4.2 or later). Also make sure that you have a Java 8 Runtime Environment (JRE) installed in Eclipse IDE (`Window` -> `Preferences` -> `Java` -> `Installed JREs`).
+
+Create/Import your Eclipse project.
+
+If you are using Maven, you also need to change the Java version in your `pom.xml` for the `maven-compiler-plugin`. Otherwise, right-click the `JRE System Library` section of your project and open the `Properties` window in order to switch to a Java 8 JRE (or above) that supports Lambda Expressions.
+
+The Eclipse JDT compiler needs a special compiler flag in order to store type information in `.class` files. Open the JDT configuration file at `{project directory}/.settings/org.eclipse.jdt.core.prefs` with your favorite text editor and add the following line:
+
+~~~
+org.eclipse.jdt.core.compiler.codegen.lambda.genericSignature=generate
+~~~
+
+If not already done, also modify the Java versions of the following properties to `1.8` (or above):
+
+~~~
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.source=1.8
+~~~
+
+After you have saved the file, perform a complete project refresh in Eclipse IDE.
+
+If you are using Maven, right click your Eclipse project and select `Maven` -> `Update Project...`.
+
+You have configured everything correctly if the following Flink program runs without exceptions:
+
+~~~java
+final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+// print() triggers execution of the program, so no separate env.execute() call is needed
+env.fromElements(1, 2, 3).map((in) -> new Tuple1<String>(" " + in)).print();
+~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libraries.md
----------------------------------------------------------------------
diff --git a/docs/dev/libraries.md b/docs/dev/libraries.md
new file mode 100644
index 0000000..dc22e97
--- /dev/null
+++ b/docs/dev/libraries.md
@@ -0,0 +1,24 @@
+---
+title: "Libraries"
+nav-id: libs
+nav-parent_id: dev
+nav-pos: 8
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
new file mode 100644
index 0000000..77266bc
--- /dev/null
+++ b/docs/dev/libs/cep.md
@@ -0,0 +1,652 @@
+---
+title: "FlinkCEP - Complex event processing for Flink"
+nav-title: Event Processing (CEP)
+nav-parent_id: libs
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+FlinkCEP is the complex event processing library for Flink.
+It allows you to easily detect complex event patterns in an endless stream of data.
+Complex events can then be constructed from matching sequences.
+This gives you the opportunity to quickly get hold of what's really important in your data.
+
+<span class="label label-danger">Attention</span> The events in the `DataStream` to which
+you want to apply pattern matching have to implement proper `equals()` and `hashCode()` methods
+because these are used for comparing and matching events.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Getting Started
+
+If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/dev/api_concepts.html#linking-with-flink).
+Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-cep{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-cep-scala{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
+
+Note that FlinkCEP is currently not part of the binary distribution.
+See [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) for how to link with it for cluster execution.
+
+Now you can start writing your first CEP program using the pattern API.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Event> input = ...
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(evt -> evt.getId() == 42)
+    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
+    .followedBy("end").where(evt -> evt.getName().equals("end"));
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+DataStream<Alert> result = patternStream.select(pattern -> {
+    return createAlertFrom(pattern);
+});
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[Event] = ...
+
+val pattern = Pattern.begin("start").where(_.getId == 42)
+  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
+  .followedBy("end").where(_.getName == "end")
+
+val patternStream = CEP.pattern(input, pattern)
+
+val result: DataStream[Alert] = patternStream.select(createAlert(_))
+{% endhighlight %}
+</div>
+</div>
+
+Note that we use Java 8 lambdas in our Java code examples to make them more succinct.
+
+## The Pattern API
+
+The pattern API allows you to quickly define complex event patterns.
+
+Each pattern consists of multiple stages, or what we call states.
+In order to go from one state to the next, the user can specify conditions.
+These conditions can be the contiguity of events or a filter condition on an event.
+
+Each pattern has to start with an initial state:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> start = Pattern.<Event>begin("start");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val start : Pattern[Event, _] = Pattern.begin("start")
+{% endhighlight %}
+</div>
+</div>
+
+Each state must have a unique name to identify the matched events later on.
+Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+start.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+start.where(event => ... /* some condition */)
+{% endhighlight %}
+</div>
+</div>
+
+We can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`) via the `subtype` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+    @Override
+    public boolean filter(SubEvent value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
+{% endhighlight %}
+</div>
+</div>
+
+As can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype.
+In fact, you can always provide multiple conditions by calling `where` and `subtype` multiple times.
+These conditions will then be combined using the logical AND operator.
+
+In order to construct OR conditions, one has to call the `or` method with a respective filter function.
+Any existing filter function is then ORed with the given one.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+pattern.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // some condition
+    }
+}).or(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // or condition
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
+{% endhighlight %}
+</div>
+</div>
+
+Next, we can append further states to detect complex patterns.
+We can control the contiguity of two succeeding events to be accepted by the pattern.
+
+Strict contiguity means that two matching events have to directly succeed each other.
+This means that no other events can occur in between.
+A strict contiguity pattern state can be created via the `next` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> strictNext = start.next("middle");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val strictNext: Pattern[Event, _] = start.next("middle")
+{% endhighlight %}
+</div>
+</div>
+
+Non-strict contiguity means that other events are allowed to occur in-between two matching events.
+A non-strict contiguity pattern state can be created via the `followedBy` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
+{% endhighlight %}
+</div>
+</div>
+
+It is also possible to define a temporal constraint for the pattern to be valid.
+For example, one can define that a pattern should occur within 10 seconds via the `within` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+next.within(Time.seconds(10));
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+next.within(Time.seconds(10))
+{% endhighlight %}
+</div>
+</div>
+
+<br />
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Pattern Operation</th>
+            <th class="text-center">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><strong>Begin</strong></td>
+            <td>
+            <p>Defines a starting pattern state:</p>
+{% highlight java %}
+Pattern<Event, ?> start = Pattern.<Event>begin("start");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Next</strong></td>
+            <td>
+                <p>Appends a new pattern state. A matching event has to directly succeed the previous matching event:</p>
+{% highlight java %}
+Pattern<Event, ?> next = start.next("next");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>FollowedBy</strong></td>
+            <td>
+                <p>Appends a new pattern state. Other events can occur between a matching event and the previous matching event:</p>
+{% highlight java %}
+Pattern<Event, ?> followedBy = start.followedBy("next");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Where</strong></td>
+            <td>
+                <p>Defines a filter condition for the current pattern state. Only if an event passes the filter can it match the state:</p>
+{% highlight java %}
+patternState.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Or</strong></td>
+            <td>
+                <p>Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition can it match the state:</p>
+{% highlight java %}
+patternState.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // some condition
+    }
+}).or(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // alternative condition
+    }
+});
+{% endhighlight %}
+                    </td>
+                </tr>
+       <tr>
+           <td><strong>Subtype</strong></td>
+           <td>
+               <p>Defines a subtype condition for the current pattern state. Only if an event is of this subtype can it match the state:</p>
+{% highlight java %}
+patternState.subtype(SubEvent.class);
+{% endhighlight %}
+           </td>
+       </tr>
+       <tr>
+          <td><strong>Within</strong></td>
+          <td>
+              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:</p>
+{% highlight java %}
+patternState.within(Time.seconds(10));
+{% endhighlight %}
+          </td>
+      </tr>
+  </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Pattern Operation</th>
+            <th class="text-center">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><strong>Begin</strong></td>
+            <td>
+            <p>Defines a starting pattern state:</p>
+{% highlight scala %}
+val start = Pattern.begin[Event]("start")
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Next</strong></td>
+            <td>
+                <p>Appends a new pattern state. A matching event has to directly succeed the previous matching event:</p>
+{% highlight scala %}
+val next = start.next("middle")
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>FollowedBy</strong></td>
+            <td>
+                <p>Appends a new pattern state. Other events can occur between a matching event and the previous matching event:</p>
+{% highlight scala %}
+val followedBy = start.followedBy("middle")
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Where</strong></td>
+            <td>
+                <p>Defines a filter condition for the current pattern state. Only if an event passes the filter can it match the state:</p>
+{% highlight scala %}
+patternState.where(event => ... /* some condition */)
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Or</strong></td>
+            <td>
+                <p>Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition can it match the state:</p>
+{% highlight scala %}
+patternState.where(event => ... /* some condition */)
+    .or(event => ... /* alternative condition */)
+{% endhighlight %}
+                    </td>
+                </tr>
+       <tr>
+           <td><strong>Subtype</strong></td>
+           <td>
+               <p>Defines a subtype condition for the current pattern state. Only if an event is of this subtype can it match the state:</p>
+{% highlight scala %}
+patternState.subtype(classOf[SubEvent])
+{% endhighlight %}
+           </td>
+       </tr>
+       <tr>
+          <td><strong>Within</strong></td>
+          <td>
+              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:</p>
+{% highlight scala %}
+patternState.within(Time.seconds(10))
+{% endhighlight %}
+          </td>
+      </tr>
+  </tbody>
+</table>
+</div>
+
+</div>
+
+### Detecting Patterns
+
+In order to run a stream of events against your pattern, you have to create a `PatternStream`.
+Given an input stream `input` and a pattern `pattern`, you create the `PatternStream` by calling
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Event> input = ...
+Pattern<Event, ?> pattern = ...
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input : DataStream[Event] = ...
+val pattern : Pattern[Event, _] = ...
+
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+{% endhighlight %}
+</div>
+</div>
+
+### Selecting from Patterns
+Once you have obtained a `PatternStream` you can select from detected event sequences via the `select` or `flatSelect` methods.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+The `select` method requires a `PatternSelectFunction` implementation.
+A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
+It receives a map of string/event pairs of the matched events.
+The string is defined by the name of the state to which the event has been matched.
+The `select` method returns exactly one result.
+
+{% highlight java %}
+class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
+    @Override
+    public OUT select(Map<String, IN> pattern) {
+        IN startEvent = pattern.get("start");
+        IN endEvent = pattern.get("end");
+        return new OUT(startEvent, endEvent);
+    }
+}
+{% endhighlight %}
+
+A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an arbitrary number of results.
+In order to do this, its `flatSelect` method has an additional `Collector` parameter which is used for the element output.
+
+{% highlight java %}
+class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
+    @Override
+    public void flatSelect(Map<String, IN> pattern, Collector<OUT> collector) {
+        IN startEvent = pattern.get("start");
+        IN endEvent = pattern.get("end");
+
+        for (int i = 0; i < startEvent.getValue(); i++ ) {
+            collector.collect(new OUT(startEvent, endEvent));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+The `select` method takes a selection function as argument, which is called for each matching event sequence.
+It receives a map of string/event pairs of the matched events.
+The string is defined by the name of the state to which the event has been matched.
+The selection function returns exactly one result per call.
+
+{% highlight scala %}
+def selectFn(pattern : mutable.Map[String, IN]): OUT = {
+    val startEvent = pattern.get("start").get
+    val endEvent = pattern.get("end").get
+    OUT(startEvent, endEvent)
+}
+{% endhighlight %}
+
+The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the `flatSelect` method can return an arbitrary number of results per call.
+In order to do this, the function for `flatSelect` has an additional `Collector` parameter which is used for the element output.
+
+{% highlight scala %}
+def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT]) = {
+    val startEvent = pattern.get("start").get
+    val endEvent = pattern.get("end").get
+    for (i <- 0 until startEvent.getValue) {
+        collector.collect(OUT(startEvent, endEvent))
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### Handling Timed Out Partial Patterns
+
+Whenever a pattern has a window length associated via the `within` keyword, it is possible that partial event patterns will be discarded because they exceed the window length.
+In order to react to these timeout events, the `select` and `flatSelect` API calls allow you to specify a timeout handler.
+This timeout handler is called for each partial event pattern which has timed out.
+The timeout handler receives all events of the partial pattern that have been matched so far, as well as the timestamp when the timeout was detected.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+In order to handle partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+The return type of the timeout function can be different from the select function.
+The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively so that the resulting data stream is of type `org.apache.flink.types.Either`.
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
+    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternSelectFunction<Event, ComplexEvent>() {...}
+);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
+    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+);
+{% endhighlight %}
+
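+A minimal sketch of a corresponding timeout function (assuming `TimeoutEvent` offers a suitable constructor; the type itself is illustrative):
+
+{% highlight java %}
+class MyPatternTimeoutFunction implements PatternTimeoutFunction<Event, TimeoutEvent> {
+    @Override
+    public TimeoutEvent timeout(Map<String, Event> pattern, long timeoutTimestamp) {
+        // the partially matched events and the time at which the timeout was detected
+        return new TimeoutEvent(pattern.get("start"), timeoutTimestamp);
+    }
+}
+{% endhighlight %}
+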
+</div>
+
+<div data-lang="scala" markdown="1">
+In order to handle partial patterns, the `select` API call offers an overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
+The timeout function is called with a map of string-event pairs of the partial match which has timed out and a long indicating when the timeout occurred.
+The string is defined by the name of the state to which the event has been matched.
+The timeout function returns exactly one result per call.
+The return type of the timeout function can be different from the select function.
+The timeout event and the select event are wrapped in `Left` and `Right` respectively so that the resulting data stream is of type `Either`.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select{
+    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
+} {
+    pattern: mutable.Map[String, Event] => ComplexEvent()
+}
+{% endhighlight %}
+
+The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
+In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`.
+The collector can be used to emit an arbitrary number of events.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect{
+    (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
+        out.collect(TimeoutEvent())
+} {
+    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
+        out.collect(ComplexEvent())
+}
+{% endhighlight %}
+
+</div>
+</div>
+
+## Examples
+
+The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.
+The events are keyed by their IDs and a valid pattern has to occur within 10 seconds.
+The whole processing is done with event time.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = ...
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream<Event> input = ...
+
+DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
+	@Override
+	public Integer getKey(Event value) throws Exception {
+		return value.getId();
+	}
+});
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
+	.next("middle").where(new FilterFunction<Event>() {
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("error");
+		}
+	}).followedBy("end").where(new FilterFunction<Event>() {
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("critical");
+		}
+	}).within(Time.seconds(10));
+
+PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
+
+DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
+	@Override
+	public Alert select(Map<String, Event> pattern) throws Exception {
+		return createAlert(pattern);
+	}
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env : StreamExecutionEnvironment = ...
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+val input : DataStream[Event] = ...
+
+val partitionedInput = input.keyBy(event => event.getId)
+
+val pattern = Pattern.begin("start")
+  .next("middle").where(_.getName == "error")
+  .followedBy("end").where(_.getName == "critical")
+  .within(Time.seconds(10))
+
+val patternStream = CEP.pattern(partitionedInput, pattern)
+
+val alerts = patternStream.select(createAlert(_))
+{% endhighlight %}
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/dev/libs/gelly/graph_algorithms.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/graph_algorithms.md b/docs/dev/libs/gelly/graph_algorithms.md
new file mode 100644
index 0000000..09f6abc
--- /dev/null
+++ b/docs/dev/libs/gelly/graph_algorithms.md
@@ -0,0 +1,308 @@
+---
+title: Graph Algorithms
+nav-parent_id: graphs
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+The logic blocks with which the `Graph` API and top-level algorithms are assembled are accessible in Gelly as graph
+algorithms in the `org.apache.flink.graph.asm` package. These algorithms provide optimization and tuning through
+configuration parameters and may provide implicit runtime reuse when processing the same input with a similar
+configuration.
+
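+A minimal sketch of invoking one of these algorithms and chaining its configuration (the graph type parameters here are illustrative):
+
+{% highlight java %}
+// assuming a directed Graph<LongValue, NullValue, NullValue> named graph
+DataSet<Vertex<LongValue, LongValue>> inDegree = graph
+  .run(new VertexInDegree<LongValue, NullValue, NullValue>()
+    .setIncludeZeroDegreeVertices(true)
+    .setParallelism(4));
+{% endhighlight %}
+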
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Algorithm</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>VertexInDegree</strong></td>
+      <td>
+        <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with the in-degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, LongValue>> inDegree = graph
+  .run(new VertexInDegree()
+    .setIncludeZeroDegreeVertices(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default only the edge set is processed for the computation of degree; when this flag is set an additional join is performed against the vertex set in order to output vertices with an in-degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>VertexOutDegree</strong></td>
+      <td>
+        <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with the out-degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, LongValue>> outDegree = graph
+  .run(new VertexOutDegree()
+    .setIncludeZeroDegreeVertices(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default only the edge set is processed for the computation of degree; when this flag is set an additional join is performed against the vertex set in order to output vertices with an out-degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>VertexDegrees</strong></td>
+      <td>
+        <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degrees = graph
+  .run(new VertexDegrees()
+    .setIncludeZeroDegreeVertices(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default only the edge set is processed for the computation of degree; when this flag is set an additional join is performed against the vertex set in order to output vertices with out- and in-degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeSourceDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the source ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> sourceDegrees = graph
+  .run(new EdgeSourceDegrees());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeTargetDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the target ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> targetDegrees = graph
+  .run(new EdgeTargetDegrees());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeDegreesPair</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of both the source and target vertices.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> degrees = graph
+  .run(new EdgeDegreesPair());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.undirected.<br/><strong>VertexDegree</strong></td>
+      <td>
+        <p>Annotate vertices of an <a href="#graph-representation">undirected graph</a> with the degree.</p>
+{% highlight java %}
+DataSet<Vertex<K, LongValue>> degree = graph
+  .run(new VertexDegree()
+    .setIncludeZeroDegreeVertices(true)
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setIncludeZeroDegreeVertices</strong>: by default only the edge set is processed for the computation of degree; when this flag is set an additional join is performed against the vertex set in order to output vertices with a degree of zero</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.undirected.<br/><strong>EdgeSourceDegree</strong></td>
+      <td>
+        <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with degree of the source ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, LongValue>>> sourceDegree = graph
+  .run(new EdgeSourceDegree()
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.undirected.<br/><strong>EdgeTargetDegree</strong></td>
+      <td>
+        <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with degree of the target ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph
+  .run(new EdgeTargetDegree()
+    .setReduceOnSourceId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+          <li><p><strong>setReduceOnSourceId</strong>: the degree can be counted from either the edge source or target IDs. By default the target IDs are counted. Reducing on source IDs may optimize the algorithm if the input edge list is sorted by source ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.undirected.<br/><strong>EdgeDegreePair</strong></td>
+      <td>
+        <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with the degree of both the source and target vertices.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
+  .run(new EdgeDegreePair()
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.filter.undirected.<br/><strong>MaximumDegree</strong></td>
+      <td>
+        <p>Filter an <a href="#graph-representation">undirected graph</a> by maximum degree.</p>
+{% highlight java %}
+Graph<K, VV, EV> filteredGraph = graph
+  .run(new MaximumDegree(5000)
+    .setBroadcastHighDegreeVertices(true)
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setBroadcastHighDegreeVertices</strong>: join high-degree vertices using a broadcast-hash to reduce data shuffling when removing a relatively small number of high-degree vertices.</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>simple.directed.<br/><strong>Simplify</strong></td>
+      <td>
+        <p>Remove self-loops and duplicate edges from a <a href="#graph-representation">directed graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>simple.undirected.<br/><strong>Simplify</strong></td>
+      <td>
+        <p>Add symmetric edges and remove self-loops and duplicate edges from an <a href="#graph-representation">undirected graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>translate.<br/><strong>TranslateGraphIds</strong></td>
+      <td>
+        <p>Translate vertex and edge IDs using the given <code>TranslateFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateGraphIds(new LongValueToStringValue()));
+{% endhighlight %}
+        <p>Required configuration:</p>
+        <ul>
+          <li><p><strong>translator</strong>: implements type or value conversion</p></li>
+        </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>translate.<br/><strong>TranslateVertexValues</strong></td>
+      <td>
+        <p>Translate vertex values using the given <code>TranslateFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
+{% endhighlight %}
+        <p>Required configuration:</p>
+        <ul>
+          <li><p><strong>translator</strong>: implements type or value conversion</p></li>
+        </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>translate.<br/><strong>TranslateEdgeValues</strong></td>
+      <td>
+        <p>Translate edge values using the given <code>TranslateFunction</code>.</p>
+{% highlight java %}
+graph.run(new TranslateEdgeValues(new Nullify()));
+{% endhighlight %}
+        <p>Required configuration:</p>
+        <ul>
+          <li><p><strong>translator</strong>: implements type or value conversion</p></li>
+        </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+{% top %}

