flink-commits mailing list archives

From fhue...@apache.org
Subject [1/6] flink git commit: [FLINK-2843] Add documentation for DataSet outer joins.
Date Thu, 15 Oct 2015 13:07:20 GMT
Repository: flink
Updated Branches:
  refs/heads/master f7ab4f373 -> c82ebbfce


[FLINK-2843] Add documentation for DataSet outer joins.

This closes #1248


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9e32da2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9e32da2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9e32da2

Branch: refs/heads/master
Commit: d9e32da2631d519d434238b6153332ed03047461
Parents: f7ab4f3
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Oct 9 18:46:31 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Oct 15 11:28:57 2015 +0200

----------------------------------------------------------------------
 docs/apis/dataset_transformations.md | 171 +++++++++++++++++++++++++++++-
 docs/apis/programming_guide.md       |  50 +++++++--
 2 files changed, 209 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9e32da2/docs/apis/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/dataset_transformations.md b/docs/apis/dataset_transformations.md
index 0847190..cc7e742 100644
--- a/docs/apis/dataset_transformations.md
+++ b/docs/apis/dataset_transformations.md
@@ -1105,7 +1105,7 @@ Not supported.
 
 The Join transformation joins two DataSets into one DataSet. The elements of both DataSets are joined on one or more keys which can be specified using
 
-- a kex expression
+- a key expression
 - a key-selector function
 - one or more field position keys (Tuple DataSet only).
 - Case Class Fields
@@ -1152,7 +1152,7 @@ val result = input1.join(input2).where(0).equalTo(1)
 </div>
 </div>
 
-#### Join with Join-Function
+#### Join with Join Function
 
 A Join transformation can also call a user-defined join function to process joining tuples.
 A join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element.
@@ -1380,7 +1380,7 @@ DataSet<SomeType> input1 = // [...]
 DataSet<AnotherType> input2 = // [...]
 
 DataSet<Tuple2<SomeType, AnotherType>> result =
-      input1.join(input2, BROADCAST_HASH_FIRST)
+      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
             .where("id").equalTo("key");
 ~~~
 
@@ -1392,7 +1392,7 @@ val input1: DataSet[SomeType] = // [...]
 val input2: DataSet[AnotherType] = // [...]
 
 // hint that the first DataSet is very small
-val result1 = input1.join(input2, BROADCAST_HASH_FIRST).where("id").equalTo("key")
+val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
 
 ~~~
 
@@ -1432,6 +1432,169 @@ The following hints are available:
   already sorted.
 
 
+### OuterJoin
+
+The OuterJoin transformation performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a `JoinFunction` to turn the pair of elements into a single element, or to a `FlatJoinFunction` to turn the pair of elements into arbitrarily many (including none) elements.
+
+The elements of both DataSets are joined on one or more keys which can be specified using
+
+- a key expression
+- a key-selector function
+- one or more field position keys (Tuple DataSet only).
+- Case Class Fields
+
+**OuterJoins are only supported for the Java and Scala DataSet API.**
+
+
+#### OuterJoin with Join Function
+
+An OuterJoin transformation calls a user-defined join function to process joining tuples.
+A join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element. Depending on the type of the outer join, one of the two input elements of the join function can be `null`: the second element for a left outer join, the first element for a right outer join, and either one for a full outer join.
+
+The following code performs a left outer join of a Tuple DataSet with a DataSet of custom Java objects (POJOs) using key expressions and shows how to use a user-defined join function:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
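+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+// some POJO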
+public class Rating {
+  public String name;
+  public String category;
+  public int points;
+}
+
+// Join function that joins a custom POJO with a Tuple
+public class PointAssigner
+         implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
+
+  @Override
+  public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) {
+    // Assigns the rating points to the movie.
+    // NOTE: rating might be null
+    return new Tuple2<String, Integer>(movie.f0, rating == null ? -1 : rating.points);
+  }
+}
+
+DataSet<Tuple2<String, String>> movies = // [...]
+DataSet<Rating> ratings = // [...]
+DataSet<Tuple2<String, Integer>>
+            moviesWithPoints =
+            movies.leftOuterJoin(ratings)
+
+                   // key of the first input
+                   .where("f0")
+
+                   // key of the second input
+                   .equalTo("name")
+
+                   // applying the JoinFunction on joining pairs
+                   .with(new PointAssigner());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+case class Rating(name: String, category: String, points: Int)
+
+val movies: DataSet[(String, String)] = // [...]
+val ratings: DataSet[Rating] = // [...]
+
+val moviesWithPoints = movies.leftOuterJoin(ratings).where(0).equalTo("name") {
+  (movie, rating) => (movie._1, if (rating == null) -1 else rating.points)
+}
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+#### OuterJoin with Flat-Join Function
+
+Analogous to Map and FlatMap, an OuterJoin with flat-join function behaves in the same
+way as an OuterJoin with join function, but instead of returning one element, it can
+return (collect) zero, one, or more elements.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public class PointAssigner
+         implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
+  @Override
+  public void join(Tuple2<String, String> movie, Rating rating,
+    Collector<Tuple2<String, Integer>> out) {
+    if (rating == null) {
+      // no rating for this movie: emit a default value
+      out.collect(new Tuple2<String, Integer>(movie.f0, -1));
+    } else if (rating.points < 10) {
+      out.collect(new Tuple2<String, Integer>(movie.f0, rating.points));
+    } else {
+      // do not emit
+    }
+  }
+}
+
+DataSet<Tuple2<String, Integer>>
+            moviesWithPoints =
+            movies.leftOuterJoin(ratings) // [...]
+~~~
+
+</div>
+</div>
+
+#### Join Algorithm Hints
+
+The Flink runtime can execute outer joins in various ways. Each possible way outperforms the others under
+different circumstances. The system tries to pick a reasonable way automatically, but allows you
+to manually pick a strategy, in case you want to enforce a specific way of executing the outer join.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<SomeType> input1 = // [...]
+DataSet<AnotherType> input2 = // [...]
+
+DataSet<Tuple2<SomeType, AnotherType>> result =
+      input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
+            .where("id").equalTo("key");
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input1: DataSet[SomeType] = // [...]
+val input2: DataSet[AnotherType] = // [...]
+
+// hint that the inputs should be repartitioned, sorted, and merge-joined
+val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+**NOTE:** Right now, outer joins can only be executed using the `REPARTITION_SORT_MERGE` strategy. Further execution strategies will be added in the future.
+
+* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
+
+* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
+  partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
+  a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
+  already sorted.
+
+
 ### Cross
 
 The Cross transformation combines two DataSets into one DataSet. It builds all pairwise combinations of the elements of both input DataSets, i.e., it builds a Cartesian product.
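The `REPARTITION_SORT_MERGE` strategy described above amounts to sorting both inputs on their keys and merging them in a single pass. The plain-Java sketch below illustrates that merge for left, right, and full outer joins on in-memory lists. It is an assumption-laden model, not Flink's implementation (no partitioning, spilling, or serialization), and the names `SortMergeOuterJoinSketch`, `OuterJoinType`, and `join` are hypothetical; a `BiFunction` stands in for a `JoinFunction` and receives `null` for the missing side.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, in-memory illustration of a sort-merge outer join (not Flink code).
final class SortMergeOuterJoinSketch {

    enum OuterJoinType { LEFT, RIGHT, FULL }

    static <L, R, K extends Comparable<K>, O> List<O> join(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey,
            OuterJoinType type, BiFunction<L, R, O> joiner) {
        // Sort phase: sort copies of both inputs on their join keys.
        List<L> ls = new ArrayList<>(left);
        List<R> rs = new ArrayList<>(right);
        ls.sort(Comparator.comparing(leftKey));
        rs.sort(Comparator.comparing(rightKey));

        boolean keepLeft = type != OuterJoinType.RIGHT;  // LEFT and FULL preserve left records
        boolean keepRight = type != OuterJoinType.LEFT;  // RIGHT and FULL preserve right records
        List<O> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        // Merge phase: walk both sorted lists once.
        while (i < ls.size() || j < rs.size()) {
            int cmp;
            if (i == ls.size()) {
                cmp = 1;   // only right records remain
            } else if (j == rs.size()) {
                cmp = -1;  // only left records remain
            } else {
                cmp = leftKey.apply(ls.get(i)).compareTo(rightKey.apply(rs.get(j)));
            }
            if (cmp < 0) {
                // Left record without a match: pass null for the right side.
                if (keepLeft) out.add(joiner.apply(ls.get(i), null));
                i++;
            } else if (cmp > 0) {
                // Right record without a match: pass null for the left side.
                if (keepRight) out.add(joiner.apply(null, rs.get(j)));
                j++;
            } else {
                // Equal keys: find the run of equal keys on each side and join all pairs.
                K key = leftKey.apply(ls.get(i));
                int iEnd = i;
                while (iEnd < ls.size() && leftKey.apply(ls.get(iEnd)).compareTo(key) == 0) iEnd++;
                int jEnd = j;
                while (jEnd < rs.size() && rightKey.apply(rs.get(jEnd)).compareTo(key) == 0) jEnd++;
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(joiner.apply(ls.get(a), rs.get(b)));
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }
}
```

For example, left-joining `["c", "a", "b"]` with `["d", "c", "b", "c"]` (each keyed by the string itself, joined with `(l, r) -> l + ":" + r`) yields `a:null`, `b:b`, `c:c`, `c:c`; the unmatched `d` only shows up (as `null:d`) for a right or full outer join.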

http://git-wip-us.apache.org/repos/asf/flink/blob/d9e32da2/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index ad29ac3..7f5e41d 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -630,8 +630,8 @@ DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
       <td>
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
-        FlatJoinFunction to turn the pair of elements into arbitararily many (including none)
-        elements. See <a href="#specifying-keys">keys</a> on how to define join keys.
+        FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight java %}
 result = input1.join(input2)
                .where(0)       // key of the first input (tuple field 0)
@@ -650,7 +650,27 @@ result = input1.join(input2)
 result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                .where(0).equalTo(1);
 {% endhighlight %}
-        Note that the join transformation works only for equi-joins. Other join types, for example outer-joins need to be expressed using CoGroup.
+        Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup.
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>OuterJoin</strong></td>
+      <td>
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight java %}
+input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
+      .where(0)              // key of the first input (tuple field 0)
+      .equalTo(1)            // key of the second input (tuple field 1)
+      .with(new JoinFunction<String, String, String>() {
+          public String join(String v1, String v2) {
+             // NOTE:
+             // - v2 might be null for leftOuterJoin
+             // - v1 might be null for rightOuterJoin
+             // - v1 OR v2 might be null for fullOuterJoin
+             return v1 != null ? v1 : v2; // e.g., forward whichever value is present
+          }
+      });
+{% endhighlight %}
       </td>
     </tr>
 
@@ -659,7 +679,7 @@ result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight java %}
 data1.coGroup(data2)
      .where(0)
@@ -906,8 +926,8 @@ val output: DataSet[(Int, String, Doublr)] = input.sum(0).min(2)
       <td>
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
-        FlatJoinFunction to turn the pair of elements into arbitararily many (including none)
-        elements. See <a href="#specifying-keys">keys</a> on how to define join keys.
+        FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight scala %}
 // In this case tuple fields are used as keys. "0" is the join field on the first tuple
 // "1" is the join field on the second tuple.
@@ -926,7 +946,21 @@ val result = input1.join(input2).where(0).equalTo(1)
 val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                    .where(0).equalTo(1)
 {% endhighlight %}
-          Note that the join transformation works only for equi-joins. Other join types, for example outer-joins need to be expressed using CoGroup.
+          Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup.
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>OuterJoin</strong></td>
+      <td>
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight scala %}
+val joined = left.rightOuterJoin(right).where(0).equalTo(1) {
+   (left, right) =>
+     // in a right outer join, only the left element can be null
+     val a = if (left == null) "none" else left._1
+     (a, right)
+  }
+{% endhighlight %}
       </td>
     </tr>
 
@@ -935,7 +969,7 @@ val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight scala %}
 data1.coGroup(data2).where(0).equalTo(1)
 {% endhighlight %}
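The null-handling contract described in the OuterJoin entries above, together with the flat-join variant that may emit zero, one, or more records per pair, can be modelled without Flink. The plain-Java sketch below performs a hash-based left outer join on in-memory lists. It only models the semantics and is not how Flink executes outer joins (per the note in the transformations guide, they currently use `REPARTITION_SORT_MERGE`); `LeftOuterFlatJoinSketch`, `FlatJoiner`, and `leftOuterJoin` are hypothetical names, and the `List` handed to the callback stands in for Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, in-memory model of a left outer join with a flat-join callback (not Flink code).
final class LeftOuterFlatJoinSketch {

    // Stand-in for a flat-join function: "right" is null when the left element has no match,
    // and results are "collected" by adding zero, one, or more elements to "out".
    interface FlatJoiner<L, R, O> {
        void join(L left, R right, List<O> out);
    }

    static <L, R, K, O> List<O> leftOuterJoin(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey,
            FlatJoiner<L, R, O> joiner) {
        // Build phase: index the right input by key (several elements may share a key).
        Map<K, List<R>> table = new HashMap<>();
        for (R r : right) {
            table.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: every left element is passed to the joiner at least once.
        List<O> out = new ArrayList<>();
        for (L l : left) {
            List<R> matches = table.get(leftKey.apply(l));
            if (matches == null) {
                joiner.join(l, null, out);   // no match: the right side is null
            } else {
                for (R r : matches) {
                    joiner.join(l, r, out);  // one call per matching pair
                }
            }
        }
        return out;
    }
}
```

Mirroring the flat-join `PointAssigner` example, joining the movies `Alien`, `Brazil`, `Heat` with the ratings `Alien=8` and `Heat=12`, and dropping ratings of 10 or more points, yields `Alien=8` and `Brazil=-1`: `Heat` has a match but produces no output.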

