flink-commits mailing list archives

From aljos...@apache.org
Subject [1/2] git commit: [doc] Update programming guide for scala expression keys
Date Mon, 13 Oct 2014 14:02:36 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master da21ea3ef -> 17e9b5b2a


[doc] Update programming guide for scala expression keys


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a868da83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a868da83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a868da83

Branch: refs/heads/master
Commit: a868da834b4b583037003e3d33c4a7dbab543e78
Parents: da21ea3
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Sat Oct 11 11:05:02 2014 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sat Oct 11 11:05:02 2014 +0200

----------------------------------------------------------------------
 docs/programming_guide.md | 139 +++++++++++++++++++++++++++++++----------
 1 file changed, 106 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a868da83/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index fdea599..85d518a 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -872,6 +872,7 @@ values. Keys are "virtual": they are defined as functions over the
 actual data to guide the grouping operator.
 
 ### Define keys for Tuples
+{:.no_toc}
 
 The simplest case is grouping a data set of Tuples on one or more
 fields of the Tuple:
@@ -904,6 +905,8 @@ DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
 Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a string-based expression, as explained below. For this particular example, you would have to specify `f0.f0`.
 
 ### Define key using a String Expression
+{:.no_toc}
+
 Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
 
 String expressions let you specify the name of the field in a class you want to group by.
@@ -964,6 +967,7 @@ These are valid expressions for the example POJO above:
 Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
 
 ### Define key using a Key Selector Function
+{:.no_toc}
 
 An additional way to define keys is a "key selector" function, which
 takes one dataset element as an argument and returns a key of an
@@ -1027,64 +1031,133 @@ you do not need to physically pack the data set types into keys and
 values. Keys are "virtual": they are defined as functions over the
 actual data to guide the grouping operator.
 
-The simplest case is grouping a data set of Case Classes on one or more
-of it's fields:
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
 {% highlight scala %}
-case class WordCount(docId: Int, word: String, count: Int)
-val input: DataSet[WordCount] = // [...]
+val input: DataSet[(Int, String, Long)] = // [...]
 val grouped = input
-  .groupBy("word")
+  .groupBy(0)
   .reduceGroup(/*do something*/)
 {% endhighlight %}
 
-The data set is grouped on the second field of the Case Class (the one of
-String type). The group reduce function will thus receive groups of elements with
-the same value in the second field.
+The data set is grouped on the first field of the tuples (the one of
+Integer type). The group-reduce function will thus receive groups of tuples with
+the same value in the first field.
 
 {% highlight scala %}
-val input: DataSet[WordCount] = // [...]
+val input: DataSet[(Int, String, Long)] = // [...]
 val grouped = input
-  .groupBy("docId", "word")
-  .reduceGroup(/*do something*/);
+  .groupBy(0,1)
+  .reduce(/*do something*/)
 {% endhighlight %}
 
-Here the DataSet is grouped on the composite key consisting of the first and the
-second fields, therefore the group reduce function will receive groups
-with the same value in both fields.
+The data set is grouped on the composite key consisting of the first and the
+second fields, therefore the group-reduce function will receive groups
+with the same value for both fields.
 
-As a special case, fields of Tuple DataSets can also be specified by (zero-based) index:
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
 {% highlight scala %}
-val input: DataSet[(Int, String, Int)] = // [...]
-val grouped = input
-  .groupBy(0, 1)
-  .reduceGroup(/*do something*/);
+val ds: DataSet[((Int, Float), String, Long)]
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Int and
+Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a
+string-based expression, as explained below. For this particular example, you would have to specify
+`"_1._1"`.
+
+### Define key using a String Expression
+{:.no_toc}
+
+Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
+
+String expressions let you specify the name of the field in a class you want to group by.
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field
+"word", we just pass this name to the `groupBy()` function.
+
+{% highlight scala %}
+// some ordinary POJO (Plain old Java Object)
+class WC(var word: String, var count: Int) {
+  def this() { this("", 0) }
+}
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word").reduce(/*do something*/)
+
+// or, as a case class, which is less typing
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word").reduce(/*do something*/)
 {% endhighlight %}
 
-For DataSets that don't contain Case Classes or Tuples, key definition is done via a "key selector"
-function, which takes as argument one dataset element and must return a key of an
-arbitrary data type. For example:
+**Conditions** for a class to enable using field selection expressions:
+
+- The class must be public
+- It must have a public constructor without arguments or be a case class.
+- All fields either have to be public or there must be getters and setters for all non-public
+ fields. If the field name is `foo` the getter and setter must be called `foo` and `foo_=`. This
+ is what normally gets generated when you have a `var foo` in your class. This also automatically
+ applies to case classes since the getters and setters are automatically generated.
+
+**Valid Expressions**:
+
+- You can select POJO fields by their field name
+- You can select Tuple fields by their field name as well. For example `_1` or `_6`.
+- You can select nested fields in POJOs and Tuples. Expressions like `user.zip` or `user.groupId`
+  are valid. Flink also supports POJOs inside Tuples: `_2.user.zip`.
+- You can select all fields at each level. To select all fields, specify `*`. This also works for
+  the nested case: `user.*`.
+
+**Example for nested POJO**
+
 {% highlight scala %}
-// some ordinary object
-class WC {
-  val word: String
-  val count: Int
+class WC(var complex: ComplexNestedClass, var count: Int) {
+  def this() { this(null, 0) }
+}
+class ComplexNestedClass(
+    var someNumber: Int,
+    var someFloat: Float,
+    var word: (Long, Long, String),
+    var hadoopCitizen: IntWritable) {
+  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
 }
+{% endhighlight %}
+
+These are valid expressions for the example POJO above:
+
+- `count`: The count field in the `WC` class.
+- `complex.*`: Selects all fields in the `ComplexNestedClass`.
+- `complex.word._3`: Selects the last field in the Tuple3.
+- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
+
+Please note that you can only use types inside POJOs that Flink is able to serialize. Currently,
+we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
+
+### Define key using a Key Selector Function
+{:.no_toc}
+
+An additional way to define keys is a "key selector" function, which
+takes one dataset element as an argument and returns a key of an
+arbitrary data type by performing an arbitrary computation on this
+element. For example:
+{% highlight scala %}
+// some ordinary case class
+case class WC(word: String, count: Int)
 val words: DataSet[WC] = // [...]
-val counts: DataSet[WC] = words groupBy { _.word } reduce { /*do something*/}
+val wordCounts = words
+  .groupBy( _.word ).reduce(/*do something*/)
 {% endhighlight %}
 
 Remember that keys are not only used for grouping, but also joining and matching data sets:
 {% highlight scala %}
-// some object
+// some case class
 case class Rating(name: String, category: String, points: Int)
+
 val ratings: DataSet[Rating] = // [...]
 val weights: DataSet[(String, Double)] = // [...]
-
-// Tuples are also Case Classes in Scala, so we could use this:
 val weightedRatings = ratings.join(weights).where("category").equalTo("_1")
-
-// Or This:
-val weightedRatings2 = ratings.join(weights).where("category").equalTo(0)
 {% endhighlight %}
 </div>
 </div>
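
The key concepts documented in this patch can be tried without a Flink cluster. The following is a minimal sketch that uses only Scala standard collections, not the Flink `DataSet` API. `WC` mirrors the guide's example class; the `KeySketch` object and the sample data are hypothetical and exist only for illustration. It shows a key selector function (`_.word`) and the difference between keying on a whole nested tuple (as `groupBy(0)` does) and navigating into it (as the `"_1._1"` expression does).

```scala
// Plain Scala collections stand in for a Flink DataSet here (an assumption
// made so the sketch runs without any Flink dependency).
case class WC(word: String, count: Int)

object KeySketch {
  // Hypothetical sample data.
  val words: Seq[WC] = Seq(WC("a", 1), WC("b", 2), WC("a", 3))

  // Key selector: the key is computed from each element, not stored apart from it.
  val countsByWord: Map[String, Int] =
    words.groupBy(_.word).map { case (w, group) => w -> group.map(_.count).sum }

  // Nested tuples, shaped like DataSet[((Int, Float), String, Long)] in the guide.
  val nested: Seq[((Int, Float), String, Long)] =
    Seq(((1, 1.0f), "x", 10L), ((1, 2.0f), "y", 20L))

  // Keying on the whole nested tuple (analogous to groupBy(0)).
  val byFullTuple = nested.groupBy(_._1)

  // Navigating into the nested tuple (analogous to the "_1._1" expression).
  val byInnerField = nested.groupBy(_._1._1)

  def main(args: Array[String]): Unit = {
    println(countsByWord)
    println(byFullTuple.size)
    println(byInnerField.size)
  }
}
```

With this sample data, keying on the full nested tuple yields two groups because the `Float` components differ, while keying on the inner `Int` yields a single group. In Flink itself the same distinction is expressed through `groupBy(0)` versus a string expression, as the patched guide describes.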

