flink-commits mailing list archives

From: g...@apache.org
Subject: [05/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests
Date: Wed, 12 Jul 2017 23:44:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/operators/util/CollectionDataSets.java
new file mode 100644
index 0000000..e2e22a8
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/util/CollectionDataSets.java
@@ -0,0 +1,772 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators.util;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.File;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.math.BigInt;
+
+/**
+ * #######################################################################################################
+ * 			BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
+ * 			IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
+ * #######################################################################################################
+ */
+public class CollectionDataSets {
+
+	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
+		data.add(new Tuple3<>(5, 3L, "I am fine."));
+		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
+		data.add(new Tuple3<>(7, 4L, "Comment#1"));
+		data.add(new Tuple3<>(8, 4L, "Comment#2"));
+		data.add(new Tuple3<>(9, 4L, "Comment#3"));
+		data.add(new Tuple3<>(10, 4L, "Comment#4"));
+		data.add(new Tuple3<>(11, 5L, "Comment#5"));
+		data.add(new Tuple3<>(12, 5L, "Comment#6"));
+		data.add(new Tuple3<>(13, 5L, "Comment#7"));
+		data.add(new Tuple3<>(14, 5L, "Comment#8"));
+		data.add(new Tuple3<>(15, 5L, "Comment#9"));
+		data.add(new Tuple3<>(16, 6L, "Comment#10"));
+		data.add(new Tuple3<>(17, 6L, "Comment#11"));
+		data.add(new Tuple3<>(18, 6L, "Comment#12"));
+		data.add(new Tuple3<>(19, 6L, "Comment#13"));
+		data.add(new Tuple3<>(20, 6L, "Comment#14"));
+		data.add(new Tuple3<>(21, 6L, "Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>();
+		data.add(new Tuple2<>(new Tuple2<>(1, 1), "one"));
+		data.add(new Tuple2<>(new Tuple2<>(2, 2), "two"));
+		data.add(new Tuple2<>(new Tuple2<>(3, 3), "three"));
+
+		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>();
+		data.add(new Tuple2<>(new Tuple2<>(1, 3), "a"));
+		data.add(new Tuple2<>(new Tuple2<>(1, 2), "a"));
+		data.add(new Tuple2<>(new Tuple2<>(2, 1), "a"));
+		data.add(new Tuple2<>(new Tuple2<>(2, 2), "b"));
+		data.add(new Tuple2<>(new Tuple2<>(3, 3), "c"));
+		data.add(new Tuple2<>(new Tuple2<>(3, 6), "c"));
+		data.add(new Tuple2<>(new Tuple2<>(4, 9), "c"));
+
+		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+
+		List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> data = new ArrayList<>();
+		data.add(new Tuple3<>(new Tuple2<>(1, 3), "a", 2));
+		data.add(new Tuple3<>(new Tuple2<>(1, 2), "a", 1));
+		data.add(new Tuple3<>(new Tuple2<>(2, 1), "a", 3));
+		data.add(new Tuple3<>(new Tuple2<>(2, 2), "b", 4));
+		data.add(new Tuple3<>(new Tuple2<>(3, 3), "c", 5));
+		data.add(new Tuple3<>(new Tuple2<>(3, 6), "c", 6));
+		data.add(new Tuple3<>(new Tuple2<>(4, 9), "c", 7));
+
+		TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> type = new TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
+		List<Tuple2<byte[], Integer>> data = new ArrayList<>();
+		data.add(new Tuple2<>(new byte[]{0, 4}, 1));
+		data.add(new Tuple2<>(new byte[]{2, 0}, 1));
+		data.add(new Tuple2<>(new byte[]{2, 0, 4}, 4));
+		data.add(new Tuple2<>(new byte[]{2, 1}, 3));
+		data.add(new Tuple2<>(new byte[]{0}, 0));
+		data.add(new Tuple2<>(new byte[]{2, 0}, 1));
+
+		TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<>(
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {
+
+		List<String> data = new ArrayList<>();
+		data.add("Hi");
+		data.add("Hello");
+		data.add("Hello world");
+		data.add("Hello world, how are you?");
+		data.add("I am fine.");
+		data.add("Luke Skywalker");
+		data.add("Random comment");
+		data.add("LOL");
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env) {
+
+		List<Integer> data = new ArrayList<>();
+		data.add(1);
+		data.add(2);
+		data.add(2);
+		data.add(3);
+		data.add(3);
+		data.add(3);
+		data.add(4);
+		data.add(4);
+		data.add(4);
+		data.add(4);
+		data.add(5);
+		data.add(5);
+		data.add(5);
+		data.add(5);
+		data.add(5);
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
+
+		List<CustomType> data = new ArrayList<>();
+		data.add(new CustomType(1, 0L, "Hi"));
+		data.add(new CustomType(2, 1L, "Hello"));
+		data.add(new CustomType(2, 2L, "Hello world"));
+		data.add(new CustomType(3, 3L, "Hello world, how are you?"));
+		data.add(new CustomType(3, 4L, "I am fine."));
+		data.add(new CustomType(3, 5L, "Luke Skywalker"));
+		data.add(new CustomType(4, 6L, "Comment#1"));
+		data.add(new CustomType(4, 7L, "Comment#2"));
+		data.add(new CustomType(4, 8L, "Comment#3"));
+		data.add(new CustomType(4, 9L, "Comment#4"));
+		data.add(new CustomType(5, 10L, "Comment#5"));
+		data.add(new CustomType(5, 11L, "Comment#6"));
+		data.add(new CustomType(5, 12L, "Comment#7"));
+		data.add(new CustomType(5, 13L, "Comment#8"));
+		data.add(new CustomType(5, 14L, "Comment#9"));
+		data.add(new CustomType(6, 15L, "Comment#10"));
+		data.add(new CustomType(6, 16L, "Comment#11"));
+		data.add(new CustomType(6, 17L, "Comment#12"));
+		data.add(new CustomType(6, 18L, "Comment#13"));
+		data.add(new CustomType(6, 19L, "Comment#14"));
+		data.add(new CustomType(6, 20L, "Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+
+	}
+
+	public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
+
+		List<CustomType> data = new ArrayList<>();
+		data.add(new CustomType(1, 0L, "Hi"));
+		data.add(new CustomType(2, 1L, "Hello"));
+		data.add(new CustomType(2, 2L, "Hello world"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+
+	}
+
+	/**
+	 * POJO.
+	 */
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public long myLong;
+		public String myString;
+
+		public CustomType() {
+		}
+
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+
+		@Override
+		public String toString() {
+			return myInt + "," + myLong + "," + myString;
+		}
+	}
+
+	private static class CustomTypeComparator implements Comparator<CustomType> {
+		@Override
+		public int compare(CustomType o1, CustomType o2) {
+			int diff = o1.myInt - o2.myInt;
+			if (diff != 0) {
+				return diff;
+			}
+			diff = (int) (o1.myLong - o2.myLong);
+			return diff != 0 ? diff : o1.myString.compareTo(o2.myString);
+		}
+
+	}
+
+	public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
+		List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<>();
+		data.add(new Tuple7<>(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new Tuple7<>(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new Tuple7<>(3, "Third", 30, 300, 3000L, "Three", 30000L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
+		List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> data = new ArrayList<>();
+		data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First"));
+		data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second"));
+		data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third"));
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+		data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x
+		data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
+		data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
+		data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
+		data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
+		data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
+		data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x
+		data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO.
+	 */
+	public static class POJO {
+		public int number;
+		public String str;
+		public Tuple2<Integer, CustomType> nestedTupleWithCustom;
+		public NestedPojo nestedPojo;
+		public transient Long ignoreMe;
+
+		public POJO(int i0, String s0,
+					int i1, int i2, long l0, String s1,
+					long l1) {
+			this.number = i0;
+			this.str = s0;
+			this.nestedTupleWithCustom = new Tuple2<>(i1, new CustomType(i2, l0, s1));
+			this.nestedPojo = new NestedPojo();
+			this.nestedPojo.longNumber = l1;
+		}
+
+		public POJO() {
+		}
+
+		@Override
+		public String toString() {
+			return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber;
+		}
+	}
+
+	/**
+	 * Nested POJO.
+	 */
+	public static class NestedPojo {
+		public static Object ignoreMe;
+		public long longNumber;
+
+		public NestedPojo() {
+		}
+	}
+
+	public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
+		List<CrazyNested> data = new ArrayList<>();
+		data.add(new CrazyNested("aa"));
+		data.add(new CrazyNested("bb"));
+		data.add(new CrazyNested("bb"));
+		data.add(new CrazyNested("cc"));
+		data.add(new CrazyNested("cc"));
+		data.add(new CrazyNested("cc"));
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * Deeply nested POJO.
+	 */
+	public static class CrazyNested {
+		public CrazyNestedL1 nestLvl1;
+		public Long something; // test proper null-value handling
+
+		public CrazyNested() {
+		}
+
+		public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
+			this(set);
+			something = s;
+			nestLvl1.a = second;
+		}
+
+		public CrazyNested(String set) {
+			nestLvl1 = new CrazyNestedL1();
+			nestLvl1.nestLvl2 = new CrazyNestedL2();
+			nestLvl1.nestLvl2.nestLvl3 = new CrazyNestedL3();
+			nestLvl1.nestLvl2.nestLvl3.nestLvl4 = new CrazyNestedL4();
+			nestLvl1.nestLvl2.nestLvl3.nestLvl4.f1nal = set;
+		}
+	}
+
+	/**
+	 * Nested POJO level 2.
+	 */
+	public static class CrazyNestedL1 {
+		public String a;
+		public int b;
+		public CrazyNestedL2 nestLvl2;
+	}
+
+	/**
+	 * Nested POJO level 3.
+	 */
+	public static class CrazyNestedL2 {
+		public CrazyNestedL3 nestLvl3;
+	}
+
+	/**
+	 * Nested POJO level 4.
+	 */
+	public static class CrazyNestedL3 {
+		public CrazyNestedL4 nestLvl4;
+	}
+
+	/**
+	 * Nested POJO level 5.
+	 */
+	public static class CrazyNestedL4 {
+		public String f1nal;
+	}
+
+	// Copied from TypeExtractorTest
+	private static class FromTuple extends Tuple3<String, String, Long> {
+		private static final long serialVersionUID = 1L;
+		public int special;
+	}
+
+	/**
+	 * Pojo extending from tuple WITH custom fields.
+	 */
+	public static class FromTupleWithCTor extends FromTuple {
+
+		private static final long serialVersionUID = 1L;
+
+		public FromTupleWithCTor() {}
+
+		public FromTupleWithCTor(int special, long tupleField) {
+			this.special = special;
+			this.setField(tupleField, 2);
+		}
+	}
+
+	public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
+		List<FromTupleWithCTor> data = new ArrayList<>();
+		data.add(new FromTupleWithCTor(1, 10L)); // 3x
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(2, 20L)); // 2x
+		data.add(new FromTupleWithCTor(2, 20L));
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO with Tuple and Writable.
+	 */
+	public static class PojoContainingTupleAndWritable {
+		public int someInt;
+		public String someString;
+		public IntWritable hadoopFan;
+		public Tuple2<Long, Long> theTuple;
+
+		public PojoContainingTupleAndWritable() {
+		}
+
+		public PojoContainingTupleAndWritable(int i, long l1, long l2) {
+			hadoopFan = new IntWritable(i);
+			someInt = i;
+			theTuple = new Tuple2<>(l1, l2);
+		}
+	}
+
+	public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple3<Integer, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
+		List<Tuple3<Integer, CrazyNested, POJO>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
+		data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		// the POJO field is deliberately not initialized to match the first two fields.
+		data.add(new Tuple3<>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO.
+	 */
+	public static class Pojo1 {
+		public String a;
+		public String b;
+
+		public Pojo1() {}
+
+		public Pojo1(String a, String b) {
+			this.a = a;
+			this.b = b;
+		}
+	}
+
+	/**
+	 * Another POJO.
+	 */
+	public static class Pojo2 {
+		public String a2;
+		public String b2;
+	}
+
+	/**
+	 * Nested POJO.
+	 */
+	public static class PojoWithMultiplePojos {
+		public Pojo1 p1;
+		public Pojo2 p2;
+		public Integer i0;
+
+		public PojoWithMultiplePojos() {
+		}
+
+		public PojoWithMultiplePojos(String a, String b, String a1, String b1, Integer i0) {
+			p1 = new Pojo1();
+			p1.a = a;
+			p1.b = b;
+			p2 = new Pojo2();
+			p2.a2 = a1;
+			p2.b2 = b1;
+			this.i0 = i0;
+		}
+	}
+
+	public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
+		List<PojoWithMultiplePojos> data = new ArrayList<>();
+		data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * Custom enum.
+	 */
+	public enum Category {
+		CAT_A, CAT_B
+	}
+
+	/**
+	 * POJO with Date and enum.
+	 */
+	public static class PojoWithDateAndEnum {
+		public String group;
+		public Date date;
+		public Category cat;
+	}
+
+	public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
+		List<PojoWithDateAndEnum> data = new ArrayList<>();
+
+		PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+		one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A;
+		data.add(one);
+
+		PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+		two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A;
+		data.add(two);
+
+		PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+		three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B;
+		data.add(three);
+
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO with collection.
+	 */
+	public static class PojoWithCollection {
+		public List<Pojo1> pojos;
+		public int key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	/**
+	 * POJO with generic collection.
+	 */
+	public static class PojoWithCollectionGeneric {
+		public List<Pojo1> pojos;
+		public int key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+		private PojoWithDateAndEnum makeMeGeneric;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+		List<PojoWithCollection> data = new ArrayList<>();
+
+		List<Pojo1> pojosList1 = new ArrayList<>();
+		pojosList1.add(new Pojo1("a", "aa"));
+		pojosList1.add(new Pojo1("b", "bb"));
+
+		List<Pojo1> pojosList2 = new ArrayList<>();
+		pojosList2.add(new Pojo1("a2", "aa2"));
+		pojosList2.add(new Pojo1("b2", "bb2"));
+
+		PojoWithCollection pwc1 = new PojoWithCollection();
+		pwc1.pojos = pojosList1;
+		pwc1.key = 0;
+		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc1.scalaBigInt = BigInt.int2bigInt(10);
+		pwc1.bigDecimalKeepItNull = null;
+
+		// use calendar to make it stable across time zones
+		GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
+		pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
+		pwc1.mixed = new ArrayList<>();
+		Map<String, Integer> map = new HashMap<>();
+		map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
+		pwc1.mixed.add(map);
+		pwc1.mixed.add(new File("/this/is/wrong"));
+		pwc1.mixed.add("uhlala");
+
+		PojoWithCollection pwc2 = new PojoWithCollection();
+		pwc2.pojos = pojosList2;
+		pwc2.key = 0;
+		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+		pwc2.bigDecimalKeepItNull = null;
+
+		GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+		pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
+
+		data.add(pwc1);
+		data.add(pwc2);
+
+		return env.fromCollection(data);
+	}
+
+}
+
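For orientation, here is a minimal sketch of how a test would typically consume these helpers. This is a hypothetical standalone driver, not part of the commit; it assumes CollectionDataSets is on the classpath and uses only DataSet API calls available in Flink at the time (filter, collect):

	import org.apache.flink.api.java.DataSet;
	import org.apache.flink.api.java.ExecutionEnvironment;
	import org.apache.flink.api.java.tuple.Tuple3;
	import org.apache.flink.test.operators.util.CollectionDataSets;

	import java.util.List;

	public class CollectionDataSetsUsageSketch {
		public static void main(String[] args) throws Exception {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
			DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
			// get3TupleDataSet shuffles its data before returning, so tests must not
			// rely on element order; sort the result or assert order-insensitively.
			List<Tuple3<Integer, Long, String>> hellos = tuples.filter(t -> t.f1 == 2L).collect();
			System.out.println(hellos); // the two records with key 2L, in some order
		}
	}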

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/util/ValueCollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/operators/util/ValueCollectionDataSets.java
new file mode 100644
index 0000000..0262617
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/util/ValueCollectionDataSets.java
@@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.File;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.math.BigInt;
+
+/**
+ * #######################################################################################################
+ * 			BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
+ * 			IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
+ * #######################################################################################################
+ */
+public class ValueCollectionDataSets {
+
+	public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env) {
+		List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple3<>(new IntValue(1), new LongValue(1L), new StringValue("Hi")));
+		data.add(new Tuple3<>(new IntValue(2), new LongValue(2L), new StringValue("Hello")));
+		data.add(new Tuple3<>(new IntValue(3), new LongValue(2L), new StringValue("Hello world")));
+		data.add(new Tuple3<>(new IntValue(4), new LongValue(3L), new StringValue("Hello world, how are you?")));
+		data.add(new Tuple3<>(new IntValue(5), new LongValue(3L), new StringValue("I am fine.")));
+		data.add(new Tuple3<>(new IntValue(6), new LongValue(3L), new StringValue("Luke Skywalker")));
+		data.add(new Tuple3<>(new IntValue(7), new LongValue(4L), new StringValue("Comment#1")));
+		data.add(new Tuple3<>(new IntValue(8), new LongValue(4L), new StringValue("Comment#2")));
+		data.add(new Tuple3<>(new IntValue(9), new LongValue(4L), new StringValue("Comment#3")));
+		data.add(new Tuple3<>(new IntValue(10), new LongValue(4L), new StringValue("Comment#4")));
+		data.add(new Tuple3<>(new IntValue(11), new LongValue(5L), new StringValue("Comment#5")));
+		data.add(new Tuple3<>(new IntValue(12), new LongValue(5L), new StringValue("Comment#6")));
+		data.add(new Tuple3<>(new IntValue(13), new LongValue(5L), new StringValue("Comment#7")));
+		data.add(new Tuple3<>(new IntValue(14), new LongValue(5L), new StringValue("Comment#8")));
+		data.add(new Tuple3<>(new IntValue(15), new LongValue(5L), new StringValue("Comment#9")));
+		data.add(new Tuple3<>(new IntValue(16), new LongValue(6L), new StringValue("Comment#10")));
+		data.add(new Tuple3<>(new IntValue(17), new LongValue(6L), new StringValue("Comment#11")));
+		data.add(new Tuple3<>(new IntValue(18), new LongValue(6L), new StringValue("Comment#12")));
+		data.add(new Tuple3<>(new IntValue(19), new LongValue(6L), new StringValue("Comment#13")));
+		data.add(new Tuple3<>(new IntValue(20), new LongValue(6L), new StringValue("Comment#14")));
+		data.add(new Tuple3<>(new IntValue(21), new LongValue(6L), new StringValue("Comment#15")));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env) {
+		List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple3<>(new IntValue(1), new LongValue(1L), new StringValue("Hi")));
+		data.add(new Tuple3<>(new IntValue(2), new LongValue(2L), new StringValue("Hello")));
+		data.add(new Tuple3<>(new IntValue(3), new LongValue(2L), new StringValue("Hello world")));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) {
+		List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
+
+		data.add(new Tuple5<>(new IntValue(1), new LongValue(1L), new IntValue(0), new StringValue("Hallo"), new LongValue(1L)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(2L), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2L)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(3L), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1L)));
+		data.add(new Tuple5<>(new IntValue(3), new LongValue(4L), new IntValue(3), new StringValue("Hallo Welt wie gehts?"), new LongValue(2L)));
+		data.add(new Tuple5<>(new IntValue(3), new LongValue(5L), new IntValue(4), new StringValue("ABC"), new LongValue(2L)));
+		data.add(new Tuple5<>(new IntValue(3), new LongValue(6L), new IntValue(5), new StringValue("BCD"), new LongValue(3L)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(7L), new IntValue(6), new StringValue("CDE"), new LongValue(2L)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(8L), new IntValue(7), new StringValue("DEF"), new LongValue(1L)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(9L), new IntValue(8), new StringValue("EFG"), new LongValue(1L)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(10L), new IntValue(9), new StringValue("FGH"), new LongValue(2L)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(11L), new IntValue(10), new StringValue("GHI"), new LongValue(1L)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(12L), new IntValue(11), new StringValue("HIJ"), new LongValue(3L)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(13L), new IntValue(12), new StringValue("IJK"), new LongValue(3L)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(14L), new IntValue(13), new StringValue("JKL"), new LongValue(2L)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(15L), new IntValue(14), new StringValue("KLM"), new LongValue(2L)));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type =
+			new TupleTypeInfo<>(
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) {
+		List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
+
+		data.add(new Tuple5<>(new IntValue(1), new LongValue(1L), new IntValue(0), new StringValue("Hallo"), new LongValue(1L)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(2L), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2L)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(3L), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1L)));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type =
+			new TupleTypeInfo<>(
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
+		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(1)), new StringValue("one")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("two")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("three")));
+
+		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type =
+			new TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
+		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(3)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(2)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(1)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("b")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("c")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(6)), new StringValue("c")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(4), new IntValue(9)), new StringValue("c")));
+
+		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type =
+			new TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+		List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> data = new ArrayList<>();
+
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(3)), new StringValue("a"), new IntValue(2)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(2)), new StringValue("a"), new IntValue(1)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(1)), new StringValue("a"), new IntValue(3)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(2)), new StringValue("b"), new IntValue(4)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(3)), new StringValue("c"), new IntValue(5)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(6)), new StringValue("c"), new IntValue(6)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(4), new IntValue(9)), new StringValue("c"), new IntValue(7)));
+
+		TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> type =
+			new TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env) {
+		List<StringValue> data = new ArrayList<>();
+
+		data.add(new StringValue("Hi"));
+		data.add(new StringValue("Hello"));
+		data.add(new StringValue("Hello world"));
+		data.add(new StringValue("Hello world, how are you?"));
+		data.add(new StringValue("I am fine."));
+		data.add(new StringValue("Luke Skywalker"));
+		data.add(new StringValue("Random comment"));
+		data.add(new StringValue("LOL"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) {
+		List<IntValue> data = new ArrayList<>();
+
+		data.add(new IntValue(1));
+		data.add(new IntValue(2));
+		data.add(new IntValue(2));
+		data.add(new IntValue(3));
+		data.add(new IntValue(3));
+		data.add(new IntValue(3));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
+		List<CustomType> data = new ArrayList<>();
+
+		data.add(new CustomType(1, 0L, "Hi"));
+		data.add(new CustomType(2, 1L, "Hello"));
+		data.add(new CustomType(2, 2L, "Hello world"));
+		data.add(new CustomType(3, 3L, "Hello world, how are you?"));
+		data.add(new CustomType(3, 4L, "I am fine."));
+		data.add(new CustomType(3, 5L, "Luke Skywalker"));
+		data.add(new CustomType(4, 6L, "Comment#1"));
+		data.add(new CustomType(4, 7L, "Comment#2"));
+		data.add(new CustomType(4, 8L, "Comment#3"));
+		data.add(new CustomType(4, 9L, "Comment#4"));
+		data.add(new CustomType(5, 10L, "Comment#5"));
+		data.add(new CustomType(5, 11L, "Comment#6"));
+		data.add(new CustomType(5, 12L, "Comment#7"));
+		data.add(new CustomType(5, 13L, "Comment#8"));
+		data.add(new CustomType(5, 14L, "Comment#9"));
+		data.add(new CustomType(6, 15L, "Comment#10"));
+		data.add(new CustomType(6, 16L, "Comment#11"));
+		data.add(new CustomType(6, 17L, "Comment#12"));
+		data.add(new CustomType(6, 18L, "Comment#13"));
+		data.add(new CustomType(6, 19L, "Comment#14"));
+		data.add(new CustomType(6, 20L, "Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
+		List<CustomType> data = new ArrayList<>();
+
+		data.add(new CustomType(1, 0L, "Hi"));
+		data.add(new CustomType(2, 1L, "Hello"));
+		data.add(new CustomType(2, 2L, "Hello world"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO.
+	 */
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public IntValue myInt;
+		public LongValue myLong;
+		public StringValue myString;
+
+		public CustomType() {
+		}
+
+		public CustomType(int i, long l, String s) {
+			myInt = new IntValue(i);
+			myLong = new LongValue(l);
+			myString = new StringValue(s);
+		}
+
+		@Override
+		public String toString() {
+			return myInt + "," + myLong + "," + myString;
+		}
+	}
+
+	private static class CustomTypeComparator implements Comparator<CustomType> {
+
+		@Override
+		public int compare(CustomType o1, CustomType o2) {
+			int diff = o1.myInt.getValue() - o2.myInt.getValue();
+			if (diff != 0) {
+				return diff;
+			}
+			diff = (int) (o1.myLong.getValue() - o2.myLong.getValue());
+			return diff != 0 ? diff : o1.myString.getValue().compareTo(o2.myString.getValue());
+		}
+
+	}
+
+	private static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
+		List<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> data = new ArrayList<>();
+
+		data.add(new Tuple7<>(new IntValue(1), new StringValue("First"), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new LongValue(10000L)));
+		data.add(new Tuple7<>(new IntValue(2), new StringValue("Second"), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new LongValue(20000L)));
+		data.add(new Tuple7<>(new IntValue(3), new StringValue("Third"), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new LongValue(30000L)));
+
+		return env.fromCollection(data);
+	}
+
+	private static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
+		List<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple7<>(new LongValue(10000L), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new IntValue(1), new StringValue("First")));
+		data.add(new Tuple7<>(new LongValue(20000L), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new IntValue(2), new StringValue("Second")));
+		data.add(new Tuple7<>(new LongValue(30000L), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new IntValue(3), new StringValue("Third")));
+
+		return env.fromCollection(data);
+	}
+
+	private static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+
+		data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+
+		return env.fromCollection(data);
+	}
+
+	private static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+
+		return env.fromCollection(data);
+	}
+
+	private static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<>();
+
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x
+		data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
+		data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
+		data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
+		data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
+		data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
+		data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x
+		data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
+
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO.
+	 */
+	public static class POJO {
+		public IntValue number;
+		public StringValue str;
+		public Tuple2<IntValue, CustomType> nestedTupleWithCustom;
+		public NestedPojo nestedPojo;
+		public transient LongValue ignoreMe;
+
+		public POJO(int i0, String s0,
+					int i1, int i2, long l0, String s1,
+					long l1) {
+			this.number = new IntValue(i0);
+			this.str = new StringValue(s0);
+			this.nestedTupleWithCustom = new Tuple2<>(new IntValue(i1), new CustomType(i2, l0, s1));
+			this.nestedPojo = new NestedPojo();
+			this.nestedPojo.longNumber = new LongValue(l1);
+		}
+
+		public POJO() {
+		}
+
+		@Override
+		public String toString() {
+			return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber;
+		}
+	}
+
+	/**
+	 * Nested POJO.
+	 */
+	public static class NestedPojo {
+		public static Object ignoreMe;
+		public LongValue longNumber;
+
+		public NestedPojo() {
+		}
+	}
+
+	private static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
+		List<CrazyNested> data = new ArrayList<>();
+
+		data.add(new CrazyNested("aa"));
+		data.add(new CrazyNested("bb"));
+		data.add(new CrazyNested("bb"));
+		data.add(new CrazyNested("cc"));
+		data.add(new CrazyNested("cc"));
+		data.add(new CrazyNested("cc"));
+
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * Deeply nested POJO.
+	 */
+	public static class CrazyNested {
+		public CrazyNestedL1 nestLvl1;
+		public LongValue something; // test proper null-value handling
+
+		public CrazyNested() {
+		}
+
+		public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
+			this(set);
+			something = new LongValue(s);
+			nestLvl1.a = new StringValue(second);
+		}
+
+		public CrazyNested(String set) {
+			nestLvl1 = new CrazyNestedL1();
+			nestLvl1.nestLvl2 = new CrazyNestedL2();
+			nestLvl1.nestLvl2.nestLvl3 = new CrazyNestedL3();
+			nestLvl1.nestLvl2.nestLvl3.nestLvl4 = new CrazyNestedL4();
+			nestLvl1.nestLvl2.nestLvl3.nestLvl4.f1nal = new StringValue(set);
+		}
+	}
+
+	/**
+	 * Nested POJO level 2.
+	 */
+	public static class CrazyNestedL1 {
+		public StringValue a;
+		public IntValue b;
+		public CrazyNestedL2 nestLvl2;
+	}
+
+	/**
+	 * Nested POJO level 3.
+	 */
+	public static class CrazyNestedL2 {
+		public CrazyNestedL3 nestLvl3;
+	}
+
+	/**
+	 * Nested POJO level 4.
+	 */
+	public static class CrazyNestedL3 {
+		public CrazyNestedL4 nestLvl4;
+	}
+
+	/**
+	 * Nested POJO level 5.
+	 */
+	public static class CrazyNestedL4 {
+		public StringValue f1nal;
+	}
+
+	// Copied from TypeExtractorTest
+	private static class FromTuple extends Tuple3<StringValue, StringValue, LongValue> {
+		private static final long serialVersionUID = 1L;
+		public IntValue special;
+	}
+
+	private static class FromTupleWithCTor extends FromTuple {
+
+		private static final long serialVersionUID = 1L;
+
+		public FromTupleWithCTor() {}
+
+		public FromTupleWithCTor(int special, long tupleField) {
+			this.special = new IntValue(special);
+			this.setField(new LongValue(tupleField), 2);
+		}
+	}
+
+	public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
+		List<FromTupleWithCTor> data = new ArrayList<>();
+		data.add(new FromTupleWithCTor(1, 10L)); // 3x
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(2, 20L)); // 2x
+		data.add(new FromTupleWithCTor(2, 20L));
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO with Tuple and Writable.
+	 */
+	public static class PojoContainingTupleAndWritable {
+		public IntValue someInt;
+		public StringValue someString;
+		public IntWritable hadoopFan;
+		public Tuple2<LongValue, LongValue> theTuple;
+
+		public PojoContainingTupleAndWritable() {
+		}
+
+		public PojoContainingTupleAndWritable(int i, long l1, long l2) {
+			hadoopFan = new IntWritable(i);
+			someInt = new IntValue(i);
+			theTuple = new Tuple2<>(new LongValue(l1), new LongValue(l2));
+		}
+	}
+
+	public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
+		List<Tuple3<IntValue, CrazyNested, POJO>> data = new ArrayList<>();
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		// the POJO field is deliberately not initialized to match the first two fields.
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(2), new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO.
+	 */
+	public static class Pojo1 {
+		public StringValue a;
+		public StringValue b;
+
+		public Pojo1() {}
+
+		public Pojo1(String a, String b) {
+			this.a = new StringValue(a);
+			this.b = new StringValue(b);
+		}
+	}
+
+	/**
+	 * Another POJO.
+	 */
+	public static class Pojo2 {
+		public StringValue a2;
+		public StringValue b2;
+	}
+
+	/**
+	 * Nested POJO.
+	 */
+	public static class PojoWithMultiplePojos {
+		public Pojo1 p1;
+		public Pojo2 p2;
+		public IntValue i0;
+
+		public PojoWithMultiplePojos() {
+		}
+
+		public PojoWithMultiplePojos(String a, String b, String a1, String b1, int i0) {
+			p1 = new Pojo1();
+			p1.a = new StringValue(a);
+			p1.b = new StringValue(b);
+			p2 = new Pojo2();
+			p2.a2 = new StringValue(a1);
+			p2.b2 = new StringValue(b1);
+			this.i0 = new IntValue(i0);
+		}
+	}
+
+	public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
+		List<PojoWithMultiplePojos> data = new ArrayList<>();
+		data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * Custom enum.
+	 */
+	public enum Category {
+		CAT_A, CAT_B
+	}
+
+	/**
+	 * POJO with Date and enum.
+	 */
+	public static class PojoWithDateAndEnum {
+		public StringValue group;
+		public Date date;
+		public Category cat;
+	}
+
+	public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
+		List<PojoWithDateAndEnum> data = new ArrayList<>();
+
+		PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+		one.group = new StringValue("a");
+		one.date = new Date(666);
+		one.cat = Category.CAT_A;
+		data.add(one);
+
+		PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+		two.group = new StringValue("a");
+		two.date = new Date(666);
+		two.cat = Category.CAT_A;
+		data.add(two);
+
+		PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+		three.group = new StringValue("b");
+		three.date = new Date(666);
+		three.cat = Category.CAT_B;
+		data.add(three);
+
+		return env.fromCollection(data);
+	}
+
+	/**
+	 * POJO with collection.
+	 */
+	public static class PojoWithCollection {
+		public List<Pojo1> pojos;
+		public IntValue key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	/**
+	 * POJO with generic collection.
+	 */
+	public static class PojoWithCollectionGeneric {
+		public List<Pojo1> pojos;
+		public IntValue key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+		private PojoWithDateAndEnum makeMeGeneric;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+		List<PojoWithCollection> data = new ArrayList<>();
+
+		List<Pojo1> pojosList1 = new ArrayList<>();
+		pojosList1.add(new Pojo1("a", "aa"));
+		pojosList1.add(new Pojo1("b", "bb"));
+
+		List<Pojo1> pojosList2 = new ArrayList<>();
+		pojosList2.add(new Pojo1("a2", "aa2"));
+		pojosList2.add(new Pojo1("b2", "bb2"));
+
+		PojoWithCollection pwc1 = new PojoWithCollection();
+		pwc1.pojos = pojosList1;
+		pwc1.key = new IntValue(0);
+		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc1.scalaBigInt = BigInt.int2bigInt(10);
+		pwc1.bigDecimalKeepItNull = null;
+
+		// use calendar to make it stable across time zones
+		GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18); // month is zero-based: May 18, 2033
+		pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
+		pwc1.mixed = new ArrayList<>();
+		Map<StringValue, IntValue> map = new HashMap<>();
+		map.put(new StringValue("someKey"), new IntValue(1));
+		pwc1.mixed.add(map);
+		pwc1.mixed.add(new File("/this/is/wrong"));
+		pwc1.mixed.add("uhlala");
+
+		PojoWithCollection pwc2 = new PojoWithCollection();
+		pwc2.pojos = pojosList2;
+		pwc2.key = new IntValue(0);
+		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+		pwc2.bigDecimalKeepItNull = null;
+
+		GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+		pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
+
+		data.add(pwc1);
+		data.add(pwc2);
+
+		return env.fromCollection(data);
+	}
+
+}
+
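For orientation, these data sets are consumed by the operator tests through collect()-based assertions. A minimal sketch of such a consumer (hypothetical test code, not part of this commit) could look like:

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

	// keep the records of group 2L -- expected: (2,2,Hello) and (3,2,Hello world)
	List<Tuple3<Integer, Long, String>> result = ds
			.filter(t -> t.f1 == 2L)
			.collect();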

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
index 32fe7c5..a13f523 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.test.optimizer.examples;
 
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-import java.util.Collection;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
@@ -41,29 +36,39 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Validate the compilation and result of a single iteration of KMeans.
+ */
 @SuppressWarnings("serial")
 public class KMeansSingleStepTest extends CompilerTestBase {
-	
+
 	private static final String DATAPOINTS = "Data Points";
 	private static final String CENTERS = "Centers";
-	
+
 	private static final String MAPPER_NAME = "Find Nearest Centers";
 	private static final String REDUCER_NAME = "Recompute Center Positions";
-	
+
 	private static final String SINK = "New Center Positions";
-	
+
 	private final FieldList set0 = new FieldList(0);
-	
-	
+
 	@Test
 	public void testCompileKMeansSingleStepWithStats() {
 
@@ -71,11 +76,11 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		p.setExecutionConfig(new ExecutionConfig());
 		// set the statistics
 		OperatorResolver cr = getContractResolver(p);
-		GenericDataSourceBase<?,?> pointsSource = cr.getNode(DATAPOINTS);
-		GenericDataSourceBase<?,?> centersSource = cr.getNode(CENTERS);
+		GenericDataSourceBase<?, ?> pointsSource = cr.getNode(DATAPOINTS);
+		GenericDataSourceBase<?, ?> centersSource = cr.getNode(CENTERS);
 		setSourceStatistics(pointsSource, 100L * 1024 * 1024 * 1024, 32f);
 		setSourceStatistics(centersSource, 1024 * 1024, 32f);
-		
+
 		OptimizedPlan plan = compileWithStats(p);
 		checkPlan(plan);
 	}
@@ -88,33 +93,31 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		OptimizedPlan plan = compileNoStats(p);
 		checkPlan(plan);
 	}
-	
-	
+
 	private void checkPlan(OptimizedPlan plan) {
-		
+
 		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-		
+
 		final SinkPlanNode sink = or.getNode(SINK);
 		final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
 		final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
 		final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-		
+
 		// check the mapper
 		assertEquals(1, mapper.getBroadcastInputs().size());
 		assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
 		assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
-		
+
 		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
 		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-		
+
 		assertEquals(DriverStrategy.MAP, mapper.getDriverStrategy());
-		
+
 		assertNull(mapper.getInput().getLocalStrategyKeys());
 		assertNull(mapper.getInput().getLocalStrategySortOrder());
 		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
 		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-		
-		
+
 		// check the combiner
 		Assert.assertNotNull(combiner);
 		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -124,7 +127,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		assertNull(combiner.getInput().getLocalStrategySortOrder());
 		assertEquals(set0, combiner.getKeys(0));
 		assertEquals(set0, combiner.getKeys(1));
-		
+
 		// check the reducer
 		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
 		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
@@ -132,7 +135,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		assertEquals(set0, reducer.getKeys(0));
 		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
 		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-		
+
 		// check the sink
 		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
 		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
@@ -143,7 +146,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			KMeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
+			kmeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
 		} catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -153,7 +156,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		return env.getPlan();
 	}
 
-	public static void KMeans(String[] args) throws Exception {
+	public static void kmeans(String[] args) throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Point> points = env.readCsvFile(args[0])
@@ -183,14 +186,17 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		DataSet<Tuple3<Integer, Point, Integer>> newCentroids = points
 				.map(new SelectNearestCenter()).name(MAPPER_NAME).withBroadcastSet(centroids, "centroids");
 
-		DataSet<Tuple3<Integer, Point, Integer>> recomputeClusterCenter
-				= newCentroids.groupBy(0).reduceGroup(new RecomputeClusterCenter()).name(REDUCER_NAME);
+		DataSet<Tuple3<Integer, Point, Integer>> recomputeClusterCenter =
+				newCentroids.groupBy(0).reduceGroup(new RecomputeClusterCenter()).name(REDUCER_NAME);
 
 		recomputeClusterCenter.project(0, 1).writeAsCsv(args[2], "\n", " ").name(SINK);
 
 		env.execute("KMeans Example");
 	}
 
+	/**
+	 * Two-dimensional point.
+	 */
 	public static class Point extends Tuple2<Double, Double> {
 		public Point(double x, double y) {
 			this.f0 = x;
@@ -218,6 +224,9 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		}
 	}
 
+	/**
+	 * Center of a cluster.
+	 */
 	public static class Centroid extends Tuple2<Integer, Point> {
 
 		public Centroid(int id, double x, double y) {
@@ -234,7 +243,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 	/**
 	 * Determines the closest cluster center for a data point.
 	 */
-	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple3<Integer, Point, Integer>> {
+	private static final class SelectNearestCenter extends RichMapFunction<Point, Tuple3<Integer, Point, Integer>> {
 		private Collection<Centroid> centroids;
 
 		@Override
@@ -257,10 +266,9 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		}
 	}
 
-	public static final class RecomputeClusterCenter implements
+	private static final class RecomputeClusterCenter implements
 		GroupReduceFunction<Tuple3<Integer, Point, Integer>, Tuple3<Integer, Point, Integer>>,
-		GroupCombineFunction<Tuple3<Integer, Point, Integer>, Tuple3<Integer, Point, Integer>>
-	{
+		GroupCombineFunction<Tuple3<Integer, Point, Integer>, Tuple3<Integer, Point, Integer>> {
 
 		@Override
 		public void reduce(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception {
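The body of SelectNearestCenter.map() is elided by the hunks above. From the signatures visible in this diff (Point is a Tuple2<Double, Double>, Centroid a Tuple2<Integer, Point>), it picks the centroid closest to the point; a reconstructed sketch, not verbatim from the commit:

	@Override
	public Tuple3<Integer, Point, Integer> map(Point p) throws Exception {
		double minDistance = Double.MAX_VALUE;
		int closestCentroidId = -1;
		for (Centroid centroid : centroids) {
			double dx = p.f0 - centroid.f1.f0;
			double dy = p.f1 - centroid.f1.f1;
			double distance = dx * dx + dy * dy; // squared distance suffices for the argmin
			if (distance < minDistance) {
				minDistance = distance;
				closestCentroidId = centroid.f0;
			}
		}
		// emit (centroid id, point, count 1) for the combining reducer
		return new Tuple3<>(closestCentroidId, p, 1);
	}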

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
index 80e3ae0..caccd81 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.optimizer.examples;
 
-import java.util.Arrays;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -43,35 +41,37 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 /**
  * Tests TPCH Q3 (simplified) under various input conditions.
  */
 @SuppressWarnings("serial")
 public class RelationalQueryCompilerTest extends CompilerTestBase {
-	
+
 	private static final String ORDERS = "Orders";
 	private static final String LINEITEM = "LineItems";
 	private static final String MAPPER_NAME = "FilterO";
 	private static final String JOIN_NAME = "JoinLiO";
 	private static final String REDUCE_NAME = "AggLiO";
 	private static final String SINK = "Output";
-	
+
 	private final FieldList set0 = new FieldList(0);
-	private final FieldList set01 = new FieldList(0,1);
+	private final FieldList set01 = new FieldList(0, 1);
 	private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
-	
+
 	// ------------------------------------------------------------------------
-	
-	
+
 	/**
 	 * Verifies that a robust repartitioning plan with a hash join is created in the absence of statistics.
 	 */
@@ -82,9 +82,9 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			p.setExecutionConfig(defaultExecutionConfig);
 			// compile
 			final OptimizedPlan plan = compileNoStats(p);
-			
+
 			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-			
+
 			// get the nodes from the final plan
 			final SinkPlanNode sink = or.getNode(SINK);
 			final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
@@ -92,7 +92,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 					(SingleInputPlanNode) reducer.getPredecessor() : null;
 			final DualInputPlanNode join = or.getNode(JOIN_NAME);
 			final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
-			
+
 			// verify the optimizer choices
 			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
 			Assert.assertTrue(checkRepartitionShipStrategies(join, reducer, combiner));
@@ -102,16 +102,16 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			Assert.fail(e.getMessage());
 		}
 	}
-	
+
 	/**
 	 * Checks if any valid plan is produced. Hash joins are expected to build the orders side, as the statistics
 	 * indicate this to be the smaller one.
 	 */
 	@Test
 	public void testQueryAnyValidPlan() {
-		testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, 0.05f, true, true, true, false, true);
+		testQueryGeneric(1024 * 1024 * 1024L, 8 * 1024 * 1024 * 1024L, 0.05f, 0.05f, true, true, true, false, true);
 	}
-	
+
 	/**
 	 * Verifies that the plan compiles in the presence of empty (size zero) input estimates.
 	 */
@@ -119,23 +119,23 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	public void testQueryWithSizeZeroInputs() {
 		testQueryGeneric(0, 0, 0.1f, 0.5f, true, true, true, false, true);
 	}
-	
+
 	/**
 	 * Statistics that push towards a broadcast join.
 	 */
 	@Test
 	public void testQueryWithStatsForBroadcastHash() {
-		testQueryGeneric(1024L*1024*1024*1024, 1024L*1024*1024*1024, 0.01f, 0.05f, true, false, true, false, false);
+		testQueryGeneric(1024L * 1024 * 1024 * 1024, 1024L * 1024 * 1024 * 1024, 0.01f, 0.05f, true, false, true, false, false);
 	}
-	
+
 	/**
 	 * Statistics that push towards a repartition join.
 	 */
 	@Test
 	public void testQueryWithStatsForRepartitionAny() {
-		testQueryGeneric(100L*1024*1024*1024*1024, 100L*1024*1024*1024*1024, 0.1f, 0.5f, false, true, true, true, true);
+		testQueryGeneric(100L * 1024 * 1024 * 1024 * 1024, 100L * 1024 * 1024 * 1024 * 1024, 0.1f, 0.5f, false, true, true, true, true);
 	}
-	
+
 	/**
 	 * Statistics that push towards a repartition merge join. If the join blows the data volume up significantly,
 	 * re-exploiting the sorted order is cheaper.
@@ -146,45 +146,43 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 		p.setExecutionConfig(defaultExecutionConfig);
 		// set compiler hints
 		OperatorResolver cr = getContractResolver(p);
-		DualInputOperator<?,?,?,?> match = cr.getNode(JOIN_NAME);
+		DualInputOperator<?, ?, ?, ?> match = cr.getNode(JOIN_NAME);
 		match.getCompilerHints().setFilterFactor(100f);
-		
-		testQueryGeneric(100L*1024*1024*1024*1024, 100L*1024*1024*1024*1024, 0.01f, 100f, false, true, false, false, true);
+
+		testQueryGeneric(100L * 1024 * 1024 * 1024 * 1024, 100L * 1024 * 1024 * 1024 * 1024, 0.01f, 100f, false, true, false, false, true);
 	}
-	
+
 	// ------------------------------------------------------------------------
-	private void testQueryGeneric(long orderSize, long lineItemSize, 
+	private void testQueryGeneric(long orderSize, long lineItemSize,
 			float ordersFilterFactor, float joinFilterFactor,
 			boolean broadcastOkay, boolean partitionedOkay,
-			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
-	{
+			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) {
 		Plan p = getTPCH3Plan();
 		p.setExecutionConfig(defaultExecutionConfig);
 		testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
 	}
-		
-	private void testQueryGeneric(Plan p, long orderSize, long lineitemSize, 
-			float orderSelectivity, float joinSelectivity, 
+
+	private void testQueryGeneric(Plan p, long orderSize, long lineitemSize,
+			float orderSelectivity, float joinSelectivity,
 			boolean broadcastOkay, boolean partitionedOkay,
-			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
-	{
+			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) {
 		try {
 			// set statistics
 			OperatorResolver cr = getContractResolver(p);
-			GenericDataSourceBase<?,?> ordersSource = cr.getNode(ORDERS);
-			GenericDataSourceBase<?,?> lineItemSource = cr.getNode(LINEITEM);
-			SingleInputOperator<?,?,?> mapper = cr.getNode(MAPPER_NAME);
-			DualInputOperator<?,?,?,?> joiner = cr.getNode(JOIN_NAME);
+			GenericDataSourceBase<?, ?> ordersSource = cr.getNode(ORDERS);
+			GenericDataSourceBase<?, ?> lineItemSource = cr.getNode(LINEITEM);
+			SingleInputOperator<?, ?, ?> mapper = cr.getNode(MAPPER_NAME);
+			DualInputOperator<?, ?, ?, ?> joiner = cr.getNode(JOIN_NAME);
 			setSourceStatistics(ordersSource, orderSize, 100f);
 			setSourceStatistics(lineItemSource, lineitemSize, 140f);
 			mapper.getCompilerHints().setAvgOutputRecordSize(16f);
 			mapper.getCompilerHints().setFilterFactor(orderSelectivity);
 			joiner.getCompilerHints().setFilterFactor(joinSelectivity);
-			
+
 			// compile
 			final OptimizedPlan plan = compileWithStats(p);
 			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-			
+
 			// get the nodes from the final plan
 			final SinkPlanNode sink = or.getNode(SINK);
 			final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
@@ -192,13 +190,13 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 					(SingleInputPlanNode) reducer.getPredecessor() : null;
 			final DualInputPlanNode join = or.getNode(JOIN_NAME);
 			final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
-			
+
 			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
-			
+
 			// check the possible variants and that the variant is allowed in this specific setting
 			if (checkBroadcastShipStrategies(join, reducer, combiner)) {
 				Assert.assertTrue("Broadcast join incorrectly chosen.", broadcastOkay);
-				
+
 				if (checkHashJoinStrategies(join, reducer, true)) {
 					Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
 				} else if (checkHashJoinStrategies(join, reducer, false)) {
@@ -211,7 +209,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			}
 			else if (checkRepartitionShipStrategies(join, reducer, combiner)) {
 				Assert.assertTrue("Partitioned join incorrectly chosen.", partitionedOkay);
-				
+
 				if (checkHashJoinStrategies(join, reducer, true)) {
 					Assert.assertTrue("Hash join (build orders) incorrectly chosen", hashJoinFirstOkay);
 				} else if (checkHashJoinStrategies(join, reducer, false)) {
@@ -235,12 +233,11 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	// ------------------------------------------------------------------------
 
 	private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
-			SingleInputPlanNode reducer, SinkPlanNode sink)
-	{
+			SingleInputPlanNode reducer, SinkPlanNode sink) {
 		// check ship strategies that are always fixed
 		Assert.assertEquals(ShipStrategyType.FORWARD, map.getInput().getShipStrategy());
 		Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-		
+
 		// check the driver strategies that are always fixed
 		Assert.assertEquals(DriverStrategy.FLAT_MAP, map.getDriverStrategy());
 		Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
@@ -250,14 +247,13 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			Assert.assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
 		}
 	}
-	
+
 	private boolean checkBroadcastShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
-			SingleInputPlanNode combiner)
-	{
+			SingleInputPlanNode combiner) {
 		if (ShipStrategyType.BROADCAST == join.getInput1().getShipStrategy() &&
 			ShipStrategyType.FORWARD == join.getInput2().getShipStrategy() &&
-			ShipStrategyType.PARTITION_HASH == reducer.getInput().getShipStrategy())
-		{
+			ShipStrategyType.PARTITION_HASH == reducer.getInput().getShipStrategy()) {
+
 			// check combiner
 			Assert.assertNotNull("Plan should have a combiner", combiner);
 			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -266,14 +262,13 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			return false;
 		}
 	}
-	
+
 	private boolean checkRepartitionShipStrategies(DualInputPlanNode join, SingleInputPlanNode reducer,
-			SingleInputPlanNode combiner)
-	{
+			SingleInputPlanNode combiner) {
 		if (ShipStrategyType.PARTITION_HASH == join.getInput1().getShipStrategy() &&
 			ShipStrategyType.PARTITION_HASH == join.getInput2().getShipStrategy() &&
-			ShipStrategyType.FORWARD == reducer.getInput().getShipStrategy())
-		{
+			ShipStrategyType.FORWARD == reducer.getInput().getShipStrategy()) {
+
 			// check combiner
 			Assert.assertNull("Plan should not have a combiner", combiner);
 			return true;
@@ -281,20 +276,20 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			return false;
 		}
 	}
-	
+
 	private boolean checkHashJoinStrategies(DualInputPlanNode join, SingleInputPlanNode reducer, boolean buildFirst) {
-		if ( (buildFirst && DriverStrategy.HYBRIDHASH_BUILD_FIRST == join.getDriverStrategy()) ||
-			 (!buildFirst && DriverStrategy.HYBRIDHASH_BUILD_SECOND == join.getDriverStrategy()) ) 
-		{
+		if ((buildFirst && DriverStrategy.HYBRIDHASH_BUILD_FIRST == join.getDriverStrategy()) ||
+			(!buildFirst && DriverStrategy.HYBRIDHASH_BUILD_SECOND == join.getDriverStrategy())) {
+
 			// driver keys
 			Assert.assertEquals(set0, join.getKeysForInput1());
 			Assert.assertEquals(set0, join.getKeysForInput2());
-			
+
 			// local strategies
 			Assert.assertEquals(LocalStrategy.NONE, join.getInput1().getLocalStrategy());
 			Assert.assertEquals(LocalStrategy.NONE, join.getInput2().getLocalStrategy());
 			Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-			
+
 			// local strategy keys
 			Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
 			Assert.assertEquals(set01, reducer.getKeys(0));
@@ -304,18 +299,18 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			return false;
 		}
 	}
-	
+
 	private boolean checkBroadcastMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
 		if (DriverStrategy.INNER_MERGE == join.getDriverStrategy()) {
 			// driver keys
 			Assert.assertEquals(set0, join.getKeysForInput1());
 			Assert.assertEquals(set0, join.getKeysForInput2());
-			
+
 			// local strategies
 			Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
 			Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
 			Assert.assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-			
+
 			// local strategy keys
 			Assert.assertEquals(set0, join.getInput1().getLocalStrategyKeys());
 			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
@@ -328,18 +323,18 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			return false;
 		}
 	}
-	
+
 	private boolean checkRepartitionMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) {
 		if (DriverStrategy.INNER_MERGE == join.getDriverStrategy()) {
 			// driver keys
 			Assert.assertEquals(set0, join.getKeysForInput1());
 			Assert.assertEquals(set0, join.getKeysForInput2());
-			
+
 			// local strategies
 			Assert.assertEquals(LocalStrategy.SORT, join.getInput1().getLocalStrategy());
 			Assert.assertEquals(LocalStrategy.SORT, join.getInput2().getLocalStrategy());
 			Assert.assertEquals(LocalStrategy.NONE, reducer.getInput().getLocalStrategy());
-			
+
 			// local strategy keys
 			Assert.assertEquals(set01, join.getInput1().getLocalStrategyKeys());
 			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
@@ -357,30 +352,30 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			TCPH3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE});
+			tcph3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE});
 		} catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail("TCPH3 failed with an exception");
+			Assert.fail("tcph3 failed with an exception");
 		}
 		return env.getPlan();
 	}
 
-	public static void TCPH3(String[] args) throws Exception {
+	public static void tcph3(String[] args) throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(Integer.parseInt(args[0]));
 
 		//order id, order status, order date, order prio, ship prio
-		DataSet<Tuple5<Long, String, String, String, Integer>> orders
-				= env.readCsvFile(args[1])
+		DataSet<Tuple5<Long, String, String, String, Integer>> orders =
+				env.readCsvFile(args[1])
 				.fieldDelimiter("|").lineDelimiter("\n")
 				.includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class)
 				.name(ORDERS);
 
 		//order id, extended price
-		DataSet<Tuple2<Long, Double>> lineItems
-				= env.readCsvFile(args[2])
+		DataSet<Tuple2<Long, Double>> lineItems =
+				env.readCsvFile(args[2])
 				.fieldDelimiter("|").lineDelimiter("\n")
 				.includeFields("100001").types(Long.class, Double.class)
 				.name(LINEITEM);
@@ -397,7 +392,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	}
 
 	@ForwardedFields("f0; f4->f1")
-	public static class FilterO implements FlatMapFunction<Tuple5<Long, String, String, String, Integer>, Tuple2<Long, Integer>> {
+	private static class FilterO implements FlatMapFunction<Tuple5<Long, String, String, String, Integer>, Tuple2<Long, Integer>> {
 		@Override
 		public void flatMap(Tuple5<Long, String, String, String, Integer> value, Collector<Tuple2<Long, Integer>> out) throws Exception {
 			// not going to be executed
@@ -405,7 +400,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	}
 
 	@ForwardedFieldsFirst("f0; f1")
-	public static class JoinLiO implements FlatJoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Double>, Tuple3<Long, Integer, Double>> {
+	private static class JoinLiO implements FlatJoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Double>, Tuple3<Long, Integer, Double>> {
 		@Override
 		public void join(Tuple2<Long, Integer> first, Tuple2<Long, Double> second, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
 			// not going to be executed
@@ -413,10 +408,9 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	}
 
 	@ForwardedFields("f0; f1")
-	public static class AggLiO implements
-		GroupReduceFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>>,
-		GroupCombineFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>>
-	{
+	private static class AggLiO implements
+			GroupReduceFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>>,
+			GroupCombineFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> {
 		@Override
 		public void reduce(Iterable<Tuple3<Long, Integer, Double>> values, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
 			// not going to be executed
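The hunks above elide the middle of tcph3 between the two CSV sources and the sink. Judging from the operator names and the key sets asserted earlier (set0 on both join inputs, set01 on the reducer), the wiring is a flatMap filter, an equi-join on the order id, and a grouped aggregation; a reconstruction under those assumptions (the sink delimiters are a guess):

	DataSet<Tuple2<Long, Integer>> filteredOrders = orders
			.flatMap(new FilterO()).name(MAPPER_NAME);

	DataSet<Tuple3<Long, Integer, Double>> lineItemsWithOrders = filteredOrders
			.join(lineItems).where(0).equalTo(0)
			.with(new JoinLiO()).name(JOIN_NAME);

	lineItemsWithOrders
			.groupBy(0, 1)
			.reduceGroup(new AggLiO()).name(REDUCE_NAME)
			.writeAsCsv(args[3], "\n", "|").name(SINK);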

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
index ab49282..af780ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.optimizer.examples;
 
-import java.util.Arrays;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -33,13 +31,19 @@ import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+
+/**
+ * Validate program compilation.
+ */
 public class WordCountCompilerTest extends CompilerTestBase {
 
 	private static final long serialVersionUID = 8988304231385358228L;
@@ -52,7 +56,7 @@ public class WordCountCompilerTest extends CompilerTestBase {
 		checkWordCount(true);
 		checkWordCount(false);
 	}
-	
+
 	private void checkWordCount(boolean estimates) {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -81,8 +85,8 @@ public class WordCountCompilerTest extends CompilerTestBase {
 
 		OptimizedPlan plan;
 		if (estimates) {
-			GenericDataSourceBase<?,?> source = getContractResolver(p).getNode("Input Lines");
-			setSourceStatistics(source, 1024*1024*1024*1024L, 24f);
+			GenericDataSourceBase<?, ?> source = getContractResolver(p).getNode("Input Lines");
+			setSourceStatistics(source, 1024 * 1024 * 1024 * 1024L, 24f);
 			plan = compileWithStats(p);
 		} else {
 			plan = compileNoStats(p);
@@ -111,7 +115,7 @@ public class WordCountCompilerTest extends CompilerTestBase {
 		Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
 		Assert.assertEquals(l, combiner.getKeys(0));
 		Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-			
+
 	}
-	
+
 }
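For reference, the program compiled by checkWordCount is the classic tokenize/group/sum pipeline. A hypothetical program of the shape these assertions expect (the source is named "Input Lines" to match the resolver lookup; the tokenizer details are a guess):

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<String> lines = env.readTextFile(IN_FILE).name("Input Lines");

	lines
			.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
					for (String token : line.toLowerCase().split("\\W+")) {
						if (!token.isEmpty()) {
							out.collect(new Tuple2<>(token, 1)); // one count per occurrence
						}
					}
				}
			}).name("Tokenize Lines")
			.groupBy(0)
			.sum(1).name("Count Words")
			.writeAsCsv(OUT_FILE, "\n", " ");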

