flink-commits mailing list archives

From fhue...@apache.org
Subject [13/13] flink git commit: [FLINK-1328] [docs] Updated documentation of semantic properties. Improved documentation of key specification and data type.
Date Wed, 28 Jan 2015 01:24:13 GMT
[FLINK-1328] [docs] Updated documentation of semantic properties.
Improved documentation of key specification and data type.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c2c4260
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c2c4260
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c2c4260

Branch: refs/heads/master
Commit: 4c2c426085ebaa0236b91a733456baf606ae0edd
Parents: de8e066
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Jan 28 01:19:27 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Jan 28 01:39:01 2015 +0100

----------------------------------------------------------------------
 docs/programming_guide.md | 690 +++++++++++++++++++++--------------------
 1 file changed, 353 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c2c4260/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index cf91751..9f0d442 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -901,8 +901,7 @@ possible for [Data Sources](#data-sources) and [Data Sinks](#data-sinks).
 Specifying Keys
 -------------
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+
 
 Some transformations (join, coGroup) require that a key is defined on
 its argument DataSets, and other transformations (Reduce, GroupReduce,
@@ -927,6 +926,9 @@ actual data to guide the grouping operator.
 
 The simplest case is grouping a data set of Tuples on one or more
 fields of the Tuple:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Tuple3<Integer,String,Long>> input = // [...]
 DataSet<Tuple3<Integer,String,Long>> grouped = input
@@ -935,7 +937,7 @@ DataSet<Tuple3<Integer,String,Long> grouped = input
 {% endhighlight %}
 
 The data set is grouped on the first field of the tuples (the one of
-Integer type). The GroupReduceFunction will thus receive groups of tuples with
+Integer type). The GroupReduce function will thus receive groups of tuples with
 the same value in the first field.
 
 {% highlight java %}
@@ -946,23 +948,64 @@ DataSet<Tuple3<Integer,String,Long> grouped = input
 {% endhighlight %}
 
 The data set is grouped on the composite key consisting of the first and the
-second fields, therefore the GroupReduceFuntion will receive groups
+second field. Therefore, the GroupReduce function will receive groups
 with the same value for both fields.
 
 A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
 {% highlight java %}
 DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
 {% endhighlight %}
-Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a string-based expression, as explained below. For this particular example, you would have to specfiy `f0.f0`.
 
-### Define key using a String Expression
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
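+
+For this particular example, the field expression `"f0.f0"` selects the Integer field of the nested `Tuple2`; a minimal sketch, reusing the `ds` data set from above:
+
+{% highlight java %}
+// navigate into the nested Tuple2 with a field expression key
+DataSet<Tuple3<Tuple2<Integer, Float>, String, Long>> grouped = ds
+    .groupBy("f0.f0")
+    .reduceGroup(/*do something*/);
+{% endhighlight %}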
+
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input
+  .groupBy(0)
+  .reduceGroup(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples (the one of
+Int type). The GroupReduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input
+  .groupBy(0,1)
+  .reduce(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second field. Therefore, the GroupReduce function will receive groups
+with the same value for both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
+{% highlight scala %}
+val ds: DataSet[((Int, Float), String, Long)]
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Int and
+Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
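+
+For this particular example, the field expression `"_1._1"` selects the Int field of the nested tuple; a minimal sketch, reusing the `ds` data set from above:
+
+{% highlight scala %}
+// navigate into the nested tuple with a field expression key
+val grouped = ds
+  .groupBy("_1._1")
+  .reduceGroup(/*do something*/)
+{% endhighlight %}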
+
+</div>
+</div>
+
+### Define keys using Field Expressions
 {:.no_toc}
 
-Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
+Starting from release 0.7-incubating, you can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping. In addition, field expressions can be used to define [semantic function annotations](#semantic-annotations).
 
-The String expressions allow to specify the name of the field in a class you want to group by.
+Field expressions make it very easy to select fields in (nested) composite types such as [Tuple](#tuples-and-case-classes) and [POJO](#pojos) types.
 
-In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field "word", we just pass this name to the `groupBy()` function.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
 {% highlight java %}
 // some ordinary POJO (Plain old Java Object)
 public class WC {
@@ -973,20 +1016,17 @@ DataSet<WC> words = // [...]
 DataSet<WC> wordCounts = words.groupBy("word").reduce(/*do something*/);
 {% endhighlight %}
 
-**Conditions** for a class to be treated as a POJO by Flink:
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
 
-- The class must be public
-- It must have a public constructor without arguments
-- All fields either have to be public or there must be getters and setters for all non-public fields. If the field name is `foo` the getter and setters must be called `getFoo()` and `setFoo()`.
+- Select Tuple fields by their field name or 0-offset field index. For example `"f0"` and `"5"` refer to the first and sixth field of a Java Tuple type, respectively.
 
-**Valid Expressions**:
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"f1.user.zip"` or `"user.f3.1.zip"`.
 
-- You can select POJO fields by their field name
-- You can select Tuple fields by their field name as well. For example `f0` or `f5`.
-- You can select nested fields in POJOs and Tuples. Expressions like `user.zip` or `user.groupId` are valid. Flink also supports POJOs inside Tuples: `f1.user.zip`.
-- You can select all fields at each level. To select all fields, specify `*`. This also works for the nested case: `user.*`.
+- You can select the full type using the `"*"` wildcard expression. This also works for types which are not Tuple or POJO types.
 
-**Example for nested POJO**
+**Field Expression Example**:
 
 {% highlight java %}
 public static class WC {
@@ -1008,127 +1048,20 @@ public static class ComplexNestedClass {
 }
 {% endhighlight %}
 
-These are valid expressions for the example POJO above:
+These are valid field expressions for the example code above:
 
-- `count`: The count field in the `WC` class.
-- `complex.*`: Selects all fields in the `ComplexNestedClass`.
-- `complex.word.f2`: Selects the last field in the Tuple3.
-- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
-
-Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
-
-### Define key using a Key Selector Function
-{:.no_toc}
-
-An additional way to define keys are "key selector" functions, which
-takes as argument one dataset element and returns a key of an
-arbitrary data type by performing an arbitrary computation on this
-element. For example:
-{% highlight java %}
-// some ordinary POJO
-public class WC {public String word; public int count;}
-DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words
-                         .groupBy(
-                           new KeySelector<WC, String>() {
-                             public String getKey(WC wc) { return wc.word; }
-                           })
-                         .reduce(/*do something*/);
-{% endhighlight %}
+- `"count"`: The count field in the `WC` class.
 
-Remember that keys are not only used for grouping, but also joining and matching data sets:
-{% highlight java %}
-// some POJO
-public class Rating {
-  public String name;
-  public String category;
-  public int points;
-}
-DataSet<Rating> ratings = // [...]
-DataSet<Tuple2<String, Double>> weights = // [...]
-DataSet<Tuple2<String, Double>>
-            weightedRatings =
-            ratings.join(weights)
+- `"complex"`: Recursively selects all fields of the field `complex` of POJO type `ComplexNestedClass`.
 
-                   // key of the first input
-                   .where(new KeySelector<Rating, String>() {
-                            public String getKey(Rating r) { return r.category; }
-                          })
+- `"complex.word.f2"`: Selects the last field of the nested `Tuple3`.
 
-                   // key of the second input
-                   .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
-                              public String getKey(Tuple2<String, Double> t) { return t.f0; }
-                            });
-{% endhighlight %}
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
 
 </div>
-
 <div data-lang="scala" markdown="1">
-Some transformations (join, coGroup) require that a key is defined on
-its argument DataSets, and other transformations (Reduce, GroupReduce,
-Aggregate) allow that the DataSet is grouped on a key before they are
-applied.
-
-A DataSet is grouped as
-{% highlight scala %}
-val input: DataSet[...] = // [...]
-val reduced = input
-  .groupBy(/*define key here*/)
-  .reduceGroup(/*do something*/)
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data set types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-### Define keys for Tuples
-{:.no_toc}
-
-The simplest case is grouping a data set of Tuples on one or more
-fields of the Tuple:
-{% highlight scala %}
-val input: DataSet[(Int, String, Long)] = // [...]
-val grouped = input
-  .groupBy(0)
-  .reduceGroup(/*do something*/)
-{% endhighlight %}
-
-The data set is grouped on the first field of the tuples (the one of
-Integer type). The group-reduce function will thus receive groups of tuples with
-the same value in the first field.
-
-{% highlight scala %}
-val input: DataSet[(Int, String, Long)] = // [...]
-val grouped = input
-  .groupBy(0,1)
-  .reduce(/*do something*/)
-{% endhighlight %}
-
-The data set is grouped on the composite key consisting of the first and the
-second fields, therefore the group-reduce function will receive groups
-with the same value for both fields.
-
-A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
-{% highlight scala %}
-val ds: DataSet[((Int, Float), String, Long)]
-{% endhighlight %}
-
-Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Int and
-Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a
-string-based expression, as explained below. For this particular example, you would have to specfiy
-`"_1._1"`.
-
-### Define key using a String Expression
-{:.no_toc}
-
-Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
-
-The String expressions allow to specify the name of the field in a class you want to group by.
-
-In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field
-"word", we just pass this name to the `groupBy()` function.
 
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
 {% highlight scala %}
 // some ordinary POJO (Plain old Java Object)
 class WC(var word: String, var count: Int) {
@@ -1143,30 +1076,23 @@ val words: DataSet[WC] = // [...]
 val wordCounts = words.groupBy("word").reduce(/*do something*/)
 {% endhighlight %}
 
-**Conditions** for a class to enable using field selection expressions:
+**Field Expression Syntax**:
 
-- The class must be public
-- It must have a public constructor without arguments or be a case class.
-- All fields either have to be public or there must be getters and setters for all non-public
 - fields. If the field name is `foo` the getter and setters must be called `foo` and `foo_=`. This
- is what normally gets generated when you hava a `var foo` in your class. This also automatically
- applies to case classes since the getters and setters are automatically generated.
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field
of a POJO type.
 
-**Valid Expressions**:
+- Select Tuple fields by their 1-offset field name or 0-offset field index. For example `"_1"` and `"5"` refer to the first and sixth field of a Scala Tuple type, respectively.
 
-- You can select POJO fields by their field name
-- You can select Tuple fields by their field name as well. For example `_1` or `_6`.
-- You can select nested fields in POJOs and Tuples. Expressions like `user.zip` or `user.groupId`
-  are valid. Flink also supports POJOs inside Tuples: `_2.user.zip`.
-- You can select all fields at each level. To select all fields, specify `*`. This also works for
-  the nested case: `user.*`.
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"_2.user.zip"` or `"user._4.1.zip"`.
 
-**Example for nested POJO**
+- You can select the full type using the `"_"` wildcard expression. This also works for types which are not Tuple or POJO types.
+
+**Field Expression Example**:
 
 {% highlight scala %}
 class WC(var complex: ComplexNestedClass, var count: Int) {
   def this() { this(null, 0) }
 }
+
 class ComplexNestedClass(
     var someNumber: Int,
     someFloat: Float,
@@ -1176,23 +1102,43 @@ class ComplexNestedClass(
 }
 {% endhighlight %}
 
-These are valid expressions for the example POJO above:
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
 
-- `count`: The count field in the `WC` class.
-- `complex.*`: Selects all fields in the `ComplexNestedClass`.
-- `complex.word._3`: Selects the last field in the Tuple3.
-- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
+- `"complex"`: Recursively selects all fields of the field `complex` of POJO type `ComplexNestedClass`.
 
-Please note that you can only use types inside POJOs that Flink is able to serialize. Currently,
-we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
+- `"complex.word._3"`: Selects the last field of the nested `Tuple3`.
 
-### Define key using a Key Selector Function
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
+</div>
+</div>
+
+### Define keys using Key Selector Functions
 {:.no_toc}
 
-An additional way to define keys are "key selector" functions, which
-takes as argument one dataset element and returns a key of an
-arbitrary data type by performing an arbitrary computation on this
-element. For example:
+An additional way to define keys is to use "key selector" functions. A key selector function
+takes a single dataset element as input and returns the key for the element. The key can be of any type and be derived from arbitrary computations.
+
+The following example shows a key selector function that simply returns a field of an object:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// some ordinary POJO
+public class WC {public String word; public int count;}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+                         .groupBy(
+                           new KeySelector<WC, String>() {
+                             public String getKey(WC wc) { return wc.word; }
+                           })
+                         .reduce(/*do something*/);
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
 {% highlight scala %}
 // some ordinary case class
 case class WC(word: String, count: Int)
@@ -1200,16 +1146,6 @@ val words: DataSet[WC] = // [...]
 val wordCounts = words
   .groupBy( _.word ).reduce(/*do something*/)
 {% endhighlight %}
-
-Remember that keys are not only used for grouping, but also joining and matching data sets:
-{% highlight scala %}
-// some case class
-case class Rating(name: String, category: String, points: Int)
-
-val ratings: DataSet[Rating] = // [...]
-val weights: DataSet[(String, Double)] = // [...]
-val weightedRatings = ratings.join(weights).where("category").equalTo("_1")
-{% endhighlight %}
 </div>
 </div>
 
@@ -1369,192 +1305,147 @@ Flink places some restrictions on the type of elements that are used in DataSets
 of transformations. The reason for this is that the system analyzes the types to determine
 efficient execution strategies.
 
-There are four different categories of data types, which are treated slightly different when it
-to [specifying keys](#specifying-keys):
-
-1. **General Types and POJOs**
-2. **Tuples**/**Case Classes**
-3. **Values**
-4. **Hadoop Writables**
+There are six different categories of data types:
 
+1. **Java Tuples** and **Scala Case Classes**
+2. **Java POJOs**
+3. **Primitive Types**
+4. **Regular Classes**
+5. **Values**
+6. **Hadoop Writables**
 
-#### General Types
-
-Out of the box, Flink supports all primitive types of your programming language of choice.
-
-Furthermore, you can use the vast majority of custom classes. Restrictions apply to classes
-containing fields that cannot be serialized, like File pointers, I/O streams, or other native
-resources. Classes that follow the Java Beans conventions work well in general. The following
-defines a simple example class to illustrate how you can use custom classes:
+#### Tuples and Case Classes
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-{% highlight java %}
-public class WordWithCount {
 
-    public String word;
-    public int count;
-
-    public WordCount() {}
-
-    public WordCount(String word, int count) {
-        this.word = word;
-        this.count = count;
-    }
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class WordWithCount(val word: String, val count: Int) {
-    def this() {
-      this(null, -1)
-    }
-}
-{% endhighlight %}
-</div>
-</div>
-
-You can use all of those types to parameterize DataSet and function implementations, e.g.
-`DataSet` or a `MapFunction`.
+Tuples are composite types that contain a fixed number of fields with various types. 
+The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
+can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
+tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method
+`tuple.getField(int position)`. The field indices start at 0. Note that this stands in contrast
+to the Scala tuples, but it is more consistent with Java's general indexing.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-// using a basic data type
-DataSet<String> numbers = env.fromElements("1", "2");
+DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
+    new Tuple2<String, Integer>("hello", 1),
+    new Tuple2<String, Integer>("world", 2));
 
-numbers.map(new MapFunction<String, Integer>() {
+wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
     @Override
-    public String map(String value) throws Exception {
-        return Integer.parseInt(value);
+    public Integer map(Tuple2<String, Integer> value) throws Exception {
+        return value.f1;
     }
 });
+{% endhighlight %}
 
-// using a custom class
-DataSet<WordCount> wordCounts = env.fromElements(
-    new WordCount("hello", 1),
-    new WordCount("world", 2));
+When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
 
-wordCounts.map(new MapFunction<WordCount, Integer>() {
-    @Override
-    public String map(WordCount value) throws Exception {
-        return value.count;
-    }
-});
+{% highlight java %}
+wordCounts
+    .groupBy(0) // also valid .groupBy("f0")
+    .reduce(new MyReduceFunction());
 {% endhighlight %}
+
 </div>
 <div data-lang="scala" markdown="1">
+
+Scala case classes (and Scala tuples, which are a special case of case classes) are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.
+
 {% highlight scala %}
-// using a primitive data type
-// Note that the type ascription "DataSet[String]" can be omitted in Scala
-// it is just given to clarify the type of numbers
-val numbers: DataSet[String] = env.fromElements("1", "2")
-
-numbers.map(new MapFunction[String, Int]() {
-    def map(in: String): Int = {
-      in.toInt
-    }
-})
+case class WordCount(word: String, count: Int)
+val input = env.fromElements(
+    WordCount("hello", 1),
+    WordCount("world", 2)) // Case Class Data Set
 
-// using a custom class
-val wordCounts = env.fromElements(
-  new WordCount("hello", 1),
-  new WordCount("world", 2))
+input.groupBy("word").reduce(...) // group by field expression "word"
 
-wordCounts.map { _.count }
+val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
+
+input2.groupBy(0, 1).reduce(...) // group by field positions 0 and 1
 {% endhighlight %}
+
+When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
+
 </div>
 </div>
 
+#### POJOs
+
+Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:
+
+- The class must be public.
 
-When working with operators that require a Key for grouping or matching records
-you can select the key using a key expression (see
-[Specifying Keys](#specifying-keys)).
+- It must have a public constructor without arguments (default constructor).
+
+- All fields are either public or must be accessible through getter and setter functions. For a field called `foo` the getter and setter methods must be named `getFoo()` and `setFoo()`.
+
+- The type of a field must be supported by Flink. At the moment, Flink uses [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
+
+Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result, POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
+
+The following example shows a simple POJO with two public fields.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-wordCounts.groupBy("word").reduce(new MyReduceFunction());
+public class WordWithCount {
+
+    public String word;
+    public int count;
+
+    public WordWithCount() {}
+
+    public WordWithCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-wordCounts groupBy { _.word } reduce(new MyReduceFunction())
+class WordWithCount(val word: String, val count: Int) {
+    def this() {
+      this(null, -1)
+    }
+}
 {% endhighlight %}
 </div>
 </div>
 
-
-#### Tuples/Case Classes
+When grouping, sorting, or joining a data set of POJO types, keys can be specified with field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-
-You can use the Tuple classes for composite types. Tuples contain a fix number of fields of
-various types. The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
-can be an arbitrary Flink type - including further tuples, resulting in nested tuples. Fields of a
-Tuple can be accessed directly using the fields `tuple.f4`, or using the generic getter method
-`tuple.getField(int position)`. The field numbering starts with 0. Note that this stands in contrast
-to the Scala tuples, but it is more consistent with Java's general indexing.
-
-{% highlight java %}
-DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
-    new Tuple2<String, Integer>("hello", 1),
-    new Tuple2<String, Integer>("world", 2));
-
-wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
-    @Override
-    public String map(Tuple2<String, Integer> value) throws Exception {
-        return value.f1;
-    }
-});
-{% endhighlight %}
-
-When working with operators that require a Key for grouping or matching records,
-Tuples let you simply specify the positions of the fields to be used as key. You can specify more
-than one position to use composite keys (see [Section Data Transformations](#transformations)).
-
 {% highlight java %}
 wordCounts
-    .groupBy(0) // also valid .groupBy("f0")
+    .groupBy("word")                    // group by field expression "word"
     .reduce(new MyReduceFunction());
 {% endhighlight %}
-Also, you can "navigate" into nested tuples using (String) key expressions.
-
-In order to access fields more intuitively and to generate more readable code, it is also possible
-to extend a subclass of Tuple. You can add getters and setters with custom names that delegate to
-the field positions. See this
-{% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java "example" %} for an
-illustration how to make use of that mechanism.
-
-Note that if you are extending from a Tuple and add fields to your class, it will be treated as a POJO.
-
 </div>
 <div data-lang="scala" markdown="1">
+{% highlight scala %}
+wordCounts groupBy { _.word } reduce(new MyReduceFunction())
+{% endhighlight %}
+</div>
+</div>
 
-Flink has special support for Scala's Case Classes and Tuples. When using working with an operator
-that required a key for grouping or matching records this key can be specified using tuple field
-positions or field names: 
+#### Primitive Types
 
-{% highlight scala %}
-case class WordCount(word: String, count: Int)
-val input = env.fromElements(
-    WordCount("hello", 1),
-    WordCount("world", 2))
+Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.
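+
+A minimal sketch (assuming an `ExecutionEnvironment` named `env`, as in the other examples of this guide):
+
+{% highlight java %}
+// a data set of primitive Integer values
+DataSet<Integer> numbers = env.fromElements(1, 2, 3);
+{% endhighlight %}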
 
-input.groupBy("word").reduce(...)
+#### General Class Types
 
-val input2 = env.fromElements(("hello", 1), ("world", 2))
+Flink supports most Java and Scala classes (API and custom). 
+Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
+resources. Classes that follow the Java Beans conventions work well in general.
 
-input2.groupBy(0, 1).reduce(...)
-{% endhighlight %}
+All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types. 
+Flink treats these data types as black boxes and is not able to access their content (i.e., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo). 
 
-Both variants allow specifying more than one key field name or key field position. See
-[specifying keys](#specifying-keys) for more details.
+When grouping, sorting, or joining a data set of generic types, keys must be specified with key selector functions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
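+
+As an illustration, a minimal sketch with a hypothetical non-POJO class `LegacyRecord` that exposes a `getId()` method:
+
+{% highlight java %}
+DataSet<LegacyRecord> records = // [...]
+// generic types require a key selector function for key-based operations
+DataSet<LegacyRecord> grouped = records
+    .groupBy(new KeySelector<LegacyRecord, String>() {
+        public String getKey(LegacyRecord r) { return r.getId(); }
+    })
+    .reduceGroup(/*do something*/);
+{% endhighlight %}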
 
-</div>
-</div>
 
 #### Values
 
@@ -2257,36 +2148,81 @@ env.execute()
 Semantic Annotations
 -----------
 
-Semantic Annotations give hints about the behavior of a function by telling the system which fields
-in the input are accessed and which are constant between input and output data of a function (copied
-but not modified). Semantic annotations are a powerful means to speed up execution, because they
+Semantic annotations can be used to give Flink hints about the behavior of a function. 
+They tell the system which fields of a function's input the function reads and evaluates and
+which fields it forwards unmodified from its input to its output. 
+Semantic annotations are a powerful means to speed up execution, because they
 allow the system to reason about reusing sort orders or partitions across multiple operations. Using
 semantic annotations may eventually save the program from unnecessary data shuffling or unnecessary
-sorts.
+sorts and significantly improve the performance of a program.
+
+**Note:** The use of semantic annotations is optional. However, it is absolutely crucial to 
+be conservative when providing semantic annotations! 
+Incorrect semantic annotations will cause Flink to make incorrect assumptions about your program and 
+might eventually lead to incorrect results. 
+If the behavior of an operator is not clearly predictable, no annotation should be provided.
+Please read the documentation carefully.
+
+The following semantic annotations are currently supported.
+
+#### Forwarded Fields Annotation
+
+Forwarded fields information declares input fields which are forwarded unmodified by a function to the same position or to another position in the output. 
+This information is used by the optimizer to infer whether a data property such as sorting or 
+partitioning is preserved by a function.
+
+Field forward information is specified using [field expressions](#define-keys-using-field-expressions).
+Fields that are forwarded to the same position in the output can be specified by their position. 
+The specified position must be valid for the input and output data type and have the same type.
+For example, the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple.
+
+Fields which are forwarded unmodified to another position in the output are declared by specifying the
+source field in the input and the target field in the output as field expressions.
+The String `"f0->f2"` denotes that the first field of the Java input tuple is
+copied unchanged to the third field of the Java output tuple. The wildcard expression `*` can be used to refer to a whole input or output type, i.e., `"f0->*"` denotes that the output of a function is always equal to the first field of its Java input tuple.
+
+Multiple forwarded fields can be declared in a single String by separating them with semicolons as `"f0; f2->f1; f3->f2"` or in separate Strings `"f0", "f2->f1", "f3->f2"`. When specifying forwarded fields, it is not required that all forwarded fields are declared, but all declarations must be correct.
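+
+For instance, a sketch of such a multi-field declaration using the `@ForwardedFields` class annotation described below (a hypothetical map function with identical input and output tuple types):
+
+{% highlight java %}
+@ForwardedFields("f0; f2->f1; f3->f2")
+public class MyForwardingMap implements
+              MapFunction<Tuple4<Integer, Integer, Integer, Integer>,
+                          Tuple4<Integer, Integer, Integer, Integer>> {
+  @Override
+  public Tuple4<Integer, Integer, Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
+    // f0 stays in place, f2 moves to f1, f3 moves to f2; the last output field is newly computed
+    return new Tuple4<Integer, Integer, Integer, Integer>(val.f0, val.f2, val.f3, val.f1 * 2);
+  }
+}
+{% endhighlight %}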
+
+Forwarded field information can be declared by attaching Java annotations on function class definitions or 
+by passing them as operator arguments after invoking a function on a DataSet as shown below.
 
-Semantic annotations can be attached to functions through Annotations, or passed as arguments
-when invoking a function on a DataSet. The following example illustrates that:
+##### Function Class Annotations
+
+* `@ForwardedFields` for single input functions such as Map and Reduce.
+* `@ForwardedFieldsFirst` for the first input of a function with two inputs such as Join and CoGroup.
+* `@ForwardedFieldsSecond` for the second input of a function with two inputs such as Join and CoGroup.
+
+##### Operator Arguments
+
+* `data.map(myMapFnc).withForwardedFields()` for single input functions such as Map and Reduce.
+* `data1.join(data2).where().equalTo().with(myJoinFnc).withForwardedFieldsFirst()` for the first input of a function with two inputs such as Join and CoGroup. 
+* `data1.join(data2).where().equalTo().with(myJoinFnc).withForwardedFieldsSecond()` for the second input of a function with two inputs such as Join and CoGroup.
+
+Please note that it is not possible to overwrite forwarded field information that was specified as a class annotation by using operator arguments. 
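+
+For illustration, a minimal sketch of the operator-argument variant, reusing the `MyMap` function from the example below:
+
+{% highlight java %}
+DataSet<Tuple2<Integer, Integer>> data = // [...]
+// declares "f0->f2" as an operator argument instead of a class annotation
+data.map(new MyMap()).withForwardedFields("f0->f2");
+{% endhighlight %}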
+
+##### Example
+
+The following example shows how to declare forwarded field information using a function class annotation:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-@ConstantFields("1")
-public class DivideFirstbyTwo implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+@ForwardedFields("f0->f2")
+public class MyMap implements 
+              MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
   @Override
-  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
-    value.f0 /= 2;
-    return value;
+  public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
+    return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
   }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-@ConstantFields("1")
-class DivideFirstbyTwo extends MapFunction[(Int, Int), (Int, Int)]{
-   def map(input: (Int, Int): (Int, Int) = {
-    value.f0 /= 2;
-    (input._1 / 2, input._2)
+@ForwardedFields("_1->_3")
+class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
+   def map(value: (Int, Int)): (String, Int, Int) = {
+    return ("foo", value._2 / 2, value._1)
   }
 }
 {% endhighlight %}
@@ -2294,33 +2230,113 @@ class DivideFirstbyTwo extends MapFunction[(Int, Int), (Int, Int)]{
 </div>
 </div>
 
-The following annotations are currently available:
+#### Non-Forwarded Fields
 
-* `@ConstantFields`: Declares constant fields (forwarded/copied) for functions with a single input
-  data set (Map, Reduce, Filter, ...).
+Non-forwarded fields information declares all fields which are not preserved at the same position in a function's output. 
+The values of all other fields are considered to be preserved at the same position in the output. 
+Hence, non-forwarded fields information is the inverse of forwarded fields information.
 
-* `@ConstantFieldsFirst`: Declares constant fields (forwarded/copied) for functions with a two input
-  data sets (Join, CoGroup, ...), with respect to the first input data set.
+**IMPORTANT**: The specification of non-forwarded fields information is optional. However, if used, 
+**ALL!** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.
 
-* `@ConstantFieldsSecond`: Declares constant fields (forwarded/copied) for functions with a two
-  input data sets (Join, CoGroup, ...), with respect to the first second data set.
+Non-forwarded fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. 
+For example, both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple 
+are not preserved in place and all other fields are preserved in place. 
+Non-forwarded field information can only be specified for functions which have identical input and output types.
 
-* `@ConstantFieldsExcept`: Declares that all fields are constant, except for the specified fields.
-  Applicable to functions with a single input data set.
+Non-forwarded field information is specified as function class annotations using the following annotations:
 
-* `@ConstantFieldsFirstExcept`: Declares that all fields of the first input are constant, except for
-  the specified fields. Applicable to functions with a two input data sets.
+* `@NonForwardedFields` for single input functions such as Map and Reduce.
+* `@NonForwardedFieldsFirst` for the first input of a function with two inputs such as Join and CoGroup.
+* `@NonForwardedFieldsSecond` for the second input of a function with two inputs such as Join and CoGroup.
 
-* `@ConstantFieldsSecondExcept`: Declares that all fields of the second input are constant, except
-  for the specified fields. Applicable to functions with a two input data sets.
+##### Example
 
-*(Note: The system currently only evaluates annotations only Tuple DataSets.  This will be extended
-in the next versions)*
+The following example shows how to declare non-forwarded field information:
 
-**Note**: It is important to be conservative when providing annotations. Only annotate fields,
-when they are always constant for every call to the function. Otherwise the system has incorrect
-assumptions about the execution and the execution may produce wrong results. If the behavior of the
-operator is not clearly predictable, no annotation should be provided.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@NonForwardedFields("f1") // second field is not forwarded
+public class MyMap implements 
+              MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+  @Override
+  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
+    return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@NonForwardedFields("_2") // second field is not forwarded
+class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
+  def map(value: (Int, Int)): (Int, Int) = {
+    return (value._1, value._2 / 2)
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
+
+#### Read Fields
+
+Read fields information declares all fields that are accessed and evaluated by a function, i.e.,
+all fields that are used by the function to compute its result.
+For example, fields which are evaluated in conditional statements or used for computations must be marked as read when specifying read fields information.
+Fields which are only forwarded unmodified to the output without evaluating their values or fields which are not accessed at all are not considered to be read.
+
+**IMPORTANT**: The specification of read fields information is optional. However, if used, 
+**ALL!** read fields must be specified. It is safe to declare a non-read field as read.
+
+Read fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. 
+For example, both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple are read and evaluated by the function.
+
+Read field information is specified as function class annotations using the following annotations:
+
+* `@ReadFields` for single input functions such as Map and Reduce.
+* `@ReadFieldsFirst` for the first input of a function with two inputs such as Join and CoGroup.
+* `@ReadFieldsSecond` for the second input of a function with two inputs such as Join and CoGroup.
+
+##### Example
+
+The following example shows how to declare read field information:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function. 
+public class MyMap implements 
+              MapFunction<Tuple4<Integer, Integer, Integer, Integer>, 
+                          Tuple2<Integer, Integer>> {
+  @Override
+  public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
+    if(val.f0 == 42) {
+      return new Tuple2<Integer, Integer>(val.f0, val.f1);
+    } else {
+      return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
+    }
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
+class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
+   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
+    if (value._1 == 42) {
+      return (value._1, value._2)
+    } else {
+      return (value._4 + 10, value._2)
+    }
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
 
 [Back to top](#top)
 

