flink-commits mailing list archives

From rmetz...@apache.org
Subject [05/22] git commit: [Docs] Adjust Java API documentation to be in sync with latest changes
Date Fri, 01 Aug 2014 07:29:21 GMT
[Docs] Adjust Java API documentation to be in sync with latest changes

[ci skip]


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9d1c49fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9d1c49fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9d1c49fb

Branch: refs/heads/release-0.6
Commit: 9d1c49fbb497cd10258b0135b17fd68ca2d92ce0
Parents: bcf85c2
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Jul 31 17:25:21 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jul 31 17:31:29 2014 +0200

----------------------------------------------------------------------
 docs/_config.yml                 |   4 +-
 docs/java_api_guide.md           | 902 +++++++++-------------------------
 docs/java_api_transformations.md | 628 +++++++++++++++++++++++
 3 files changed, 872 insertions(+), 662 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d1c49fb/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index e418aab..3f2e410 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -5,7 +5,7 @@
 #     {{ site.CONFIG_KEY }}
 #------------------------------------------------------------------------------
 
-FLINK_VERSION_STABLE: 0.6-SNAPSHOT # this variable can point to a SNAPSHOT version in the git source.
+FLINK_VERSION_STABLE: 0.6-incubating-SNAPSHOT # this variable can point to a SNAPSHOT version in the git source.
 FLINK_VERSION_SHORT: 0.6
 FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
 FLINK_GITHUB_URL:  https://github.com/apache/incubator-flink
@@ -32,4 +32,4 @@ redcarpet:
   # https://help.github.com/articles/github-flavored-markdown
   extensions: ["no_intra_emphasis", "fenced_code_blocks", "autolink",
                "tables", "with_toc_data", "strikethrough", "superscript",
-               "lax_spacing"]
\ No newline at end of file
+               "lax_spacing"]

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d1c49fb/docs/java_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/java_api_guide.md b/docs/java_api_guide.md
index 51b5459..f22b2b7 100644
--- a/docs/java_api_guide.md
+++ b/docs/java_api_guide.md
@@ -1,11 +1,8 @@
 ---
 title: "Java API Programming Guide"
 ---
-<section id="top"></section>
-Java API
-========
 
-<section id="introduction">
+<section id="top">
 Introduction
 ------------
 
@@ -48,7 +45,7 @@ public class WordCountExample {
         env.execute("Word Count Example");
     }
 
-    public static final class LineSplitter extends FlatMapFunction<String, Tuple2<String, Integer>> {
+    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
         @Override
         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
             for (String word : line.split(" ")) {
@@ -67,7 +64,7 @@ Linking with Flink
 
 To write programs with Flink, you need to include Flink's Java API library in your project.
 
-The simplest way to do this is to use the [quickstart scripts](java_api_quickstart.html). They create a blank project from a template (a Maven Archetype), which sets up everything for you. To manually create the project, you can use the archetype and create a project by calling:
+The simplest way to do this is to use the [quickstart scripts]({{site.baseurl}}/java_api_quickstart.html). They create a blank project from a template (a Maven Archetype), which sets up everything for you. To manually create the project, you can use the archetype and create a project by calling:
 
 ```bash
 mvn archetype:generate \
@@ -91,10 +88,13 @@ If you want to add Flink to an existing Maven project, add the following entry t
 </dependency>
 ```
 
-In order to link against the latest SNAPSHOT versions of the code, please follow [this guide]({{site.baseurl}}/downloads.html/#nightly).
+If you are using Flink together with Hadoop, the version of the dependency may vary depending on the version of Hadoop (or more specifically, HDFS) that you want to use Flink with.
+Please refer to the [downloads page]({{site.baseurl}}/downloads.html) for a list of available versions, and instructions on how to link with custom versions of Hadoop.
+
+In order to link against the latest SNAPSHOT versions of the code, please follow [this guide]({{site.baseurl}}/downloads.html#nightly).
 
 The *flink-clients* dependency is only necessary to invoke the Flink program locally (for example to run it standalone for testing and debugging). 
-If you intend to only export the program as a JAR file and [run it on a cluster](cluster_execution.html), you can skip that dependency.
+If you intend to only export the program as a JAR file and [run it on a cluster]({{site.baseurl}}/cluster_execution.html), you can skip that dependency.
 
 [Back to top](#top)
 
@@ -112,7 +112,7 @@ programs with a `main()` method. Each program consists of the same basic parts:
 5. Execute your program.
 
 We will now give an overview of each of those steps but please refer
-to the respective sections for more details. Note that all {% gh_link /flink-java/src/main/java/org/apache/flink/api/java "core classes of the Java API" %} are found in the package `org.apache.flinkapi.java`.
+to the respective sections for more details. Note that all {% gh_link /flink-java/src/main/java/org/apache/flink/api/java "core classes of the Java API" %} are found in the package `org.apache.flink.api.java`.
 
 The `ExecutionEnvironment` is the basis for all Flink programs. You can
 obtain one using these static methods on class `ExecutionEnvironment`:
@@ -131,8 +131,8 @@ Typically, you only need to use `getExecutionEnvironment()`, since this
 will do the right thing depending on the context: if you are executing
 your program inside an IDE or as a regular Java program it will create
 a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command line](cli.html)
-or the [web interface](web_client.html),
+you created a JAR file from your program, and invoke it through the [command line]({{site.baseurl}}/cli.html)
+or the [web interface]({{site.baseurl}}/web_client.html),
 the Flink cluster manager will
 execute your main method and `getExecutionEnvironment()` will return
 an execution environment for executing your program on a cluster.
@@ -202,6 +202,7 @@ machine or submit your program for execution on a cluster, depending on
 how you created the execution environment.
 [Back to top](#top)
 
+
 <section id="lazyeval">
 Lazy Evaluation
 ---------------
@@ -209,6 +210,234 @@ Lazy Evaluation
 All Flink programs are executed lazily: When the program's main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program's plan. The operations are actually executed when one of the `execute()` methods is invoked on the ExecutionEnvironment object. Whether the program is executed locally or on a cluster depends on the environment of the program.
 
 The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
+[Back to top](#top)
+
+<section id="transformations">
+Transformations
+---------------
+
+Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into
+sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The [transformations documentation]({{site.baseurl}}/java_api_transformations.html)
+has a full description of all transformations with examples.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-center" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight java %}
+data.map(new MapFunction<String, Integer>() {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements.</p>
+{% highlight java %}
+data.flatMap(new FlatMapFunction<String, String>() {
+  public void flatMap(String value, Collector<String> out) {
+    for (String s : value.split(" ")) {
+      out.collect(s);
+    }
+  }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function returns <em>true</em>.</p>
+{% highlight java %}
+data.filter(new FilterFunction<Integer>() {
+  public boolean filter(Integer value) { return value > 1000; }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements into one.
+           Reduce may be applied on a full data set, or on a grouped data set.</p>
+{% highlight java %}
+data.reduce(new ReduceFunction<Integer>() {
+  public Integer reduce(Integer a, Integer b) { return a + b; }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.</p>
+{% highlight java %}
+data.reduceGroup(new GroupReduceFunction<Integer, Integer>() {
+  public void reduce(Iterable<Integer> values, Collector<Integer> out) {
+    int prefixSum = 0;
+    for (Integer i : values) {
+      prefixSum += i;
+      out.collect(prefixSum);
+    }
+  }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Aggregate</strong></td>
+      <td>
+        <p>Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.</p>
+{% highlight java %}
+data.reduceGroup(new GroupReduceFunction<Integer, Integer>() {
+  public void reduce(Iterable<Integer> values, Collector<Integer> out) {
+    int prefixSum = 0;
+    for (Integer i : values) {
+      prefixSum += i;
+      out.collect(prefixSum);
+    }
+  }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Aggregate</strong></td>
+      <td>
+        <p>Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Join</strong></td>
+      <td>
+        <p>Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element. See <a href="#keys">keys</a> on how to define join keys.</p>
+{% highlight java %}
+result = input1.join(input2)
+               .where(0)       // key of the first input (tuple field 0)
+               .equalTo(1);    // key of the second input (tuple field 1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more fields and then joins the groups. The transformation function is called per pair of groups. See <a href="#keys">keys</a> on how to define coGroup keys.</p>
+{% highlight java %}
+data1.coGroup(data2)
+     .where(0)
+     .equalTo(1)
+     .with(new CoGroupFunction<String, String, String>() {
+         public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) {
+           out.collect(...);
+         }
+      });
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element.</p>
+{% highlight java %}
+DataSet<Integer> data1 = // [...]
+DataSet<String> data2 = // [...]
+DataSet<Tuple2<Integer, String>> result = data1.cross(data2);
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets. This operation happens implicitly if more than one data set is used for a specific function input.</p>
+{% highlight java %}
+DataSet<String> data1 = // [...]
+DataSet<String> data2 = // [...]
+DataSet<String> result = data1.union(data2);
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+----------
+
+The following transformations are available on data sets of Tuples:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-center" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong></td>
+      <td>
+        <p>Selects a subset of fields from the tuples.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, Double, String>> in = // [...]
+DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integer.class);
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+[Back to top](#top)
+
+
+<section id="keys">
+Defining Keys
+-------------
+
+
+[Back to top](#top)
+
+
+<section id="functions">
+Functions
+---------
+
+
+[Back to top](#top)
+
 
 <section id="types">
 Data Types
@@ -248,7 +477,6 @@ public class WordWithCount {
 You can use all of those types to parameterize `DataSet` and function implementations, e.g. `DataSet<String>` for a `String` data set or `MapFunction<String, Integer>` for a mapper from `String` to `Integer`.
 
 ```java
-
 // using a basic data type
 DataSet<String> numbers = env.fromElements("1", "2");
 
@@ -341,652 +569,6 @@ The {% gh_link /flink-java/src/main/java/org/apache/flink/api/java/typeutils/Res
 [Back to top](#top)
 
 
-<section id="transformations">
-Data Transformations
---------------------
-
-A data transformation transforms one or more `DataSet`s into a new `DataSet`. Advanced data analysis programs can be assembled by chaining multiple transformations.
-
-### Map
-
-The Map transformation applies a user-defined `MapFunction` on each element of a DataSet.
-It implements a one-to-one mapping, that is, exactly one element must be returned by
-the function.
-
-The following code transforms a `DataSet` of Integer pairs into a `DataSet` of Integers:
-
-```java
-// MapFunction that adds two integer values
-public class IntAdder extends MapFunction<Tuple2<Integer, Integer>, Integer> {
-  @Override
-  public Integer map(Tuple2<Integer, Integer> in) {
-    return in.f0 + in.f1;
-  }
-}
-
-// [...]
-DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
-DataSet<Integer> intSums = intPairs.map(new IntAdder());
-```
-
-### FlatMap
-
-The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a `DataSet`.
-This variant of a map function can return arbitrarily many result elements (including none) for each input element.
-
-The following code transforms a `DataSet` of text lines into a `DataSet` of words:
-
-```java
-// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
-public class Tokenizer extends FlatMapFunction<String, String> {
-  @Override
-  public void flatMap(String value, Collector<String> out) {
-    for (String token : value.split("\\W")) {
-      out.collect(token);
-    }
-  }
-}
-
-// [...]
-DataSet<String> textLines = // [...]
-DataSet<String> words = textLines.flatMap(new Tokenizer());
-
-```
-
-### Filter
-
-The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataSet` and retains only those elements for which the function returns `true`.
-
-The following code removes all Integers smaller than zero from a `DataSet`:
-
-```java
-// FilterFunction that filters out all Integers smaller than zero.
-public class NaturalNumberFilter extends FilterFunction<Integer> {
-  @Override
-  public boolean filter(Integer number) {
-    return number >= 0;
-  }
-}
-
-// [...]
-DataSet<Integer> intNumbers = // [...]
-DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
-```
-
-### Project (Tuple DataSets only)
-
-The Project transformation removes or moves `Tuple` fields of a `Tuple` `DataSet`.
-The `project(int...)` method selects `Tuple` fields that should be retained by their index and defines their order in the output `Tuple`.
-The `types(Class<?> ...)` method must give the types of the output `Tuple` fields.
-
-Projections do not require the definition of a user function.
-
-The following code shows different ways to apply a Project transformation on a `DataSet`:
-
-```java
-DataSet<Tuple3<Integer, Double, String>> in = // [...]
-// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
-DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integer.class);
-```
-
-### Transformations on grouped DataSet
-
-The reduce operations can operate on grouped data sets. Specifying the key to
-be used for grouping can be done in two ways:
-
-- a `KeySelector` function or
-- one or more field position keys (`Tuple` `DataSet` only).
-
-Please look at the reduce examples to see how the grouping keys are specified.
-
-### Reduce on grouped DataSet
-
-A Reduce transformation that is applied on a grouped `DataSet` reduces each group to a single element using a user-defined `ReduceFunction`.
-For each group of input elements, a `ReduceFunction` successively combines pairs of elements into one element until only a single element for each group remains.
-
-#### Reduce on DataSet grouped by KeySelector Function
-
-A `KeySelector` function extracts a key value from each element of a `DataSet`. The extracted key value is used to group the `DataSet`.
-The following code shows how to group a POJO `DataSet` using a `KeySelector` function and to reduce it with a `ReduceFunction`.
-
-```java
-// some ordinary POJO
-public class WC {
-  public String word;
-  public int count;
-  // [...]
-}
-
-// ReduceFunction that sums Integer attributes of a POJO
-public class WordCounter extends ReduceFunction<WC> {
-  @Override
-  public WC reduce(WC in1, WC in2) {
-    return new WC(in1.word, in1.count + in2.count);
-  }
-}
-
-// [...]
-DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words
-                         // DataSet grouping with inline-defined KeySelector function
-                         .groupBy(
-                           new KeySelector<WC, String>() {
-                             public String getKey(WC wc) { return wc.word; }
-                           })
-                         // apply ReduceFunction on grouped DataSet
-                         .reduce(new WordCounter());
-```
-
-#### Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
-
-Field position keys specify one or more fields of a `Tuple` `DataSet` that are used as grouping keys.
-The following code shows how to use field position keys and apply a `ReduceFunction`.
-
-```java
-DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
-DataSet<Tuple3<String, Integer, Double>> reducedTuples =
-                                         tuples
-                                         // group DataSet on first and second field of Tuple
-                                         .groupBy(0,1)
-                                         // apply ReduceFunction on grouped DataSet
-                                         .reduce(new MyTupleReducer());
-```
-
-### GroupReduce on grouped DataSet
-
-A GroupReduce transformation that is applied on a grouped `DataSet` calls a user-defined `GroupReduceFunction` for each group. The difference
-between this and `Reduce` is that the user defined function gets the whole group at once.
-The function is invoked with an iterator over all elements of a group and can return an arbitrary number of result elements using the collector.
-
-#### GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
-
-The following code shows how duplicate strings can be removed from a `DataSet` grouped by Integer.
-
-```java
-public class DistinctReduce
-         extends GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
-  // Set to hold all unique strings of a group
-  Set<String> uniqStrings = new HashSet<String>();
-
-  @Override
-  public void reduce(Iterator<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
-    // clear set
-    uniqStrings.clear();
-    // there is at least one element in the iterator
-    Tuple2<Integer, String> first = in.next();
-    Integer key = first.f0;
-    uniqStrings.add(first.f1);
-    // add all strings of the group to the set
-    while(in.hasNext()) {
-      uniqStrings.add(in.next().f1);
-    }
-    // emit all unique strings
-    Tuple2<Integer, String> t = new Tuple2<Integer, String>(key, "");
-    for(String s : uniqStrings) {
-      t.f1 = s;
-      out.collect(t);
-    }
-  }
-}
-
-// [...]
-DataSet<Tuple2<Integer, String>> input = // [...]
-DataSet<Tuple2<Integer, String>> output =
-                                 input
-                                 // group DataSet by the first tuple field
-                                 .groupBy(0)
-                                 // apply GroupReduceFunction on each group and
-                                 //   remove elements with duplicate strings.
-                                 .reduceGroup(new DistinctReduce());
-```
-
-**Note:** Flink internally works a lot with mutable objects. Collecting objects like in the above example only works because Strings are immutable in Java!
-
-#### GroupReduce on DataSet grouped by KeySelector Function
-
-Works analogously to `KeySelector` functions in Reduce transformations.
-
-#### GroupReduce on sorted groups (Tuple DataSets only)
-
-A `GroupReduceFunction` accesses the elements of a group using an iterator. Optionally, the iterator can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined `GroupReduceFunction` and improve its efficiency.
-Right now, this feature is only available for `Tuple` `DataSet`.
-
-The following code shows another example how to remove duplicate Strings in a `DataSet` grouped by an Integer and sorted by String.
-
-```java
-// GroupReduceFunction that removes consecutive identical elements
-public class DistinctReduce
-         extends GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
-  @Override
-  public void reduce(Iterator<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
-    // there is at least one element in the iterator
-    Tuple2<Integer, String> first = in.next();
-    Integer key = first.f0;
-    String comp = first.f1;
-    // for each element in group
-    while(in.hasNext()) {
-      String next = in.next().f1;
-      // check if strings are different
-      if(!next.equals(comp)) {
-        // emit a new element
-        out.collect(new Tuple2<Integer, String>(key, comp));
-        // update compare string
-        comp = next;
-      }
-    }
-    // emit last element
-    out.collect(new Tuple2<Integer, String>(key, comp));
-  }
-}
-
-// [...]
-DataSet<Tuple2<Integer, String>> input = // [...]
-DataSet<Tuple2<Integer, String>> output = input
-                         // group DataSet by the first tuple field
-                         .groupBy(0)
-                         // sort groups on second tuple field
-                         .sortGroup(1, Order.ASCENDING)
-                         // apply GroupReduceFunction on DataSet with sorted groups
-                         .reduceGroup(new DistinctReduce());
-```
-
-**Note:** A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
-
-#### Combinable GroupReduceFunctions
-
-In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not implicitly combinable. In order to make a `GroupReduceFunction` combinable, you need to implement (override) the ```combine()``` method and annotate the `GroupReduceFunction` with the ```@Combinable``` annotation as shown here:
-
-The following code shows how to compute multiple sums using a combinable `GroupReduceFunction`:
-
-```java
-// Combinable GroupReduceFunction that computes two sums.
-@Combinable
-public class MyCombinableGroupReducer
-         extends GroupReduceFunction<Tuple3<String, Integer, Double>,
-                                     Tuple3<String, Integer, Double>> {
-  @Override
-  public void reduce(Iterator<Tuple3<String, Integer, Double>> in,
-                     Collector<Tuple3<String, Integer, Double>> out) {
-    // one element is always present in iterator
-    Tuple3<String, Integer, Double> curr = in.next();
-    String key = curr.f0;
-    int intSum = curr.f1;
-    double doubleSum = curr.f2;
-    // sum up all ints and doubles
-    while(in.hasNext()) {
-      curr = in.next();
-      intSum += curr.f1;
-      doubleSum += curr.f2;
-    }
-    // emit a tuple with both sums
-    out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
-  }
-
-  @Override
-  public void combine(Iterator<Tuple3<String, Integer, Double>> in,
-                      Collector<Tuple3<String, Integer, Double>> out) {
-    // in some cases combine() calls can simply be forwarded to reduce().
-    this.reduce(in, out);
-  }
-}
-```
-
-### Aggregate on grouped Tuple DataSet
-
-There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
-
-- Sum,
-- Min, and
-- Max.
-
-The Aggregate transformation can only be applied on a `Tuple` `DataSet` and supports only field position keys for grouping.
-
-The following code shows how to apply an Aggregation transformation on a `DataSet` grouped by field position keys:
-
-```java
-DataSet<Tuple3<Integer, String, Double>> input = // [...]
-DataSet<Tuple3<Integer, String, Double>> output = input
-                                          // group DataSet on second field
-                                          .groupBy(1)
-                                          // compute sum of the first field
-                                          .aggregate(SUM, 0)
-                                          // compute minimum of the third field
-                                          .and(MIN, 2);
-```
-
-To apply multiple aggregations on a DataSet it is necessary to use the `.and()` function after the first aggregate, that means `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet. 
-In contrast to that `.aggregate(SUM, 0).aggregate(MIN, 2)` will apply an aggregation on an aggregation. In the given example it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
-
-**Note:** The set of aggregation functions will be extended in the future.
-
-### Reduce on full DataSet
-
-The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataSet`.
-The `ReduceFunction` subsequently combines pairs of elements into one element until only a single element remains.
-
-The following code shows how to sum all elements of an Integer `DataSet`:
-
-```java
-// ReduceFunction that sums Integers
-public class IntSummer extends ReduceFunction<Integer> {
-  @Override
-  public Integer reduce(Integer num1, Integer num2) {
-    return num1 + num2;
-  }
-}
-
-// [...]
-DataSet<Integer> intNumbers = // [...]
-DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
-```
-
-Reducing a full `DataSet` using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
-
-### GroupReduce on full DataSet
-
-The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a `DataSet`.
-A `GroupReduceFunction` can iterate over all elements of `DataSet` and return an arbitrary number of result elements.
-
-The following example shows how to apply a GroupReduce transformation on a full `DataSet`:
-
-```java
-DataSet<Integer> input = // [...]
-// apply a (preferably combinable) GroupReduceFunction to a DataSet
-DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
-```
-
-**Note:** A GroupReduce transformation on a full `DataSet` cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute-intensive operation. See the paragraph on "Combinable `GroupReduceFunction`s" above to learn how to implement a combinable `GroupReduceFunction`.
-
-### Aggregate on full Tuple DataSet
-
-There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
-
-- Sum,
-- Min, and
-- Max.
-
-The Aggregate transformation can only be applied on a `Tuple` `DataSet`.
-
-The following code shows how to apply an Aggregation transformation on a full `DataSet`:
-
-```java
-DataSet<Tuple2<Integer, Double>> input = // [...]
-DataSet<Tuple2<Integer, Double>> output = input
-                                          // compute sum of the first field
-                                          .aggregate(SUM, 0)
-                                          // compute minimum of the second field
-                                          .and(MIN, 1);
-```
-
-**Note:** Extending the set of supported aggregation functions is on our roadmap.
-
-### Join
-
-The Join transformation joins two `DataSet`s into one `DataSet`. The elements of both `DataSet`s are joined on one or more keys which can be specified using
-
-- a `KeySelector` function or
-- one or more field position keys (`Tuple` `DataSet` only).
-
-There are a few different ways to perform a Join transformation which are shown in the following.
-
-#### Default Join (Join into Tuple2)
-
-The default Join transformation produces a new `Tuple` `DataSet` with two fields. Each tuple holds a joined element of the first input `DataSet` in the first tuple field and a matching element of the second input `DataSet` in the second field.
-
-The following code shows a default Join transformation using field position keys:
-
-```java
-DataSet<Tuple2<Integer, String>> input1 = // [...]
-DataSet<Tuple2<Double, Integer>> input2 = // [...]
-// result dataset is typed as Tuple2
-DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
-            result =
-            input1.join(input2)
-                  // key definition on first DataSet using a field position key
-                  .where(0)
-                  // key definition of second DataSet using a field position key
-                  .equalTo(1);
-```
-
-#### Join with JoinFunction
-
-A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
-A `JoinFunction` receives one element of the first input `DataSet` and one element of the second input `DataSet` and returns exactly one element.
-
-The following code performs a join of `DataSet` with custom java objects and a `Tuple` `DataSet` using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
-
-```java
-// some POJO
-public class Rating {
-  public String name;
-  public String category;
-  public int points;
-}
-
-// Join function that joins a custom POJO with a Tuple
-public class PointWeighter
-         extends JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
-
-  @Override
-  public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
-    // multiply the points and rating and construct a new output tuple
-    return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
-  }
-}
-
-DataSet<Rating> ratings = // [...]
-DataSet<Tuple2<String, Double>> weights = // [...]
-DataSet<Tuple2<String, Double>>
-            weightedRatings =
-            ratings.join(weights)
-                   // key definition of first DataSet using a KeySelector function
-                   .where(new KeySelection<Rating, String>() {
-                            public String getKey(Rating r) { return r.category; }
-                          })
-                   // key definition of second DataSet using a KeySelector function
-                   .equalTo(new KeySelection<Tuple2<String, Double>, String>() {
-                              public String getKey(Tuple2<String, Double> t) { return t.f0; }
-                            })
-                   // applying the JoinFunction on joining pairs
-                   .with(new PointWeighter());
-```
-
-#### Join with Projection
-
-A Join transformation can construct result tuples using a projection as shown here:
-
-```java
-DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
-DataSet<Tuple2<Integer, Double>> input2 = // [...]
-DataSet<Tuple4<Integer, String, Double, Byte>
-            result =
-            input1.join(input2)
-                  // key definition on first DataSet using a field position key
-                  .where(0)
-                  // key definition of second DataSet using a field position key
-                  .equalTo(0)
-                  // select and reorder fields of matching tuples
-                  .projectFirst(0,2).projectSecond(1).projectFirst(1)
-                  .types(Integer.class, String.class, Double.class, Byte.class);
-```
-
-`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output `Tuple`. The order of indexes defines the order of fields in the output tuple.
-The join projection works also for non-`Tuple` `DataSet`s. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output `Tuple`.
-
-#### Join with DataSet Size Hint
-
-In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to join as shown here:
-
-```java
-DataSet<Tuple2<Integer, String>> input1 = // [...]
-DataSet<Tuple2<Integer, String>> input2 = // [...]
-
-DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
-            result1 =
-            // hint that the second DataSet is very small
-            input1.joinWithTiny(input2)
-                  .where(0)
-                  .equalTo(0);
-
-DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
-            result2 =
-            // hint that the second DataSet is very large
-            input1.joinWithHuge(input2)
-                  .where(0)
-                  .equalTo(0);
-```
-
-### Cross
-
-The Cross transformation combines two `DataSet`s into one `DataSet`. It builds all pairwise combinations of the elements of both input `DataSet`s, i.e., it builds a Cartesian product.
-The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
-
-**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
-
-#### Cross with User-Defined Function
-
-A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
-
-The following code shows how to apply a Cross transformation on two `DataSet`s using a `CrossFunction`:
-
-```java
-public class Coord {
-  public int id;
-  public int x;
-  public int y;
-}
-
-// CrossFunction computes the Euclidean distance between two Coord objects.
-public class EuclideanDistComputer
-         extends CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
-
-  @Override
-  public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
-    // compute Euclidean distance of coordinates
-    double dist = Math.sqrt(Math.pow(c1.x - c2.x, 2) + Math.pow(c1.y - c2.y, 2));
-    return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
-  }
-}
-
-DataSet<Coord> coords1 = // [...]
-DataSet<Coord> coords2 = // [...]
-DataSet<Tuple3<Integer, Integer, Double>>
-            distances =
-            coords1.cross(coords2)
-                   // apply CrossFunction
-                   .with(new EuclideanDistComputer());
-```
-
-#### Cross with Projection
-
-A Cross transformation can also construct result tuples using a projection as shown here:
-
-```java
-DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
-DataSet<Tuple2<Integer, Double>> input2 = // [...]
-DataSet<Tuple4<Integer, Byte, Integer, Double>
-            result =
-            input1.cross(input2)
-                  // select and reorder fields of matching tuples
-                  .projectSecond(0).projectFirst(1,0).projectSecond(1)
-                  .types(Integer.class, Byte.class, Integer.class, Double.class);
-```
-
-The field selection in a Cross projection works the same way as in the projection of Join results.
-
-#### Cross with DataSet Size Hint
-
-In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to cross as shown here:
-
-```java
-DataSet<Tuple2<Integer, String>> input1 = // [...]
-DataSet<Tuple2<Integer, String>> input2 = // [...]
-
-DataSet<Tuple4<Integer, String, Integer, String>>
-            udfResult =
-                  // hint that the second DataSet is very small
-            input1.crossWithTiny(input2)
-                  // apply any Cross function (or projection)
-                  .with(new MyCrosser());
-
-DataSet<Tuple3<Integer, Integer, String>>
-            projectResult =
-                  // hint that the second DataSet is very large
-            input1.crossWithHuge(input2)
-                  // apply a projection (or any Cross function)
-                  .projectFirst(0,1).projectSecond(1).types(Integer.class, String.class, String.class)
-```
-
-### CoGroup
-
-The CoGroup transformation jointly processes groups of two `DataSet`s. Both `DataSet`s are grouped on a defined key and groups of both `DataSet`s that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one `DataSet` has a group, the `CoGroupFunction` is called with this group and an empty group.
-A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
-
-Similar to Reduce, GroupReduce, and Join, keys can be defined using
-
-- a `KeySelector` function or
-- one or more field position keys (`Tuple` `DataSet` only).
-
-#### CoGroup on DataSets grouped by Field Position Keys (Tuple DataSets only)
-
-```java
-// Some CoGroupFunction definition
-class MyCoGrouper
-         extends CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
-  // set to hold unique Integer values
-  Set<Integer> ints = new HashSet<Integer>();
-
-  @Override
-  public void coGroup(Iterator<Tuple2<String, Integer>> iVals,
-                      Iterator<Tuple2<String, Double>> dVals,
-                      Collector<Double> out) {
-    // clear Integer set
-    ints.clear();
-    // add all Integer values in group to set
-    while(iVals.hasNext()) {
-      ints.add(iVals.next().f1);
-    }
-    // multiply each Double value with each unique Integer values of group
-    while(dVals.hasNext()) {
-      for(Integer i : ints) {
-        out.collect(dVals.next().f1 * i));
-      }
-    }
-  }
-}
-
-// [...]
-DataSet<Tuple2<String, Integer>> iVals = // [...]
-DataSet<Tuple2<String, Double>> dVals = // [...]
-DataSet<Double> output = iVals.coGroup(dVals)
-                         // group first DataSet on first tuple field
-                         .where(0)
-                         // group second DataSet on first tuple field
-                         .equalTo(0)
-                         // apply CoGroup function on each pair of groups
-                         .with(new MyCoGrouper());
-```
-
-#### CoGroup on DataSets grouped by Key Selector Function
-
-Works analogous to key selector functions in Join transformations.
-
-### Union
-
-Produces the union of two `DataSet`s, which have to be of the same type. A union of more than two `DataSet`s can be implemented with multiple union calls, as shown here:
-
-```java
-DataSet<Tuple2<String, Integer>> vals1 = // [...]
-DataSet<Tuple2<String, Integer>> vals2 = // [...]
-DataSet<Tuple2<String, Integer>> vals3 = // [...]
-DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
-                    .union(vals3);
-```
-
-
-[Back to top](#top)
-
-
 <section id="data_sources">
 Data Sources
 ------------
@@ -1175,7 +757,7 @@ List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>()
 myResult.output(new LocalCollectionOutputFormat(outData));
 ```
 
-**Note:** Collection data sources will only work correctly, if the whole program is executed in the same JVM!
+**Note:** Currently, the collection data sink is restricted to local execution, as a debugging tool.
 
 [Back to top](#top)
 
@@ -1281,7 +863,7 @@ Semantic annotations can be attached to functions through Java annotations, or p
 
 ```java
 @ConstantFields("1")
-public class DivideFirstbyTwo extends MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+public class DivideFirstbyTwo implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
   @Override
   public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
     value.f0 /= 2;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d1c49fb/docs/java_api_transformations.md
----------------------------------------------------------------------
diff --git a/docs/java_api_transformations.md b/docs/java_api_transformations.md
new file mode 100644
index 0000000..bd5c45b
--- /dev/null
+++ b/docs/java_api_transformations.md
@@ -0,0 +1,628 @@
+---
+title: "Java API Transformations"
+---
+
+<section id="top">
+DataSet Transformations
+-----------------------
+
+This document gives a deep-dive into the available transformations on DataSets. For a general introduction to the
+Flink Java API, please refer to the [API guide]({{site.baseurl}}/java_api_guide.html).
+
+
+### Map
+
+The Map transformation applies a user-defined `MapFunction` on each element of a DataSet.
+It implements a one-to-one mapping, that is, exactly one element must be returned by
+the function.
+
+The following code transforms a `DataSet` of Integer pairs into a `DataSet` of Integers:
+
+```java
+// MapFunction that adds two integer values
+public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
+  @Override
+  public Integer map(Tuple2<Integer, Integer> in) {
+    return in.f0 + in.f1;
+  }
+}
+
+// [...]
+DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
+DataSet<Integer> intSums = intPairs.map(new IntAdder());
+```
+
+### FlatMap
+
+The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a `DataSet`.
+This variant of a map function can return arbitrarily many result elements (including none) for each input element.
+
+The following code transforms a `DataSet` of text lines into a `DataSet` of words:
+
+```java
+// FlatMapFunction that splits a String at non-word characters and emits all String tokens.
+public class Tokenizer implements FlatMapFunction<String, String> {
+  @Override
+  public void flatMap(String value, Collector<String> out) {
+    for (String token : value.split("\\W")) {
+      out.collect(token);
+    }
+  }
+}
+
+// [...]
+DataSet<String> textLines = // [...]
+DataSet<String> words = textLines.flatMap(new Tokenizer());
+
+```
+
+### Filter
+
+The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataSet` and retains only those elements for which the function returns `true`.
+
+The following code removes all Integers smaller than zero from a `DataSet`:
+
+```java
+// FilterFunction that filters out all Integers smaller than zero.
+public class NaturalNumberFilter implements FilterFunction<Integer> {
+  @Override
+  public boolean filter(Integer number) {
+    return number >= 0;
+  }
+}
+
+// [...]
+DataSet<Integer> intNumbers = // [...]
+DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
+```
+
+### Project (Tuple DataSets only)
+
+The Project transformation removes or moves `Tuple` fields of a `Tuple` `DataSet`.
+The `project(int...)` method selects `Tuple` fields that should be retained by their index and defines their order in the output `Tuple`.
+The `types(Class<?> ...)` method must give the types of the output `Tuple` fields.
+
+Projections do not require the definition of a user function.
+
+The following code shows how to apply a Project transformation on a `DataSet`:
+
+```java
+DataSet<Tuple3<Integer, Double, String>> in = // [...]
+// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
+DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integer.class);
+```
+
+### Transformations on grouped DataSet
+
+The reduce operations can operate on grouped data sets. Specifying the key to
+be used for grouping can be done in two ways:
+
+- a `KeySelector` function or
+- one or more field position keys (`Tuple` `DataSet` only).
+
+Please look at the reduce examples to see how the grouping keys are specified.
+
+### Reduce on grouped DataSet
+
+A Reduce transformation that is applied on a grouped `DataSet` reduces each group to a single element using a user-defined `ReduceFunction`.
+For each group of input elements, a `ReduceFunction` successively combines pairs of elements into one element until only a single element for each group remains.
+
+#### Reduce on DataSet grouped by KeySelector Function
+
+A `KeySelector` function extracts a key value from each element of a `DataSet`. The extracted key value is used to group the `DataSet`.
+The following code shows how to group a POJO `DataSet` using a `KeySelector` function and to reduce it with a `ReduceFunction`.
+
+```java
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
+  // [...]
+}
+
+// ReduceFunction that sums Integer attributes of a POJO
+public class WordCounter implements ReduceFunction<WC> {
+  @Override
+  public WC reduce(WC in1, WC in2) {
+    return new WC(in1.word, in1.count + in2.count);
+  }
+}
+
+// [...]
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+                         // DataSet grouping with inline-defined KeySelector function
+                         .groupBy(
+                           new KeySelector<WC, String>() {
+                             public String getKey(WC wc) { return wc.word; }
+                           })
+                         // apply ReduceFunction on grouped DataSet
+                         .reduce(new WordCounter());
+```
+
+#### Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
+
+Field position keys specify one or more fields of a `Tuple` `DataSet` that are used as grouping keys.
+The following code shows how to use field position keys and apply a `ReduceFunction`.
+
+```java
+DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
+DataSet<Tuple3<String, Integer, Double>> reducedTuples =
+                                         tuples
+                                         // group DataSet on first and second field of Tuple
+                                         .groupBy(0,1)
+                                         // apply ReduceFunction on grouped DataSet
+                                         .reduce(new MyTupleReducer());
+```
+
+### GroupReduce on grouped DataSet
+
+A GroupReduce transformation that is applied on a grouped `DataSet` calls a user-defined `GroupReduceFunction` for each group. The difference
+between this and `Reduce` is that the user-defined function gets the whole group at once.
+The function is invoked with an Iterable over all elements of a group and can return an arbitrary number of result elements using the collector.
+
+#### GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
+
+The following code shows how duplicate strings can be removed from a `DataSet` grouped by Integer.
+
+```java
+public class DistinctReduce
+         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
+
+  @Override
+  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
+
+    Set<String> uniqStrings = new HashSet<String>();
+    Integer key = null;
+  
+    // add all strings of the group to the set
+    for (Tuple2<Integer, String> t : in) {
+      key = t.f0;
+      uniqStrings.add(t.f1);
+    }
+
+    // emit all unique strings.
+    for (String s : uniqStrings) {
+      out.collect(new Tuple2<Integer, String>(key, s));
+    }
+  }
+}
+
+// [...]
+DataSet<Tuple2<Integer, String>> input = // [...]
+DataSet<Tuple2<Integer, String>> output = input
+                           .groupBy(0)            // group DataSet by the first tuple field
+                           .reduceGroup(new DistinctReduce());  // apply GroupReduceFunction
+```
+
+#### GroupReduce on DataSet grouped by KeySelector Function
+
+Works analogously to `KeySelector` functions in Reduce transformations.
+
+#### GroupReduce on sorted groups (Tuple DataSets only)
+
+A `GroupReduceFunction` accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined `GroupReduceFunction` and improve its efficiency.
+Right now, this feature is only available for DataSets of Tuples.
+
+The following code shows another example of how to remove duplicate Strings in a `DataSet` grouped by an Integer and sorted by String.
+
+```java
+// GroupReduceFunction that removes consecutive identical elements
+public class DistinctReduce
+         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
+
+  @Override
+  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
+    Integer key = null;
+    String comp = null;
+
+    for (Tuple2<Integer, String> t : in) {
+      key = t.f0;
+      String next = t.f1;
+
+      // check if strings are different
+      if (comp == null || !next.equals(comp)) {
+        out.collect(new Tuple2<Integer, String>(key, next));
+        comp = next;
+      }
+    }
+  }
+}
+
+// [...]
+DataSet<Tuple2<Integer, String>> input = // [...]
+DataSet<Tuple2<Integer, String>> output = input
+                         .groupBy(0)                         // group DataSet by first field
+                         .sortGroup(1, Order.ASCENDING)      // sort groups on second tuple field
+                         .reduceGroup(new DistinctReduce());
+```
+
+**Note:** A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
+
+#### Combinable GroupReduceFunctions
+
+In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not necessarily combinable. In order to make a `GroupReduceFunction` combinable, you need to implement (override) the `combine()` method and annotate the `GroupReduceFunction` with the `@Combinable` annotation as shown here:
+
+```java
+// Combinable GroupReduceFunction that computes two sums.
+// Note that we use the RichGroupReduceFunction because it defines the combine method
+@Combinable
+public class MyCombinableGroupReducer
+         extends RichGroupReduceFunction<Tuple3<String, Integer, Double>,
+                                     Tuple3<String, Integer, Double>> {
+  @Override
+  public void reduce(Iterable<Tuple3<String, Integer, Double>> in,
+                     Collector<Tuple3<String, Integer, Double>> out) {
+
+    String key = null;
+    int intSum = 0;
+    double doubleSum = 0.0;
+
+    for (Tuple3<String, Integer, Double> curr : in) {
+      key = curr.f0;
+      intSum += curr.f1;
+      doubleSum += curr.f2;
+    }
+    // emit a tuple with both sums
+    out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
+  }
+
+  @Override
+  public void combine(Iterable<Tuple3<String, Integer, Double>> in,
+                      Collector<Tuple3<String, Integer, Double>> out) {
+    // in some cases combine() calls can simply be forwarded to reduce().
+    this.reduce(in, out);
+  }
+}
+```
+
+### Aggregate on grouped Tuple DataSet
+
+There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
+
+- Sum,
+- Min, and
+- Max.
+
+The Aggregate transformation can only be applied on a `Tuple` `DataSet` and supports only field position keys for grouping.
+
+The following code shows how to apply an Aggregation transformation on a `DataSet` grouped by field position keys:
+
+```java
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input
+                                   .groupBy(1)        // group DataSet on second field
+                                   .aggregate(SUM, 0) // compute sum of the first field
+                                   .and(MIN, 2);      // compute minimum of the third field
+```
+
+To apply multiple aggregations on a DataSet, chain them with the `.and()` function after the first aggregate. For example, `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
+In contrast, `.aggregate(SUM, 0).aggregate(MIN, 2)` applies an aggregation on an aggregation. In the given example, it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
+
+**Note:** The set of aggregation functions will be extended in the future.
+
+### Reduce on full DataSet
+
+The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataSet`.
+The `ReduceFunction` successively combines pairs of elements into one element until only a single element remains.
+
+The following code shows how to sum all elements of an Integer `DataSet`:
+
+```java
+// ReduceFunction that sums Integers
+public class IntSummer implements ReduceFunction<Integer> {
+  @Override
+  public Integer reduce(Integer num1, Integer num2) {
+    return num1 + num2;
+  }
+}
+
+// [...]
+DataSet<Integer> intNumbers = // [...]
+DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
+```
+
+Reducing a full `DataSet` using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
+
+### GroupReduce on full DataSet
+
+The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a `DataSet`.
+A `GroupReduceFunction` can iterate over all elements of a `DataSet` and return an arbitrary number of result elements.
+
+The following example shows how to apply a GroupReduce transformation on a full `DataSet`:
+
+```java
+DataSet<Integer> input = // [...]
+// apply a (preferably combinable) GroupReduceFunction to a DataSet
+DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
+```
+
+**Note:** A GroupReduce transformation on a full `DataSet` cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute-intensive operation. See the paragraph on "Combinable `GroupReduceFunction`s" above to learn how to implement a combinable `GroupReduceFunction`.
+
+### Aggregate on full Tuple DataSet
+
+There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
+
+- Sum,
+- Min, and
+- Max.
+
+The Aggregate transformation can only be applied on a `Tuple` `DataSet`.
+
+The following code shows how to apply an Aggregation transformation on a full `DataSet`:
+
+```java
+DataSet<Tuple2<Integer, Double>> input = // [...]
+DataSet<Tuple2<Integer, Double>> output = input
+                                     .aggregate(SUM, 0)    // compute sum of the first field
+                                     .and(MIN, 1);    // compute minimum of the second field
+```
+
+**Note:** Extending the set of supported aggregation functions is on our roadmap.
+
+### Join
+
+The Join transformation joins two `DataSet`s into one `DataSet`. The elements of both `DataSet`s are joined on one or more keys which can be specified using
+
+- a `KeySelector` function or
+- one or more field position keys (`Tuple` `DataSet` only).
+
+There are a few different ways to perform a Join transformation which are shown in the following.
+
+#### Default Join (Join into Tuple2)
+
+The default Join transformation produces a new `Tuple` `DataSet` with two fields. Each tuple holds a joined element of the first input `DataSet` in the first tuple field and a matching element of the second input `DataSet` in the second field.
+
+The following code shows a default Join transformation using field position keys:
+
+```java
+DataSet<Tuple2<Integer, String>> input1 = // [...]
+DataSet<Tuple2<Double, Integer>> input2 = // [...]
+// result dataset is typed as Tuple2
+DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
+            result = input1.join(input2)
+                           .where(0)       // key of the first input
+                           .equalTo(1);    // key of the second input
+```
+
+#### Join with JoinFunction
+
+A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
+A `JoinFunction` receives one element of the first input `DataSet` and one element of the second input `DataSet` and returns exactly one element.
+
+The following code joins a `DataSet` of custom Java objects with a `Tuple` `DataSet` using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
+
+```java
+// some POJO
+public class Rating {
+  public String name;
+  public String category;
+  public int points;
+}
+
+// Join function that joins a custom POJO with a Tuple
+public class PointWeighter
+         implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
+
+  @Override
+  public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
+    // multiply the points by the weight and construct a new output tuple
+    return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
+  }
+}
+
+DataSet<Rating> ratings = // [...]
+DataSet<Tuple2<String, Double>> weights = // [...]
+DataSet<Tuple2<String, Double>>
+            weightedRatings =
+            ratings.join(weights)
+
+                   // key of the first input
+                   .where(new KeySelector<Rating, String>() {
+                            public String getKey(Rating r) { return r.category; }
+                          })
+
+                   // key of the second input
+                   .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
+                              public String getKey(Tuple2<String, Double> t) { return t.f0; }
+                            })
+
+                   // applying the JoinFunction on joining pairs
+                   .with(new PointWeighter());
+```
+
+#### Join with Projection
+
+A Join transformation can construct result tuples using a projection as shown here:
+
+```java
+DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
+DataSet<Tuple2<Integer, Double>> input2 = // [...]
+DataSet<Tuple4<Integer, String, Double, Byte>>
+            result =
+            input1.join(input2)
+                  // key definition on first DataSet using a field position key
+                  .where(0)
+                  // key definition of second DataSet using a field position key
+                  .equalTo(0)
+                  // select and reorder fields of matching tuples
+                  .projectFirst(0,2).projectSecond(1).projectFirst(1)
+                  .types(Integer.class, String.class, Double.class, Byte.class);
+```
+
+`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output `Tuple`. The order of indexes defines the order of fields in the output tuple.
+The join projection also works for non-`Tuple` `DataSet`s. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output `Tuple`.
+
+#### Join with DataSet Size Hint
+
+In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to join as shown here:
+
+```java
+DataSet<Tuple2<Integer, String>> input1 = // [...]
+DataSet<Tuple2<Integer, String>> input2 = // [...]
+
+DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
+            result1 =
+            // hint that the second DataSet is very small
+            input1.joinWithTiny(input2)
+                  .where(0)
+                  .equalTo(0);
+
+DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
+            result2 =
+            // hint that the second DataSet is very large
+            input1.joinWithHuge(input2)
+                  .where(0)
+                  .equalTo(0);
+```
+
+### Cross
+
+The Cross transformation combines two `DataSet`s into one `DataSet`. It builds all pairwise combinations of the elements of both input `DataSet`s, i.e., it builds a Cartesian product.
+The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
+
+**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
+
+#### Cross with User-Defined Function
+
+A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
+
+The following code shows how to apply a Cross transformation on two `DataSet`s using a `CrossFunction`:
+
+```java
+public class Coord {
+  public int id;
+  public int x;
+  public int y;
+}
+
+// CrossFunction computes the Euclidean distance between two Coord objects.
+public class EuclideanDistComputer
+         implements CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
+
+  @Override
+  public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
+    // compute Euclidean distance of coordinates
+    double dist = Math.sqrt(Math.pow(c1.x - c2.x, 2) + Math.pow(c1.y - c2.y, 2));
+    return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
+  }
+}
+
+DataSet<Coord> coords1 = // [...]
+DataSet<Coord> coords2 = // [...]
+DataSet<Tuple3<Integer, Integer, Double>>
+            distances =
+            coords1.cross(coords2)
+                   // apply CrossFunction
+                   .with(new EuclideanDistComputer());
+```
+
+#### Cross with Projection
+
+A Cross transformation can also construct result tuples using a projection as shown here:
+
+```java
+DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
+DataSet<Tuple2<Integer, Double>> input2 = // [...]
+DataSet<Tuple4<Integer, Byte, Integer, Double>>
+            result =
+            input1.cross(input2)
+                  // select and reorder fields of each pair of tuples
+                  .projectSecond(0).projectFirst(1,0).projectSecond(1)
+                  .types(Integer.class, Byte.class, Integer.class, Double.class);
+```
+
+The field selection in a Cross projection works the same way as in the projection of Join results.
+
+#### Cross with DataSet Size Hint
+
+In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to cross as shown here:
+
+```java
+DataSet<Tuple2<Integer, String>> input1 = // [...]
+DataSet<Tuple2<Integer, String>> input2 = // [...]
+
+DataSet<Tuple4<Integer, String, Integer, String>>
+            udfResult =
+                  // hint that the second DataSet is very small
+            input1.crossWithTiny(input2)
+                  // apply any Cross function (or projection)
+                  .with(new MyCrosser());
+
+DataSet<Tuple3<Integer, String, String>>
+            projectResult =
+                  // hint that the second DataSet is very large
+            input1.crossWithHuge(input2)
+                  // apply a projection (or any Cross function)
+                  .projectFirst(0,1).projectSecond(1).types(Integer.class, String.class, String.class);
+```
+
+### CoGroup
+
+The CoGroup transformation jointly processes groups of two `DataSet`s. Both `DataSet`s are grouped on a defined key and groups of both `DataSet`s that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one `DataSet` has a group, the `CoGroupFunction` is called with this group and an empty group.
+A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
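The pairing of groups, including the empty group for a key that occurs in only one input, can be sketched in plain Java. `CoGroupSketch` is a hypothetical class that uses no Flink types; it models each record as a `Map.Entry` and returns one string per call that a `CoGroupFunction` would receive. The sorted key order comes from the `TreeMap`/`TreeSet` used here and is an assumption of the sketch, not something the text above promises.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration class; not part of the Flink API.
final class CoGroupSketch {

  // Collects the values of all (key, value) pairs that share a key.
  static <V> Map<String, List<V>> groupByKey(List<? extends Map.Entry<String, V>> input) {
    Map<String, List<V>> groups = new TreeMap<>();
    for (Map.Entry<String, V> entry : input) {
      groups.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
    }
    return groups;
  }

  // Visits every key that occurs in either input and records
  // "key:sizeOfFirstGroup/sizeOfSecondGroup" for each visit.
  static List<String> coGroup(List<? extends Map.Entry<String, Integer>> first,
                              List<? extends Map.Entry<String, Double>> second) {
    Map<String, List<Integer>> g1 = groupByKey(first);
    Map<String, List<Double>> g2 = groupByKey(second);
    Set<String> keys = new TreeSet<>(g1.keySet());
    keys.addAll(g2.keySet());
    List<String> calls = new ArrayList<>();
    for (String key : keys) {
      // A key missing on one side is paired with an empty group.
      List<Integer> ints = g1.getOrDefault(key, Collections.<Integer>emptyList());
      List<Double> doubles = g2.getOrDefault(key, Collections.<Double>emptyList());
      calls.add(key + ":" + ints.size() + "/" + doubles.size());
    }
    return calls;
  }

  public static void main(String[] args) {
    List<SimpleEntry<String, Integer>> iVals = Arrays.asList(
        new SimpleEntry<String, Integer>("a", 1),
        new SimpleEntry<String, Integer>("a", 2),
        new SimpleEntry<String, Integer>("b", 3));
    List<SimpleEntry<String, Double>> dVals = Arrays.asList(
        new SimpleEntry<String, Double>("b", 0.5),
        new SimpleEntry<String, Double>("c", 1.5));
    // Prints [a:2/0, b:1/1, c:0/1]
    System.out.println(coGroup(iVals, dVals));
  }
}
```

Keys `a` and `c` each occur in only one input, so they are visited with one empty group, while `b` is visited with both groups populated.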
+
+Similar to Reduce, GroupReduce, and Join, keys can be defined using
+
+- a `KeySelector` function or
+- one or more field position keys (`Tuple` `DataSet` only).
+
+#### CoGroup on DataSets grouped by Field Position Keys (Tuple DataSets only)
+
+```java
+// Some CoGroupFunction definition
+class MyCoGrouper
+         implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
+
+  @Override
+  public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
+                      Iterable<Tuple2<String, Double>> dVals,
+                      Collector<Double> out) {
+
+    Set<Integer> ints = new HashSet<Integer>();
+
+    // add all Integer values in group to set
+    for (Tuple2<String, Integer> val : iVals) {
+      ints.add(val.f1);
+    }
+
+    // multiply each Double value with each unique Integer value of the group
+    for (Tuple2<String, Double> val : dVals) {
+      for (Integer i : ints) {
+        out.collect(val.f1 * i);
+      }
+    }
+  }
+}
+
+// [...]
+DataSet<Tuple2<String, Integer>> iVals = // [...]
+DataSet<Tuple2<String, Double>> dVals = // [...]
+DataSet<Double> output = iVals.coGroup(dVals)
+                         // group first DataSet on first tuple field
+                         .where(0)
+                         // group second DataSet on first tuple field
+                         .equalTo(0)
+                         // apply CoGroup function on each pair of groups
+                         .with(new MyCoGrouper());
+```
+
+#### CoGroup on DataSets grouped by Key Selector Function
+
+Works analogously to key selector functions in Join transformations.
+
+### Union
+
+Produces the union of two `DataSet`s, which have to be of the same type. A union of more than two `DataSet`s can be implemented with multiple union calls, as shown here:
+
+```java
+DataSet<Tuple2<String, Integer>> vals1 = // [...]
+DataSet<Tuple2<String, Integer>> vals2 = // [...]
+DataSet<Tuple2<String, Integer>> vals3 = // [...]
+DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
+                    .union(vals3);
+```
+
+
+[Back to top](#top)
+

