flink-commits mailing list archives

From aljos...@apache.org
Subject [57/60] git commit: [doc] Unify "DataSet Transformations" page
Date Mon, 22 Sep 2014 12:29:39 GMT
[doc] Unify "DataSet Transformations" page


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/69808fdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/69808fdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/69808fdd

Branch: refs/heads/master
Commit: 69808fddad3ee9ab445c1dd6063c0488636546b1
Parents: c778d28
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Sat Sep 20 08:48:53 2014 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Sep 22 13:43:03 2014 +0200

----------------------------------------------------------------------
 docs/dataset_transformations.md | 164 ++++++++++++++++++++++++-----------
 1 file changed, 111 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/69808fdd/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index e027eb5..fec796b 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -534,7 +534,8 @@ DataSet<Tuple3<Integer, String, Double>> output = input
 <div data-lang="scala" markdown="1">
 
 ~~~scala
-
+val input: DataSet[(Int, String, Double)] = // [...]
+val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
 ~~~
 
 </div>
@@ -547,8 +548,8 @@ In contrast to that `.aggregate(SUM, 0).aggregate(MIN, 2)` will apply an aggrega
 
 ### Reduce on full DataSet
 
-The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a DataSet.
-The `ReduceFunction` subsequently combines pairs of elements into one element until only a single element remains.
+The Reduce transformation applies a user-defined reduce function to all elements of a DataSet.
+The reduce function subsequently combines pairs of elements into one element until only a single element remains.
 
 The following code shows how to sum all elements of an Integer DataSet:
 
@@ -573,18 +574,19 @@ DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
 <div data-lang="scala" markdown="1">
 
 ~~~scala
-
+val intNumbers = env.fromElements(1,2,3)
+val sum = intNumbers.reduce (_ + _)
 ~~~
 
 </div>
 </div>
 
-Reducing a full DataSet using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
+Reducing a full DataSet using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a reduce function is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
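To make the combinability argument above concrete, here is a minimal plain-Java sketch (no Flink classes; `ReduceSketch`, `reduceAll`, and `combineThenReduce` are hypothetical names). It assumes the reduce function is associative, so each partition can be pre-reduced locally and only the few partial results reach the final, sequential step:

~~~java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java sketch (no Flink): a reduce over a full data set split into
// per-partition partial reduces ("combine") followed by one final reduce.
public class ReduceSketch {

    // Combines elements pairwise until only a single element remains.
    public static <T> T reduceAll(List<T> elements, BinaryOperator<T> fn) {
        if (elements.isEmpty()) {
            throw new IllegalArgumentException("cannot reduce an empty list");
        }
        T acc = elements.get(0);
        for (int i = 1; i < elements.size(); i++) {
            acc = fn.apply(acc, elements.get(i));
        }
        return acc;
    }

    // Hypothetical two-phase execution: every partition is reduced locally,
    // then the partial results are reduced once more. Only this last step is
    // sequential, and it sees one value per partition instead of all values.
    public static <T> T combineThenReduce(List<List<T>> partitions, BinaryOperator<T> fn) {
        List<T> partials = new ArrayList<>();
        for (List<T> partition : partitions) {
            if (!partition.isEmpty()) {
                partials.add(reduceAll(partition, fn));
            }
        }
        return reduceAll(partials, fn);
    }

    public static void main(String[] args) {
        List<Integer> all = List.of(1, 2, 3, 4, 5, 6);
        List<List<Integer>> parts = List.of(List.of(1, 2), List.of(3, 4), List.of(5, 6));
        System.out.println(reduceAll(all, Integer::sum));           // 21
        System.out.println(combineThenReduce(parts, Integer::sum)); // 21
    }
}
~~~

With three partitions, the final step combines three partial sums instead of six values, which is the reason a full-DataSet Reduce rarely becomes the bottleneck.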
 
 ### GroupReduce on full DataSet
 
-The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a DataSet.
-A `GroupReduceFunction` can iterate over all elements of DataSet and return an arbitrary number of result elements.
+The GroupReduce transformation applies a user-defined group-reduce function on all elements of a DataSet.
+A group-reduce function can iterate over all elements of a DataSet and return an arbitrary number of result elements.
 
 The following example shows how to apply a GroupReduce transformation on a full DataSet:
 
@@ -601,17 +603,22 @@ DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
 <div data-lang="scala" markdown="1">
 
 ~~~scala
-
+val input: DataSet[Int] = // [...]
+val output = input.reduceGroup(new MyGroupReducer())
 ~~~
 
 </div>
 </div>
 
-**Note:** A GroupReduce transformation on a full DataSet cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute intensive operation. See the paragraph on "Combineable `GroupReduceFunction`s" above to learn how to implement a combinable `GroupReduceFunction`.
+**Note:** A GroupReduce transformation on a full DataSet cannot be done in parallel if the
+group-reduce function is not combinable. Therefore, this can be a very compute-intensive operation.
+See the paragraph on "Combinable Group-Reduce Functions" above to learn how to implement a
+combinable group-reduce function.
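The following plain-Java sketch (hypothetical names, not Flink API) illustrates a group-reduce over a whole DataSet that is not combinable: emitting every value above the mean requires the complete input, so the partitions cannot be shrunk to smaller partial results beforehand. It also shows that the number of emitted elements depends on the data:

~~~java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink) of a group-reduce over a whole data set:
// the function sees every element and may emit any number of results.
public class GroupReduceSketch {

    // Hypothetical group-reduce: emits every value strictly above the mean.
    // It must buffer the complete input before emitting anything, so a
    // partition cannot be pre-reduced to a smaller partial result.
    public static List<Double> aboveMean(Iterable<Double> values) {
        List<Double> buffered = new ArrayList<>();
        double sum = 0.0;
        for (double v : values) {
            buffered.add(v);
            sum += v;
        }
        List<Double> out = new ArrayList<>();
        if (buffered.isEmpty()) {
            return out;
        }
        double mean = sum / buffered.size();
        for (double v : buffered) {
            if (v > mean) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(aboveMean(List.of(1.0, 2.0, 3.0, 10.0))); // [10.0]
    }
}
~~~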
 
 ### Aggregate on full Tuple DataSet
 
-There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following build-in aggregation functions:
+There are some common aggregation operations that are frequently used. The Aggregate transformation
+provides the following built-in aggregation functions:
 
 - Sum,
 - Min, and
@@ -635,6 +642,8 @@ DataSet<Tuple2<Integer, Double>> output = input
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+val input: DataSet[(Int, String, Double)] = // [...]
+val output = input.aggregate(SUM, 0).and(MIN, 2)
 
 ~~~
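A plain-Java sketch of what `aggregate(SUM, 0).and(MIN, 2)` computes over a full Tuple DataSet, using a Java 16+ record as a stand-in for a Flink tuple (`AggregateSketch` and `Row` are hypothetical). Both aggregates are evaluated over the same input in a single pass; the sketch returns only the two aggregated values:

~~~java
import java.util.List;

// Plain-Java sketch (no Flink): aggregate(SUM, 0).and(MIN, 2) computes both
// aggregates in one pass over the same (Int, String, Double) records.
public class AggregateSketch {

    // Hypothetical stand-in for a Tuple3<Integer, String, Double>.
    public record Row(int f0, String f1, double f2) {}

    // Returns {sum of field 0, min of field 2}. Field 1 is not aggregated,
    // so this sketch simply leaves it out of the result.
    public static double[] sumF0MinF2(List<Row> rows) {
        long sum = 0;
        double min = Double.POSITIVE_INFINITY;
        for (Row r : rows) {
            sum += r.f0();
            min = Math.min(min, r.f2());
        }
        return new double[] {sum, min};
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1, "a", 0.5), new Row(2, "b", 0.2), new Row(3, "c", 0.9));
        double[] result = sumF0MinF2(rows);
        System.out.println(result[0] + ", " + result[1]); // 6.0, 0.2
    }
}
~~~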
 
@@ -647,14 +656,15 @@ DataSet<Tuple2<Integer, Double>> output = input
 
 The Join transformation joins two DataSets into one DataSet. The elements of both DataSets are joined on one or more keys which can be specified using
 
-- a `KeySelector` function or
+- a key-selector function or
 - one or more field position keys (Tuple DataSet only).
+- Case Class fields.
 
 There are a few different ways to perform a Join transformation which are shown in the following.
 
 #### Default Join (Join into Tuple2)
 
-The default Join transformation produces a new TupleDataSet with two fields. Each tuple holds a joined element of the first input DataSet in the first tuple field and a matching element of the second input DataSet in the second field.
+The default Join transformation produces a new Tuple DataSet with two fields. Each tuple holds a joined element of the first input DataSet in the first tuple field and a matching element of the second input DataSet in the second field.
 
 The following code shows a default Join transformation using field position keys:
 
@@ -675,18 +685,20 @@ DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
 <div data-lang="scala" markdown="1">
 
 ~~~scala
-
+val input1: DataSet[(Int, String)] = // [...]
+val input2: DataSet[(Double, Int)] = // [...]
+val result = input1.join(input2).where(0).equalTo(1)
 ~~~
 
 </div>
 </div>
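The semantics of the default Join can be sketched in plain Java (Java 16+; `DefaultJoinSketch` and its records are hypothetical stand-ins for the `Tuple2` types above). Matching uses a simple hash join on `input1` field 0 and `input2` field 1, mirroring `where(0).equalTo(1)`; the execution strategy Flink's optimizer actually picks may differ:

~~~java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of the default Join: every pair of elements
// with matching keys becomes one two-field result (left element, right element).
public class DefaultJoinSketch {

    // Hypothetical stand-ins for Tuple2<Integer, String> and Tuple2<Double, Integer>.
    public record Left(int f0, String f1) {}
    public record Right(double f0, int f1) {}
    public record Pair(Left first, Right second) {}

    // Builds a hash table on the first input's field 0, then probes it with
    // the second input's field 1 (like where(0).equalTo(1)).
    public static List<Pair> join(List<Left> input1, List<Right> input2) {
        Map<Integer, List<Left>> byKey = new HashMap<>();
        for (Left l : input1) {
            byKey.computeIfAbsent(l.f0(), k -> new ArrayList<>()).add(l);
        }
        List<Pair> result = new ArrayList<>();
        for (Right r : input2) {
            for (Left l : byKey.getOrDefault(r.f1(), List.of())) {
                result.add(new Pair(l, r));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Left> input1 = List.of(new Left(1, "a"), new Left(2, "b"));
        List<Right> input2 = List.of(new Right(0.5, 1), new Right(0.7, 1), new Right(0.9, 3));
        // Left(1, "a") matches both Right(..., 1) elements; the others have no partner.
        System.out.println(join(input1, input2).size()); // 2
    }
}
~~~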
 
-#### Join with JoinFunction
+#### Join with Join Function
 
-A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
-A `JoinFunction` receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element.
+A Join transformation can also call a user-defined join function to process joining tuples.
+A join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element.
 
-The following code performs a join of DataSet with custom java objects and a Tuple DataSet using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
+The following code performs a join of a DataSet with custom Java objects and a Tuple DataSet using key-selector functions and shows how to use a user-defined join function:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -734,18 +746,29 @@ DataSet<Tuple2<String, Double>>
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+case class Rating(name: String, category: String, points: Int)
+
+val ratings: DataSet[Rating] = // [...]
+val weights: DataSet[(String, Double)] = // [...]
 
+val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
+  (rating, weight) => (rating.name, rating.points * weight._2)
+}
 ~~~
 
 </div>
 </div>
 
-#### Join with FlatJoinFunction
+#### Join with Flat-Join Function
 
-Analogous to Map and FlatMap, a FlatJoin function behaves in the same
-way as a JoinFunction, but instead of returning one element, it can
+Analogous to Map and FlatMap, a FlatJoin behaves in the same
+way as a Join, but instead of returning one element, it can
 return (collect), zero, one, or more elements.
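A plain-Java sketch contrasting the two function types (Java 16+; all names are hypothetical, and a `Consumer` stands in for Flink's `Collector`). The join function always yields one result per matching pair, while the flat-join function can drop pairs, here using the same `weight > 0.1` filter as the Scala flat-join example:

~~~java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Plain-Java sketch (no Flink) contrasting a join function (exactly one
// result per matching pair) with a flat-join function (zero or more).
public class JoinFunctionSketch {

    public record Rating(String name, String category, int points) {}
    public record Weight(String category, double weight) {}
    public record Weighted(String name, double points) {}

    // Flat-join style callback; the Consumer plays the role of a Collector.
    public interface FlatJoinFn<A, B, R> {
        void join(A first, B second, Consumer<R> out);
    }

    // Nested-loop equi-join on category; fn is called once per matching pair
    // and its single return value is always emitted.
    public static <R> List<R> joinWith(List<Rating> ratings, List<Weight> weights,
                                       BiFunction<Rating, Weight, R> fn) {
        List<R> out = new ArrayList<>();
        for (Rating r : ratings) {
            for (Weight w : weights) {
                if (r.category().equals(w.category())) {
                    out.add(fn.apply(r, w));
                }
            }
        }
        return out;
    }

    // Same join, but fn decides how many results (possibly none) to emit.
    public static <R> List<R> flatJoinWith(List<Rating> ratings, List<Weight> weights,
                                           FlatJoinFn<Rating, Weight, R> fn) {
        List<R> out = new ArrayList<>();
        for (Rating r : ratings) {
            for (Weight w : weights) {
                if (r.category().equals(w.category())) {
                    fn.join(r, w, out::add);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rating> ratings = List.of(new Rating("alice", "food", 10), new Rating("bob", "music", 4));
        List<Weight> weights = List.of(new Weight("food", 0.5), new Weight("music", 0.05));

        List<Weighted> all = joinWith(ratings, weights,
                (r, w) -> new Weighted(r.name(), r.points() * w.weight()));

        // Drops pairs with a low weight, like the filter in the Scala example.
        List<Weighted> filtered = flatJoinWith(ratings, weights, (r, w, out) -> {
            if (w.weight() > 0.1) {
                out.accept(new Weighted(r.name(), r.points() * w.weight()));
            }
        });

        System.out.println(all.size() + " " + filtered.size()); // 2 1
    }
}
~~~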
-{% highlight java %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
 public class PointWeighter
          implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
   @Override
@@ -760,15 +783,12 @@ public class PointWeighter
 DataSet<Tuple2<String, Double>>
             weightedRatings =
             ratings.join(weights) // [...]
-{% endhighlight %}
+~~~
 
-#### Join with Projection
+#### Join with Projection (Java Only)
 
 A Join transformation can construct result tuples using a projection as shown here:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
 ~~~java
 DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
 DataSet<Tuple2<Integer, Double>> input2 = // [...]
@@ -784,19 +804,28 @@ DataSet<Tuple4<Integer, String, Double, Byte>
                   .types(Integer.class, String.class, Double.class, Byte.class);
 ~~~
 
+`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
+The join projection also works for non-Tuple DataSets. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output Tuple.
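The field-selection rule can be sketched in plain Java by modelling tuples as `Object[]` (Java 16+; `JoinProjectionSketch`, `Field`, `first`, and `second` are hypothetical helpers, not the Flink API). Each step copies one field from the first or second joined element, and the order of the steps is the order of the output fields:

~~~java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink) of how a join projection assembles an output
// tuple: each step copies one field of the first or second joined element,
// and the order of the steps is the order of the output fields.
public class JoinProjectionSketch {

    // One projection step; the record and the helpers below are hypothetical.
    public record Field(boolean fromFirst, int index) {}

    public static Field first(int index) {
        return new Field(true, index);
    }

    public static Field second(int index) {
        return new Field(false, index);
    }

    // Tuples are modelled as Object[] purely for illustration.
    public static Object[] project(Object[] firstElem, Object[] secondElem, List<Field> steps) {
        Object[] out = new Object[steps.size()];
        for (int i = 0; i < steps.size(); i++) {
            Field f = steps.get(i);
            out[i] = f.fromFirst() ? firstElem[f.index()] : secondElem[f.index()];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] left = {1, (byte) 7, "x"}; // like a Tuple3<Integer, Byte, String>
        Object[] right = {1, 2.5};          // like a Tuple2<Integer, Double>
        // First-input fields 0 and 2, then second-input field 1, then first-input field 1.
        Object[] out = project(left, right, List.of(first(0), first(2), second(1), first(1)));
        System.out.println(Arrays.toString(out)); // [1, x, 2.5, 7]
    }
}
~~~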
+
 </div>
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+case class Rating(name: String, category: String, points: Int)
+
+val ratings: DataSet[Rating] = // [...]
+val weights: DataSet[(String, Double)] = // [...]
+
+val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
+  (rating, weight, out: Collector[(String, Double)]) =>
+    if (weight._2 > 0.1) out.collect((rating.name, rating.points * weight._2))
+}
 
 ~~~
 
 </div>
 </div>
 
-`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
-The join projection works also for non-Tuple DataSets. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output Tuple.
-
 #### Join with DataSet Size Hint
 
 In order to guide the optimizer to pick the right execution strategy, you can hint the size of a DataSet to join as shown here:
@@ -827,6 +856,14 @@ DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+val input1: DataSet[(Int, String)] = // [...]
+val input2: DataSet[(Int, String)] = // [...]
+
+// hint that the second DataSet is very small
+val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
+
+// hint that the second DataSet is very large
+val result2 = input1.joinWithHuge(input2).where(0).equalTo(0)
 
 ~~~
 
@@ -836,15 +873,15 @@ DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
 ### Cross
 
 The Cross transformation combines two DataSets into one DataSet. It builds all pairwise combinations of the elements of both input DataSets, i.e., it builds a Cartesian product.
-The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
+The Cross transformation either calls a user-defined cross function on each pair of elements or outputs a Tuple2. Both modes are shown in the following.
 
 **Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
 
 #### Cross with User-Defined Function
 
-A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
+A Cross transformation can call a user-defined cross function. A cross function receives one element of the first input and one element of the second input and returns exactly one result element.
 
-The following code shows how to apply a Cross transformation on two DataSets using a `CrossFunction`:
+The following code shows how to apply a Cross transformation on two DataSets using a cross function:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -877,23 +914,10 @@ DataSet<Tuple3<Integer, Integer, Double>>
                    .with(new EuclideanDistComputer());
 ~~~
 
-</div>
-<div data-lang="scala" markdown="1">
-
-~~~scala
-
-~~~
-
-</div>
-</div>
-
 #### Cross with Projection
 
 A Cross transformation can also construct result tuples using a projection as shown here:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
 ~~~java
 DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
 DataSet<Tuple2<Integer, Double>> input2 = // [...]
@@ -905,18 +929,28 @@ DataSet<Tuple4<Integer, Byte, Integer, Double>
                   .types(Integer.class, Byte.class, Integer.class, Double.class);
 ~~~
 
+The field selection in a Cross projection works the same way as in the projection of Join results.
+
 </div>
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+case class Coord(id: Int, x: Int, y: Int)
+
+val coords1: DataSet[Coord] = // [...]
+val coords2: DataSet[Coord] = // [...]
 
+val distances = coords1.cross(coords2) {
+  (c1, c2) =>
+    val dist = math.sqrt(math.pow(c1.x - c2.x, 2) + math.pow(c1.y - c2.y, 2))
+    (c1.id, c2.id, dist)
+}
 ~~~
 
+
 </div>
 </div>
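For comparison with the Scala example, a plain-Java sketch of the same Cross (Java 16+; `CrossSketch`, `Coord`, and `Distance` are hypothetical). The nested loops make the Cartesian-product cost explicit: the distance function runs once for every pair of elements:

~~~java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink) of a Cross with a user-defined function:
// every element of the first input is paired with every element of the
// second, so the function runs |coords1| * |coords2| times.
public class CrossSketch {

    public record Coord(int id, int x, int y) {}
    public record Distance(int id1, int id2, double dist) {}

    public static List<Distance> cross(List<Coord> coords1, List<Coord> coords2) {
        List<Distance> out = new ArrayList<>(coords1.size() * coords2.size());
        for (Coord c1 : coords1) {
            for (Coord c2 : coords2) {
                // Same Euclidean distance as in the Scala example.
                double dist = Math.sqrt(Math.pow(c1.x() - c2.x(), 2) + Math.pow(c1.y() - c2.y(), 2));
                out.add(new Distance(c1.id(), c2.id(), dist));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Coord> coords1 = List.of(new Coord(1, 0, 0), new Coord(2, 1, 1));
        List<Coord> coords2 = List.of(new Coord(3, 3, 4), new Coord(4, 0, 1), new Coord(5, 2, 2));
        List<Distance> distances = cross(coords1, coords2);
        System.out.println(distances.size()); // 6
        System.out.println(distances.get(0)); // Distance[id1=1, id2=3, dist=5.0]
    }
}
~~~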
 
-The field selection in a Cross projection works the same way as in the projection of Join results.
-
 #### Cross with DataSet Size Hint
 
 In order to guide the optimizer to pick the right execution strategy, you can hint the size of a DataSet to cross as shown here:
@@ -947,6 +981,14 @@ DataSet<Tuple3<Integer, Integer, String>>
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+val input1: DataSet[(Int, String)] = // [...]
+val input2: DataSet[(Int, String)] = // [...]
+
+// hint that the second DataSet is very small
+val result1 = input1.crossWithTiny(input2)
+
+// hint that the second DataSet is very large
+val result2 = input1.crossWithHuge(input2)
 
 ~~~
 
@@ -955,13 +997,14 @@ DataSet<Tuple3<Integer, Integer, String>>
 
 ### CoGroup
 
-The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key and groups of both DataSets that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one DataSet has a group, the `CoGroupFunction` is called with this group and an empty group.
-A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
+The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key and groups of both DataSets that share the same key are handed together to a user-defined co-group function. If for a specific key only one DataSet has a group, the co-group function is called with this group and an empty group.
+A co-group function can separately iterate over the elements of both groups and return an arbitrary number of result elements.
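A plain-Java sketch of these CoGroup semantics (Java 16+; all names are hypothetical, and the per-key logic mirrors the Scala `coGroup` example in this diff). Keys that exist in only one input still reach the co-group logic, just with an empty group on the other side:

~~~java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java sketch (no Flink) of CoGroup semantics: both inputs are grouped
// by key, and the co-group logic runs once per key found in EITHER input;
// if only one input has that key, the other side is an empty group.
public class CoGroupSketch {

    public record IVal(String key, int value) {}
    public record DVal(String key, double value) {}

    public static List<Double> coGroup(List<IVal> iVals, List<DVal> dVals) {
        Map<String, List<IVal>> left = new TreeMap<>();
        Map<String, List<DVal>> right = new TreeMap<>();
        for (IVal v : iVals) {
            left.computeIfAbsent(v.key(), k -> new ArrayList<>()).add(v);
        }
        for (DVal v : dVals) {
            right.computeIfAbsent(v.key(), k -> new ArrayList<>()).add(v);
        }
        Set<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<Double> out = new ArrayList<>();
        for (String key : keys) {
            coGroupOneKey(left.getOrDefault(key, List.of()), right.getOrDefault(key, List.of()), out);
        }
        return out;
    }

    // Per-key logic: multiply every double value with every distinct int
    // value of the same key. An empty group on either side emits nothing.
    static void coGroupOneKey(List<IVal> ints, List<DVal> doubles, List<Double> out) {
        Set<Integer> distinct = new LinkedHashSet<>();
        for (IVal i : ints) {
            distinct.add(i.value());
        }
        for (DVal d : doubles) {
            for (int i : distinct) {
                out.add(d.value() * i);
            }
        }
    }

    public static void main(String[] args) {
        List<IVal> iVals = List.of(new IVal("a", 2), new IVal("a", 2), new IVal("b", 3));
        List<DVal> dVals = List.of(new DVal("a", 1.5), new DVal("c", 4.0));
        // "a": {2} x [1.5] gives 3.0; "b" and "c" each meet an empty group.
        System.out.println(coGroup(iVals, dVals)); // [3.0]
    }
}
~~~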
 
 Similar to Reduce, GroupReduce, and Join, keys can be defined using
 
-- a `KeySelector` function or
-- one or more field position keys (Tuple DataSet only).
+- a key-selector function or
+- one or more field position keys (Tuple DataSet only) or
+- Case Class fields.
 
 #### CoGroup on DataSets Grouped by Field Position Keys (Tuple DataSets only)
 
@@ -1010,7 +1053,19 @@ DataSet<Double> output = iVals.coGroup(dVals)
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+val iVals: DataSet[(String, Int)] = // [...]
+val dVals: DataSet[(String, Double)] = // [...]
+
+val output = iVals.coGroup(dVals).where(0).equalTo(0) {
+  (iVals, dVals, out: Collector[Double]) =>
+    val ints = iVals.map(_._2).toSet
 
+    for (dVal <- dVals) {
+      for (i <- ints) {
+        out.collect(dVal._2 * i)
+      }
+    }
+}
 ~~~
 
 </div>
@@ -1031,15 +1086,18 @@ Produces the union of two DataSets, which have to be of the same type. A union o
 DataSet<Tuple2<String, Integer>> vals1 = // [...]
 DataSet<Tuple2<String, Integer>> vals2 = // [...]
 DataSet<Tuple2<String, Integer>> vals3 = // [...]
-DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
-                    .union(vals3);
+DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2).union(vals3);
 ~~~
 
 </div>
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+val vals1: DataSet[(String, Int)] = // [...]
+val vals2: DataSet[(String, Int)] = // [...]
+val vals3: DataSet[(String, Int)] = // [...]
 
+val unioned = vals1.union(vals2).union(vals3)
 ~~~
 
 </div>

