flink-commits mailing list archives

From rmetz...@apache.org
Subject [10/11] git commit: Documentation and fixes
Date Wed, 08 Oct 2014 09:40:30 GMT
Documentation and fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/598ae376
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/598ae376
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/598ae376

Branch: refs/heads/master
Commit: 598ae3765cc45c2ede00fd4e32adf4a4a3327615
Parents: fd0be2f
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Oct 7 15:44:38 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Oct 8 11:39:01 2014 +0200

----------------------------------------------------------------------
 docs/dataset_transformations.md                 |  56 +--
 docs/programming_guide.md                       |  92 +++-
 .../api/java/typeutils/GenericTypeInfo.java     |  10 +
 .../flink/api/java/typeutils/TypeExtractor.java |   3 +-
 .../javaApiOperators/GroupReduceITCase.java     |  26 +-
 .../util/CollectionDataSets.java                | 448 ++++++++++---------
 6 files changed, 372 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/598ae376/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index f968b1d..78a3fd5 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -176,11 +176,12 @@ DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integ
 ### Transformations on Grouped DataSet
 
 The reduce operations can operate on grouped data sets. Specifying the key to
-be used for grouping can be done in two ways:
+be used for grouping can be done in several ways:
 
-- a key-selector function or
-- one or more field position keys (Tuple DataSet only).
-- Case Class fields (Case Classes only).
+- key expressions
+- a key-selector function
+- one or more field position keys (Tuple DataSet only)
+- Case Class fields (Case Classes only)
 
 Please look at the reduce examples to see how the grouping keys are specified.
 
@@ -220,11 +221,8 @@ public class WordCounter implements ReduceFunction<WC> {
 // [...]
 DataSet<WC> words = // [...]
 DataSet<WC> wordCounts = words
-                         // DataSet grouping with inline-defined KeySelector function
-                         .groupBy(
-                           new KeySelector<WC, String>() {
-                             public String getKey(WC wc) { return wc.word; }
-                           })
+                         // DataSet grouping on field "word"
+                         .groupBy("word")
                          // apply ReduceFunction on grouped DataSet
                          .reduce(new WordCounter());
 ~~~
@@ -360,11 +358,10 @@ Works analogous to grouping by Case Class fields in *Reduce* transformations.
 
 Works analogous to key-selector functions in *Reduce* transformations.
 
-#### GroupReduce on sorted groups (Tuple DataSets only)
+#### GroupReduce on sorted groups
 
 A group-reduce function accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined
 group-reduce function and improve its efficiency.
-Right now, this feature is only available for DataSets of Tuples.
 
 The following code shows another example of how to remove duplicate Strings in a DataSet grouped by an Integer and sorted by String.
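+
+A minimal sketch of such a program (`DistinctReduce` stands for an illustrative duplicate-removing `GroupReduceFunction`, assuming a `Tuple2<Integer, String>` input):
+
+~~~java
+DataSet<Tuple2<Integer, String>> input = // [...]
+DataSet<Tuple2<Integer, String>> output = input
+                 .groupBy(0)                    // group on the Integer field
+                 .sortGroup(1, Order.ASCENDING) // hand out the Strings of each group in sorted order
+                 .reduceGroup(new DistinctReduce());
+~~~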
 
@@ -656,7 +653,8 @@ val output = input.aggregate(SUM, 0).and(MIN, 2)
 
 The Join transformation joins two DataSets into one DataSet. The elements of both DataSets are joined on one or more keys which can be specified using
 
-- a key-selector function or
+- a key expression
+- a key-selector function
 - one or more field position keys (Tuple DataSet only).
 - Case Class Fields
 
@@ -672,13 +670,15 @@ The following code shows a default Join transformation using field position keys
 <div data-lang="java" markdown="1">
 
 ~~~java
-DataSet<Tuple2<Integer, String>> input1 = // [...]
-DataSet<Tuple2<Double, Integer>> input2 = // [...]
+public static class User { public String name; public int zip; }
+public static class Store { public Manager mgr; public int zip; }
+DataSet<User> input1 = // [...]
+DataSet<Store> input2 = // [...]
 // result dataset is typed as Tuple2
-DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
+DataSet<Tuple2<User, Store>>
             result = input1.join(input2)
-                           .where(0)       // key of the first input
-                           .equalTo(1);    // key of the second input
+                           .where("zip")       // key of the first input (users)
+                           .equalTo("zip");    // key of the second input (stores)
 ~~~
 
 </div>
@@ -729,14 +729,10 @@ DataSet<Tuple2<String, Double>>
             ratings.join(weights)
 
                    // key of the first input
-                   .where(new KeySelection<Rating, String>() {
-                            public String getKey(Rating r) { return r.category; }
-                          })
+                   .where("category")
 
                    // key of the second input
-                   .equalTo(new KeySelection<Tuple2<String, Double>, String>() {
-                              public String getKey(Tuple2<String, Double> t) { return t.f0; }
-                            })
+                   .equalTo("f0")
 
                    // applying the JoinFunction on joining pairs
                    .with(new PointWeighter());
@@ -1000,17 +996,14 @@ val result1 = input1.crossWithHuge(input2)
 The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key and groups of both DataSets that share the same key are handed together to a user-defined co-group function. If for a specific key only one DataSet has a group, the co-group function is called with this group and an empty group.
 A co-group function can separately iterate over the elements of both groups and return an arbitrary number of result elements.
 
-Similar to Reduce, GroupReduce, and Join, keys can be defined using
+Similar to Reduce, GroupReduce, and Join, keys can be defined using any of the key-selection methods described above.
 
-- a key-selector function or
-- one or more field position keys (Tuple DataSet only) or
-- Case Class fields.
-
-#### CoGroup on DataSets Grouped by Field Position Keys (Tuple DataSets only)
+#### CoGroup on DataSets
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
+The following example shows how to group by field position keys (Tuple DataSets only). The same works for POJO types using key expressions.
 ~~~java
 // Some CoGroupFunction definition
 class MyCoGrouper
@@ -1071,9 +1064,6 @@ val output = iVals.coGroup(dVals).where(0).equalTo(0) {
 </div>
 </div>
 
-#### CoGroup on DataSets Grouped by Key-Selector Function
-
-Works analogous to key-selector functions in Join transformations.
 
 ### Union
 
@@ -1132,7 +1122,7 @@ val out = in.rebalance().map { ... }
 ### Hash-Partition
 
 Hash-partitions a DataSet on a given key. 
-Keys can be specified as key-selector functions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
+Keys can be specified as key expressions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
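+
+A minimal sketch (`MyPartitionMapper` stands for an illustrative `MapPartitionFunction`):
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// hash-partition the DataSet on the String field (position 0),
+// then process the resulting partitions
+DataSet<Tuple2<String, Integer>> out = in.partitionByHash(0)
+                                         .mapPartition(new MyPartitionMapper());
+~~~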

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/598ae376/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 8d0b299..fdea599 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -871,6 +871,8 @@ you do not need to physically pack the data set types into keys and
 values. Keys are "virtual": they are defined as functions over the
 actual data to guide the grouping operator.
 
+### Define keys for Tuples
+
 The simplest case is grouping a data set of Tuples on one or more
 fields of the Tuple:
 {% highlight java %}
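 // a sketch of the example: group on a composite key of the first
 // and the second Tuple field ("MyGroupReducer" is illustrative)
 DataSet<Tuple3<Integer, String, Long>> input = // [...]
 DataSet<Tuple3<Integer, String, Long>> result =
         input.groupBy(0, 1)
              .reduceGroup(new MyGroupReducer());
 {% endhighlight %}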
@@ -895,7 +897,75 @@ The data set is grouped on the composite key consisting of the first and the
 second fields, therefore the GroupReduceFunction will receive groups
 with the same value for both fields.
 
-In general, key definition is done via a "key selector" function, which
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+{% highlight java %}
+DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
+{% endhighlight %}
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with both the Integer and the Float being part of the key). If you want to "navigate" into the nested `Tuple2`, you have to use a string-based expression, as explained below. For this particular example, you would have to specify `f0.f0`.
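+
+A minimal sketch (the reducer is illustrative):
+{% highlight java %}
+DataSet<Tuple3<Tuple2<Integer, Float>, String, Long>> ds = // [...]
+// group by the Integer inside the nested Tuple2
+ds.groupBy("f0.f0").reduceGroup(new MyGroupReducer());
+{% endhighlight %}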
+
+### Define key using a String Expression
+Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
+
+String expressions allow you to specify the name of the field in a class you want to group by.
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field "word", we just pass this name to the `groupBy()` function.
+{% highlight java %}
+// some ordinary POJO (Plain old Java Object)
+public class WC {
+  public String word; 
+  public int count;
+}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words.groupBy("word").reduce(/*do something*/);
+{% endhighlight %}
+
+**Conditions** for a class to be treated as a POJO by Flink:
+
+- The class must be public
+- It must have a public constructor without arguments
+- All fields either have to be public or there must be getters and setters for all non-public fields. If the field name is `foo`, the getter and setter must be called `getFoo()` and `setFoo()`.
+
+**Valid Expressions**:
+
+- You can select POJO fields by their field name
+- You can select Tuple fields by their field name as well. For example `f0` or `f5`.
+- You can select nested fields in POJOs and Tuples. Expressions like `user.zip` or `user.groupId` are valid. Flink also supports POJOs inside Tuples: `f1.user.zip`.
+- You can select all fields at each level. To select all fields, specify `*`. This also works for the nested case: `user.*`.
+
+**Example for nested POJO**
+
+{% highlight java %}
+public static class WC {
+  public ComplexNestedClass complex; //nested POJO
+  private int count;
+  // getter / setter for private field (count)
+  public int getCount() {
+    return count;
+  }
+  public void setCount(int c) {
+    this.count = c;
+  }
+}
+public static class ComplexNestedClass {
+  public Integer someNumber;
+  public float someFloat;
+  public Tuple3<Long, Long, String> word;
+  public IntWritable hadoopCitizen;
+}
+{% endhighlight %}
+
+These are valid expressions for the example POJO above:
+
+- `count`: The count field in the `WC` class.
+- `complex.*`: Selects all fields in the `ComplexNestedClass`.
+- `complex.word.f2`: Selects the last field in the Tuple3.
+- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
+
+Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
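+
+A minimal sketch using one of the expressions above (the reducer is illustrative):
+{% highlight java %}
+DataSet<WC> ds = // [...]
+// group by the last field of the nested Tuple3
+ds.groupBy("complex.word.f2").reduceGroup(new MyGroupReducer());
+{% endhighlight %}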
+
+### Define key using a Key Selector Function
+
+An additional way to define keys is a "key selector" function, which
 takes as argument one dataset element and returns a key of an
 arbitrary data type by performing an arbitrary computation on this
 element. For example:
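 {% highlight java %}
 // a sketch of the example: a key selector that extracts the "word"
 // field of the WC POJO from above
 DataSet<WC> words = // [...]
 DataSet<WC> wordCounts = words
                          .groupBy(new KeySelector<WC, String>() {
                            public String getKey(WC wc) { return wc.word; }
                          })
                          .reduce(/*do something*/);
 {% endhighlight %}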
@@ -1053,8 +1123,7 @@ data.map(new MapFunction<String, Integer> () {
 
 #### Java 8 Lambdas
 
-***Warning: Lambdas are currently only supported for filter and reduce
-   transformations***
+Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide](java8_programming_guide.html).
 
 {% highlight java %}
 DataSet<String> data = // [...]
@@ -1179,7 +1248,7 @@ efficient execution strategies.
 There are four different categories of data types, which are treated slightly differently when it comes
 to [specifying keys](#specifying-keys):
 
-1. **General Types**
+1. **General Types and POJOs**
 2. **Tuples**/**Case Classes**
 3. **Values**
 4. **Hadoop Writables**
@@ -1276,17 +1345,13 @@ wordCounts.map { _.count }
 
 
 When working with operators that require a Key for grouping or matching records
-you need to implement a key selector function for your custom type (see
+you can select the key using a key expression (see
 [Specifying Keys](#specifying-keys)).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-wordCounts.groupBy(new KeySelector<WordCount, String>() {
-    public String getKey(WordCount v) {
-        return v.word;
-    }
-}).reduce(new MyReduceFunction());
+wordCounts.groupBy("word").reduce(new MyReduceFunction());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -1328,16 +1393,19 @@ than one position to use composite keys (see [Section Data Transformations](#tra
 
 {% highlight java %}
 wordCounts
-    .groupBy(0)
+    .groupBy(0) // grouping by the key expression "f0" is also valid
     .reduce(new MyReduceFunction());
 {% endhighlight %}
+You can also "navigate" into nested tuples using (String) key expressions.
 
 In order to access fields more intuitively and to generate more readable code, it is also possible
 to extend a subclass of Tuple. You can add getters and setters with custom names that delegate to
 the field positions. See this
-{% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java "example" %} for an
+{% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java "example" %} for an
 illustration how to make use of that mechanism.
 
+Note that if you are extending from a Tuple and add fields to your class, it will be treated as a POJO.
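+
+A minimal sketch (mirroring the `FromTuple` pattern used in Flink's tests):
+{% highlight java %}
+public static class FromTuple extends Tuple3<String, String, Long> {
+  public int special; // extra field: the class is now treated as a POJO
+}
+{% endhighlight %}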
+
 </div>
 <div data-lang="scala" markdown="1">
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/598ae376/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 6272538..7b0c5bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
 
+import java.util.Collection;
+
 
 /**
  *
@@ -32,9 +34,17 @@ import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
 public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
 	private final Class<T> typeClass;
+	private final static Class[] unsupportedByAvro = new Class[] {Collection.class};
 	
 	public GenericTypeInfo(Class<T> typeClass) {
 		this.typeClass = typeClass;
+		for(Class unsupported: unsupportedByAvro) {
+			if(unsupported.isAssignableFrom(typeClass)) {
+				throw new RuntimeException("The type '"+typeClass+"' is currently not supported " +
+						"by the Avro Serializer that Flink is using for serializing " +
+						"arbitrary objects");
+			}
+		}
 	}
 	
 	@Override

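A minimal sketch of what the new check rejects (the raw cast is for illustration only):

    // java.util.List implements java.util.Collection, so this now fails fast with a
    // RuntimeException instead of failing later inside the Avro serializer
    new GenericTypeInfo<List<String>>((Class<List<String>>) (Class<?>) List.class);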
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/598ae376/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 5935e31..7836e74 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1065,7 +1065,8 @@ public class TypeExtractor {
 
 	/**
 	 * recursively determine all declared fields
-	 * This is required because getFields() is not returning 
+	 * This is required because class.getFields() returns only public fields,
+	 * while class.getDeclaredFields() does not return fields defined in parent classes.
 	 */
 	public static List<Field> getAllDeclaredFields(Class<?> clazz) {
 		List<Field> result = new ArrayList<Field>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/598ae376/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 63c4dfc..cb79688 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -636,9 +636,9 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 					env.execute();
 
 					// return expected result
-					return "a--(1,1)-(1,2)-(1,3)-\n" +
+					return "a--(2,1)-(1,3)-(1,2)-\n" +
 							"b--(2,2)-\n"+
-							"c--(3,3)-(3,6)-(3,9)-\n";
+							"c--(4,9)-(3,6)-(3,3)-\n";
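+							// (the tuples within each group are now handed out in descending order)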
 				}
 				case 22: {
 					/*
@@ -737,24 +737,17 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 					env.setDegreeOfParallelism(1);
 
-					DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getPojoWithMultiplePojos(env);
+					DataSet<CollectionDataSets.PojoWithMultiplePojos> ds = CollectionDataSets.getPojoWithMultiplePojos(env);
 					// f0.f0 is first integer
-					DataSet<String> reduceDs = ds.groupBy("hadoopFan").sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
-							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String>() {
+					DataSet<String> reduceDs = ds.groupBy("p2.a2")
+							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithMultiplePojos, String>() {
 								@Override
 								public void reduce(
-										Iterable<PojoContainingTupleAndWritable> values,
+										Iterable<CollectionDataSets.PojoWithMultiplePojos> values,
 										Collector<String> out) throws Exception {
-									boolean once = false;
 									StringBuilder concat = new StringBuilder();
-									for(PojoContainingTupleAndWritable value : values) {
-										if(!once) {
-											concat.append(value.hadoopFan.get());
-											concat.append("---");
-											once = true;
-										}
-										concat.append(value.theTuple);
-										concat.append("-");
+									for(CollectionDataSets.PojoWithMultiplePojos value : values) {
+										concat.append(value.p2.a2);
 									}
 									out.collect(concat.toString());
 								}
@@ -763,8 +756,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 					env.execute();
 
 					// return expected result
-					return "1---(10,100)-\n" +
-							"2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
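+					// expected: the concatenated "p2.a2" values of each group ("b", "ccc", "ee"),
+					// based on the data in CollectionDataSets.getPojoWithMultiplePojos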
+					return "b\nccc\nee\n";
 				}
 				
 				default: {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/598ae376/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index e5ab29a..6757f66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.javaApiOperators.util;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -44,139 +45,139 @@ import org.apache.hadoop.io.IntWritable;
 public class CollectionDataSets {
 
 	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
-		data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
-		data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
-		data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
-		data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, how are you?"));
-		data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine."));
-		data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke Skywalker"));
-		data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1"));
-		data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2"));
-		data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3"));
-		data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4"));
-		data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5"));
-		data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6"));
-		data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7"));
-		data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8"));
-		data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9"));
-		data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10"));
-		data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11"));
-		data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12"));
-		data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13"));
-		data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14"));
-		data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15"));
-		
+		data.add(new Tuple3<Integer, Long, String>(1, 1l, "Hi"));
+		data.add(new Tuple3<Integer, Long, String>(2, 2l, "Hello"));
+		data.add(new Tuple3<Integer, Long, String>(3, 2l, "Hello world"));
+		data.add(new Tuple3<Integer, Long, String>(4, 3l, "Hello world, how are you?"));
+		data.add(new Tuple3<Integer, Long, String>(5, 3l, "I am fine."));
+		data.add(new Tuple3<Integer, Long, String>(6, 3l, "Luke Skywalker"));
+		data.add(new Tuple3<Integer, Long, String>(7, 4l, "Comment#1"));
+		data.add(new Tuple3<Integer, Long, String>(8, 4l, "Comment#2"));
+		data.add(new Tuple3<Integer, Long, String>(9, 4l, "Comment#3"));
+		data.add(new Tuple3<Integer, Long, String>(10, 4l, "Comment#4"));
+		data.add(new Tuple3<Integer, Long, String>(11, 5l, "Comment#5"));
+		data.add(new Tuple3<Integer, Long, String>(12, 5l, "Comment#6"));
+		data.add(new Tuple3<Integer, Long, String>(13, 5l, "Comment#7"));
+		data.add(new Tuple3<Integer, Long, String>(14, 5l, "Comment#8"));
+		data.add(new Tuple3<Integer, Long, String>(15, 5l, "Comment#9"));
+		data.add(new Tuple3<Integer, Long, String>(16, 6l, "Comment#10"));
+		data.add(new Tuple3<Integer, Long, String>(17, 6l, "Comment#11"));
+		data.add(new Tuple3<Integer, Long, String>(18, 6l, "Comment#12"));
+		data.add(new Tuple3<Integer, Long, String>(19, 6l, "Comment#13"));
+		data.add(new Tuple3<Integer, Long, String>(20, 6l, "Comment#14"));
+		data.add(new Tuple3<Integer, Long, String>(21, 6l, "Comment#15"));
+
 		Collections.shuffle(data);
-		
+
 		return env.fromCollection(data);
 	}
-	
+
 	public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
-		data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
-		data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
-		data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
-		
+		data.add(new Tuple3<Integer, Long, String>(1, 1l, "Hi"));
+		data.add(new Tuple3<Integer, Long, String>(2, 2l, "Hello"));
+		data.add(new Tuple3<Integer, Long, String>(3, 2l, "Hello world"));
+
 		Collections.shuffle(data);
-		
+
 		return env.fromCollection(data);
 	}
-	
+
 	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(1,1l,0,"Hallo",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,5l,4,"ABC",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(3,6l,5,"BCD",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,7l,6,"CDE",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,8l,7,"DEF",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,9l,8,"EFG",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(4,10l,9,"FGH",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,11l,10,"GHI",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,12l,11,"HIJ",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,13l,12,"IJK",3l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,14l,13,"JKL",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(5,15l,14,"KLM",2l));
-		
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(1, 1l, 0, "Hallo", 1l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(2, 2l, 1, "Hallo Welt", 2l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(2, 3l, 2, "Hallo Welt wie", 1l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(3, 4l, 3, "Hallo Welt wie gehts?", 2l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(3, 5l, 4, "ABC", 2l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(3, 6l, 5, "BCD", 3l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(4, 7l, 6, "CDE", 2l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(4, 8l, 7, "DEF", 1l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(4, 9l, 8, "EFG", 1l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(4, 10l, 9, "FGH", 2l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(5, 11l, 10, "GHI", 1l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(5, 12l, 11, "HIJ", 3l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(5, 13l, 12, "IJK", 3l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(5, 14l, 13, "JKL", 2l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(5, 15l, 14, "KLM", 2l));
+
 		Collections.shuffle(data);
-		
-		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new 
-				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO
-				);
-		
+
+		TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new
+				TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
 		return env.fromCollection(data, type);
 	}
-	
+
 	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(1,1l,0,"Hallo",1l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
-		data.add(new Tuple5<Integer, Long,  Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
-		
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(1, 1l, 0, "Hallo", 1l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(2, 2l, 1, "Hallo Welt", 2l));
+		data.add(new Tuple5<Integer, Long, Integer, String, Long>(2, 3l, 2, "Hallo Welt wie", 1l));
+
 		Collections.shuffle(data);
-		
-		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new 
-				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO
-				);
-		
+
+		TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new
+				TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
 		return env.fromCollection(data, type);
 	}
-	
+
 	public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<Tuple2<Tuple2<Integer, Integer>, String>>();
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(1,1), "one"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(2,2), "two"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(3,3), "three"));
-		
-		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new 
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(1, 1), "one"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(2, 2), "two"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(3, 3), "three"));
+
+		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new
 				TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>>(
-						new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO),
-						BasicTypeInfo.STRING_TYPE_INFO
-				);
-		
+				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO
+		);
+
 		return env.fromCollection(data, type);
 	}
-	
+
 	public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<Tuple2<Tuple2<Integer, Integer>, String>>();
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(1,3), "a"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(1,2), "a"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(2,1), "a"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(2,2), "b"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(3,3), "c"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(3,6), "c"));
-		data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(4,9), "c"));
-		
-		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new 
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(1, 3), "a"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(1, 2), "a"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(2, 1), "a"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(2, 2), "b"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(3, 3), "c"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(3, 6), "c"));
+		data.add(new Tuple2<Tuple2<Integer, Integer>, String>(new Tuple2<Integer, Integer>(4, 9), "c"));
+
+		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new
 				TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>>(
-						new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO),
-						BasicTypeInfo.STRING_TYPE_INFO
-				);
-		
+				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO
+		);
+
 		return env.fromCollection(data, type);
 	}
-	
+
 	public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {
-		
+
 		List<String> data = new ArrayList<String>();
 		data.add("Hi");
 		data.add("Hello");
@@ -186,14 +187,14 @@ public class CollectionDataSets {
 		data.add("Luke Skywalker");
 		data.add("Random comment");
 		data.add("LOL");
-		
+
 		Collections.shuffle(data);
-		
+
 		return env.fromCollection(data);
 	}
-	
+
 	public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env) {
-		
+
 		List<Integer> data = new ArrayList<Integer>();
 		data.add(1);
 		data.add(2);
@@ -210,135 +211,146 @@ public class CollectionDataSets {
 		data.add(5);
 		data.add(5);
 		data.add(5);
-		
+
 		Collections.shuffle(data);
-		
+
 		return env.fromCollection(data);
 	}
-	
+
 	public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
-		
+
 		List<CustomType> data = new ArrayList<CustomType>();
-		data.add(new CustomType(1,0l,"Hi"));
-		data.add(new CustomType(2,1l,"Hello"));
-		data.add(new CustomType(2,2l,"Hello world"));
-		data.add(new CustomType(3,3l,"Hello world, how are you?"));
-		data.add(new CustomType(3,4l,"I am fine."));
-		data.add(new CustomType(3,5l,"Luke Skywalker"));
-		data.add(new CustomType(4,6l,"Comment#1"));
-		data.add(new CustomType(4,7l,"Comment#2"));
-		data.add(new CustomType(4,8l,"Comment#3"));
-		data.add(new CustomType(4,9l,"Comment#4"));
-		data.add(new CustomType(5,10l,"Comment#5"));
-		data.add(new CustomType(5,11l,"Comment#6"));
-		data.add(new CustomType(5,12l,"Comment#7"));
-		data.add(new CustomType(5,13l,"Comment#8"));
-		data.add(new CustomType(5,14l,"Comment#9"));
-		data.add(new CustomType(6,15l,"Comment#10"));
-		data.add(new CustomType(6,16l,"Comment#11"));
-		data.add(new CustomType(6,17l,"Comment#12"));
-		data.add(new CustomType(6,18l,"Comment#13"));
-		data.add(new CustomType(6,19l,"Comment#14"));
-		data.add(new CustomType(6,20l,"Comment#15"));
-		
+		data.add(new CustomType(1, 0l, "Hi"));
+		data.add(new CustomType(2, 1l, "Hello"));
+		data.add(new CustomType(2, 2l, "Hello world"));
+		data.add(new CustomType(3, 3l, "Hello world, how are you?"));
+		data.add(new CustomType(3, 4l, "I am fine."));
+		data.add(new CustomType(3, 5l, "Luke Skywalker"));
+		data.add(new CustomType(4, 6l, "Comment#1"));
+		data.add(new CustomType(4, 7l, "Comment#2"));
+		data.add(new CustomType(4, 8l, "Comment#3"));
+		data.add(new CustomType(4, 9l, "Comment#4"));
+		data.add(new CustomType(5, 10l, "Comment#5"));
+		data.add(new CustomType(5, 11l, "Comment#6"));
+		data.add(new CustomType(5, 12l, "Comment#7"));
+		data.add(new CustomType(5, 13l, "Comment#8"));
+		data.add(new CustomType(5, 14l, "Comment#9"));
+		data.add(new CustomType(6, 15l, "Comment#10"));
+		data.add(new CustomType(6, 16l, "Comment#11"));
+		data.add(new CustomType(6, 17l, "Comment#12"));
+		data.add(new CustomType(6, 18l, "Comment#13"));
+		data.add(new CustomType(6, 19l, "Comment#14"));
+		data.add(new CustomType(6, 20l, "Comment#15"));
+
 		Collections.shuffle(data);
-		
+
 		return env.fromCollection(data);
-		
+
 	}
-	
+
 	public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
-		
+
 		List<CustomType> data = new ArrayList<CustomType>();
-		data.add(new CustomType(1,0l,"Hi"));
-		data.add(new CustomType(2,1l,"Hello"));
-		data.add(new CustomType(2,2l,"Hello world"));
-		
+		data.add(new CustomType(1, 0l, "Hi"));
+		data.add(new CustomType(2, 1l, "Hello"));
+		data.add(new CustomType(2, 2l, "Hello world"));
+
 		Collections.shuffle(data);
-		
+
 		return env.fromCollection(data);
-		
+
 	}
-	
+
 	public static class CustomType implements Serializable {
-		
+
 		private static final long serialVersionUID = 1L;
-		
+
 		public int myInt;
 		public long myLong;
 		public String myString;
-		
-		public CustomType() {};
-		
+
+		public CustomType() {
+		}
+		}
 		public CustomType(int i, long l, String s) {
 			myInt = i;
 			myLong = l;
 			myString = s;
 		}
-		
+
 		@Override
 		public String toString() {
-			return myInt+","+myLong+","+myString;
+			return myInt + "," + myLong + "," + myString;
 		}
 	}
-	
+
 	public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedPojoMatchingDataSet(ExecutionEnvironment env) {
 		List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>();
-		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(1, "First",10, 100, 1000L, "One", 10000L));
-		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(2, "Second",20, 200, 2000L, "Two", 20000L));
-		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(3, "Third",30, 300, 3000L, "Three", 30000L));
+		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(3, "Third", 30, 300, 3000L, "Three", 30000L));
 		return env.fromCollection(data);
 	}
-	
+
 	public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
 		List<POJO> data = new ArrayList<POJO>();
-		data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(2, "Second",20, 200, 2000L, "Two", 20000L));
-		data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
 		return env.fromCollection(data);
 	}
-	
+
 	public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
 		List<POJO> data = new ArrayList<POJO>();
-		data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L)); // 5x
-		data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(2, "Second",20, 200, 2000L, "Two", 20000L));
-		data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L)); // 2x
-		data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
 		return env.fromCollection(data);
 	}
-	
+
 	public static class POJO {
 		public int number;
 		public String str;
 		public Tuple2<Integer, CustomType> nestedTupleWithCustom;
 		public NestedPojo nestedPojo;
+		public Date date;
 		public transient Long ignoreMe;
-		public POJO(int i0, String s0, 
-						int i1, int i2, long l0, String s1,
-						long l1) {
+
+		public POJO(int i0, String s0,
+					int i1, int i2, long l0, String s1,
+					long l1) {
 			this.number = i0;
 			this.str = s0;
 			this.nestedTupleWithCustom = new Tuple2<Integer, CustomType>(i1, new CustomType(i2, l0, s1));
 			this.nestedPojo = new NestedPojo();
+			this.date = new Date();
 			this.nestedPojo.longNumber = l1;
 		}
-		public POJO() {}
+
+		public POJO() {
+		}
+
 		@Override
 		public String toString() {
-			return number+" "+str+" "+nestedTupleWithCustom+" "+nestedPojo.longNumber;
+			return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber;
 		}
 	}
-	
+
 	public static class NestedPojo {
 		public static Object ignoreMe;
 		public long longNumber;
-		public NestedPojo() {}
+
+		public NestedPojo() {
+		}
 	}
-	
+
 	public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
 		List<CrazyNested> data = new ArrayList<CrazyNested>();
 		data.add(new CrazyNested("aa"));
@@ -349,16 +361,20 @@ public class CollectionDataSets {
 		data.add(new CrazyNested("cc"));
 		return env.fromCollection(data);
 	}
-	
+
 	public static class CrazyNested {
 		public CrazyNestedL1 nest_Lvl1;
 		public Long something; // test proper null-value handling
-		public CrazyNested() {}
+
+		public CrazyNested() {
+		}
+
 		public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
 			this(set);
 			something = s;
 			nest_Lvl1.a = second;
 		}
+
 		public CrazyNested(String set) {
 			nest_Lvl1 = new CrazyNestedL1();
 			nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
@@ -367,34 +383,41 @@ public class CollectionDataSets {
 			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set;
 		}
 	}
+
 	public static class CrazyNestedL1 {
 		public String a;
 		public int b;
 		public CrazyNestedL2 nest_Lvl2;
 	}
+
 	public static class CrazyNestedL2 {
 		public CrazyNestedL3 nest_Lvl3;
 	}
+
 	public static class CrazyNestedL3 {
 		public CrazyNestedL4 nest_Lvl4;
 	}
+
 	public static class CrazyNestedL4 {
 		public String f1nal;
 	}
-	
+
 	// Copied from TypeExtractorTest
 	public static class FromTuple extends Tuple3<String, String, Long> {
 		private static final long serialVersionUID = 1L;
 		public int special;
 	}
-	
+
 	public static class FromTupleWithCTor extends FromTuple {
-		public FromTupleWithCTor() {}
-		public FromTupleWithCTor(int special, long tupleField ) {
+		public FromTupleWithCTor() {
+		}
+
+		public FromTupleWithCTor(int special, long tupleField) {
 			this.special = special;
 			this.setField(tupleField, 2);
 		}
 	}
+
 	public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
 		List<FromTupleWithCTor> data = new ArrayList<FromTupleWithCTor>();
 		data.add(new FromTupleWithCTor(1, 10L)); // 3x
@@ -404,20 +427,23 @@ public class CollectionDataSets {
 		data.add(new FromTupleWithCTor(2, 20L));
 		return env.fromCollection(data);
 	}
-	
+
 	public static class PojoContainingTupleAndWritable {
 		public int someInt;
 		public String someString;
 		public IntWritable hadoopFan;
 		public Tuple2<Long, Long> theTuple;
-		public PojoContainingTupleAndWritable() {}
+
+		public PojoContainingTupleAndWritable() {
+		}
+
 		public PojoContainingTupleAndWritable(int i, long l1, long l2) {
 			hadoopFan = new IntWritable(i);
 			someInt = i;
 			theTuple = new Tuple2<Long, Long>(l1, l2);
 		}
 	}
-	
+
 	public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
 		List<PojoContainingTupleAndWritable> data = new ArrayList<PojoContainingTupleAndWritable>();
 		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
@@ -428,14 +454,27 @@ public class CollectionDataSets {
 		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
 		return env.fromCollection(data);
 	}
-	
-	public static DataSet<Tuple3<Integer,CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
-		List<Tuple3<Integer,CrazyNested, POJO>> data = new ArrayList<Tuple3<Integer,CrazyNested, POJO>>();
-		data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); // 3x
-		data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) ));
-		data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) ));
+
+
+
+	public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<PojoContainingTupleAndWritable>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple3<Integer, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
+		List<Tuple3<Integer, CrazyNested, POJO>> data = new ArrayList<Tuple3<Integer, CrazyNested, POJO>>();
+		data.add(new Tuple3<Integer, CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
+		data.add(new Tuple3<Integer, CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		data.add(new Tuple3<Integer, CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
 		// POJO is not initialized according to the first two fields.
-		data.add(new Tuple3<Integer,CrazyNested, POJO>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); // 1x
+		data.add(new Tuple3<Integer, CrazyNested, POJO>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
 		return env.fromCollection(data);
 	}
 
@@ -443,32 +482,41 @@ public class CollectionDataSets {
 		public String a;
 		public String b;
 	}
+
 	public static class Pojo2 {
 		public String a2;
 		public String b2;
 	}
+
 	public static class PojoWithMultiplePojos {
 		public Pojo1 p1;
 		public Pojo2 p2;
 		public Integer i0;
-		public PojoWithMultiplePojos() {}
+
+		public PojoWithMultiplePojos() {
+		}
+
 		public PojoWithMultiplePojos(String a, String b, String a1, String b1, Integer i0) {
 			p1 = new Pojo1();
 			p1.a = a;
 			p1.b = b;
 			p2 = new Pojo2();
 			p2.a2 = a1;
-			p2.a2 = b1;
+			p2.b2 = b1;
 			this.i0 = i0;
 		}
 	}
-	
+
 	public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
 		List<PojoWithMultiplePojos> data = new ArrayList<PojoWithMultiplePojos>();
-		data.add(new PojoWithMultiplePojos("a","aa","b","bb", 1));
-		data.add(new PojoWithMultiplePojos("b","bb","c","cc", 2));
-		data.add(new PojoWithMultiplePojos("d","dd","e","ee", 3));
+		data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
 		return env.fromCollection(data);
 	}
-	
+
 }
+

