flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [13/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
new file mode 100644
index 0000000..a196984
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.junit.Assert;
+
+public class TupleSerializerTestInstance<T extends Tuple> extends SerializerTestInstance<T> {
+
+	public TupleSerializerTestInstance(TypeSerializer<T> serializer, Class<T> typeClass, int length, T[] testData) {
+		super(serializer, typeClass, length, testData);
+	}
+	
+	protected void deepEquals(String message, T shouldTuple, T isTuple) {
+		Assert.assertEquals(shouldTuple.getArity(), isTuple.getArity());
+		
+		for (int i = 0; i < shouldTuple.getArity(); i++) {
+			Object should = shouldTuple.getField(i);
+			Object is = isTuple.getField(i);
+			
+			if (should.getClass().isArray()) {
+				if (should instanceof boolean[]) {
+					Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is));
+				}
+				else if (should instanceof byte[]) {
+					assertArrayEquals(message, (byte[]) should, (byte[]) is);
+				}
+				else if (should instanceof short[]) {
+					assertArrayEquals(message, (short[]) should, (short[]) is);
+				}
+				else if (should instanceof int[]) {
+					assertArrayEquals(message, (int[]) should, (int[]) is);
+				}
+				else if (should instanceof long[]) {
+					assertArrayEquals(message, (long[]) should, (long[]) is);
+				}
+				else if (should instanceof float[]) {
+					assertArrayEquals(message, (float[]) should, (float[]) is, 0.0f);
+				}
+				else if (should instanceof double[]) {
+					assertArrayEquals(message, (double[]) should, (double[]) is, 0.0);
+				}
+				else if (should instanceof char[]) {
+					assertArrayEquals(message, (char[]) should, (char[]) is);
+				}
+				else {
+					assertArrayEquals(message, (Object[]) should, (Object[]) is);
+				}
+			}
+			else {
+				assertEquals(message,  should, is);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
new file mode 100644
index 0000000..cf9874d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+public class ValueComparatorTest extends ComparatorTestBase<StringValue> {
+
+	StringValue[] data = new StringValue[]{
+		new StringValue(""),
+		new StringValue("Lorem Ipsum Dolor Omit Longer"),
+		new StringValue("aaaa"),
+		new StringValue("abcd"),
+		new StringValue("abce"),
+		new StringValue("abdd"),
+		new StringValue("accd"),
+		new StringValue("bbcd")
+	};
+
+	@Override
+	protected TypeComparator<StringValue> createComparator(boolean ascending) {
+		return new ValueComparator<StringValue>(ascending, StringValue.class);
+	}
+
+	@Override
+	protected TypeSerializer<StringValue> createSerializer() {
+		return new ValueSerializer<StringValue>(StringValue.class);
+	}
+
+	@Override
+	protected StringValue[] getSortedTestData() {
+		return data;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
new file mode 100644
index 0000000..a2c3e1e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> {
+	@Override
+	protected TypeComparator<ValueID> createComparator(boolean ascending) {
+		return new ValueComparator<>(ascending, ValueID.class);
+	}
+
+	@Override
+	protected TypeSerializer<ValueID> createSerializer() {
+		return new ValueSerializer<>(ValueID.class);
+	}
+
+	@Override
+	protected ValueID[] getSortedTestData() {
+		return new ValueID[] {
+			new ValueID(new UUID(0, 0)),
+			new ValueID(new UUID(1, 0)),
+			new ValueID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
new file mode 100644
index 0000000..d644485
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class ValueID implements Value, Comparable<ValueID> {
+	private static final long serialVersionUID = -562791433077971752L;
+
+	private UUID id;
+
+	public ValueID() {
+		id = UUID.randomUUID();
+	}
+
+	public ValueID(UUID id) {
+		this.id = id;
+	}
+
+	@Override
+	public int compareTo(ValueID o) {
+		return id.compareTo(o.id);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id.getMostSignificantBits());
+		out.writeLong(id.getLeastSignificantBits());
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = new UUID(in.readLong(), in.readLong());
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ValueID) {
+			ValueID other = (ValueID) obj;
+
+			return id.equals(other.id);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return id.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
new file mode 100644
index 0000000..9c07a5e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> {
+	@Override
+	protected TypeSerializer<ValueID> createSerializer() {
+		return new ValueSerializer<>(ValueID.class);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<ValueID> getTypeClass() {
+		return ValueID.class;
+	}
+
+	@Override
+	protected ValueID[] getTestData() {
+		return new ValueID[] {
+			new ValueID(new UUID(0, 0)),
+			new ValueID(new UUID(1, 0)),
+			new ValueID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
new file mode 100644
index 0000000..f5a90b7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {	
+	
+	StringArrayWritable[] data = new StringArrayWritable[]{
+			new StringArrayWritable(new String[]{}),
+			new StringArrayWritable(new String[]{""}),
+			new StringArrayWritable(new String[]{"a","a"}),
+			new StringArrayWritable(new String[]{"a","b"}),
+			new StringArrayWritable(new String[]{"c","c"}),
+			new StringArrayWritable(new String[]{"d","f"}),
+			new StringArrayWritable(new String[]{"d","m"}),
+			new StringArrayWritable(new String[]{"z","x"}),
+			new StringArrayWritable(new String[]{"a","a", "a"})
+	};
+	
+	@Override
+	protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) {
+		return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class);
+	}
+	
+	@Override
+	protected TypeSerializer<StringArrayWritable> createSerializer() {
+		return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
+	}
+	
+	@Override
+	protected StringArrayWritable[] getSortedTestData() {
+		return data;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
new file mode 100644
index 0000000..94e759d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
+	@Override
+	protected TypeComparator<WritableID> createComparator(boolean ascending) {
+		return new WritableComparator<>(ascending, WritableID.class);
+	}
+
+	@Override
+	protected TypeSerializer<WritableID> createSerializer() {
+		return new WritableSerializer<>(WritableID.class);
+	}
+
+	@Override
+	protected WritableID[] getSortedTestData() {
+		return new WritableID[] {
+			new WritableID(new UUID(0, 0)),
+			new WritableID(new UUID(1, 0)),
+			new WritableID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
new file mode 100644
index 0000000..4274cf6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+public class WritableID implements WritableComparable<WritableID> {
+	private UUID uuid;
+
+	public WritableID() {
+		this.uuid = UUID.randomUUID();
+	}
+
+	public WritableID(UUID uuid) {
+		this.uuid = uuid;
+	}
+
+	@Override
+	public int compareTo(WritableID o) {
+		return this.uuid.compareTo(o.uuid);
+	}
+
+	@Override
+	public void write(DataOutput dataOutput) throws IOException {
+		dataOutput.writeLong(uuid.getMostSignificantBits());
+		dataOutput.writeLong(uuid.getLeastSignificantBits());
+	}
+
+	@Override
+	public void readFields(DataInput dataInput) throws IOException {
+		this.uuid = new UUID(dataInput.readLong(), dataInput.readLong());
+	}
+
+	@Override
+	public String toString() {
+		return uuid.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		WritableID id = (WritableID) o;
+
+		return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null);
+	}
+
+	@Override
+	public int hashCode() {
+		return uuid != null ? uuid.hashCode() : 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
new file mode 100644
index 0000000..bb5f4d4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.junit.Test;
+
+public class WritableSerializerTest {
+	
+	@Test
+	public void testStringArrayWritable() {
+		StringArrayWritable[] data = new StringArrayWritable[]{
+				new StringArrayWritable(new String[]{}),
+				new StringArrayWritable(new String[]{""}),
+				new StringArrayWritable(new String[]{"a","a"}),
+				new StringArrayWritable(new String[]{"a","b"}),
+				new StringArrayWritable(new String[]{"c","c"}),
+				new StringArrayWritable(new String[]{"d","f"}),
+				new StringArrayWritable(new String[]{"d","m"}),
+				new StringArrayWritable(new String[]{"z","x"}),
+				new StringArrayWritable(new String[]{"a","a", "a"})
+		};
+		
+		WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
+		WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig());
+		
+		SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data);
+		
+		testInstance.testAll();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
new file mode 100644
index 0000000..2af7730
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
+	@Override
+	protected TypeSerializer<WritableID> createSerializer() {
+		return new WritableSerializer<>(WritableID.class);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<WritableID> getTypeClass() {
+		return WritableID.class;
+	}
+
+	@Override
+	protected WritableID[] getTestData() {
+		return new WritableID[] {
+			new WritableID(new UUID(0, 0)),
+			new WritableID(new UUID(1, 0)),
+			new WritableID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
new file mode 100644
index 0000000..7572408
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class KryoClearedBufferTest {
+
+	/**
+	 * Tests that the kryo output buffer is cleared in case of an exception. Flink uses the
+	 * EOFException to signal that a buffer is full. In such a case, the record which was tried
+	 * to be written will be rewritten. Therefore, eventually buffered data of this record has
+	 * to be cleared.
+	 */
+	@Test
+	public void testOutputBufferedBeingClearedInCaseOfException() throws Exception {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.registerTypeWithKryoSerializer(TestRecord.class, new TestRecordSerializer());
+		executionConfig.registerKryoType(TestRecord.class);
+
+		KryoSerializer<TestRecord> kryoSerializer = new KryoSerializer<TestRecord>(
+			TestRecord.class,
+			executionConfig);
+
+		int size = 94;
+		int bufferSize = 150;
+
+		TestRecord testRecord = new TestRecord(size);
+
+		TestDataOutputView target = new TestDataOutputView(bufferSize);
+
+		kryoSerializer.serialize(testRecord, target);
+
+		try {
+			kryoSerializer.serialize(testRecord, target);
+			Assert.fail("Expected an EOFException.");
+		} catch(EOFException eofException) {
+			// expected exception
+			// now the Kryo Output should have been cleared
+		}
+
+		TestRecord actualRecord = kryoSerializer.deserialize(
+				new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer())));
+
+		Assert.assertEquals(testRecord, actualRecord);
+
+		target.clear();
+
+		// if the kryo output has been cleared then we can serialize our test record into the target
+		// because the target buffer 150 bytes can host one TestRecord (total serialization size 100)
+		kryoSerializer.serialize(testRecord, target);
+
+		byte[] buffer = target.getBuffer();
+		int counter = 0;
+
+		for (int i = 0; i < buffer.length; i++) {
+			if(buffer[i] == 42) {
+				counter++;
+			}
+		}
+
+		Assert.assertEquals(size, counter);
+	}
+
+	public static class TestRecord {
+		private byte[] buffer;
+
+		public TestRecord(int size) {
+			buffer = new byte[size];
+
+			Arrays.fill(buffer, (byte)42);
+		}
+
+		public TestRecord(byte[] buffer){
+			this.buffer = buffer;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof TestRecord) {
+				TestRecord record = (TestRecord) obj;
+
+				return Arrays.equals(buffer, record.buffer);
+			} else {
+				return false;
+			}
+		}
+	}
+
+	public static class TestRecordSerializer extends Serializer<TestRecord> implements Serializable {
+
+		private static final long serialVersionUID = 6971996565421454985L;
+
+		@Override
+		public void write(Kryo kryo, Output output, TestRecord object) {
+			output.writeInt(object.buffer.length);
+			output.write(object.buffer);
+		}
+
+		@Override
+		public TestRecord read(Kryo kryo, Input input, Class<TestRecord> type) {
+			int length = input.readInt();
+			byte[] buffer = input.readBytes(length);
+
+			return new TestRecord(buffer);
+		}
+	}
+
+	public static class TestDataOutputView implements DataOutputView {
+
+		private byte[] buffer;
+		private int position;
+
+		public TestDataOutputView(int size) {
+			buffer = new byte[size];
+			position = 0;
+		}
+
+		public void clear() {
+			position = 0;
+		}
+
+		public byte[] getBuffer() {
+			return buffer;
+		}
+
+		public void checkSize(int numBytes) throws EOFException {
+			if (position + numBytes > buffer.length) {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			checkSize(numBytes);
+
+			position += numBytes;
+		}
+
+		@Override
+		public void write(DataInputView source, int numBytes) throws IOException {
+			checkSize(numBytes);
+
+			byte[] tempBuffer = new byte[numBytes];
+
+			source.read(tempBuffer);
+
+			System.arraycopy(tempBuffer, 0, buffer, position, numBytes);
+
+			position += numBytes;
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			checkSize(4);
+
+			position += 4;
+		}
+
+		@Override
+		public void write(byte[] b) throws IOException {
+			checkSize(b.length);
+
+			System.arraycopy(b, 0, buffer, position, b.length);
+			position += b.length;
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			checkSize(len);
+
+			System.arraycopy(b, off, buffer, position, len);
+
+			position += len;
+		}
+
+		@Override
+		public void writeBoolean(boolean v) throws IOException {
+			checkSize(1);
+			position += 1;
+		}
+
+		@Override
+		public void writeByte(int v) throws IOException {
+			checkSize(1);
+
+			buffer[position] = (byte)v;
+
+			position++;
+		}
+
+		@Override
+		public void writeShort(int v) throws IOException {
+			checkSize(2);
+
+			position += 2;
+		}
+
+		@Override
+		public void writeChar(int v) throws IOException {
+			checkSize(1);
+			position++;
+		}
+
+		@Override
+		public void writeInt(int v) throws IOException {
+			checkSize(4);
+
+			position += 4;
+		}
+
+		@Override
+		public void writeLong(long v) throws IOException {
+			checkSize(8);
+			position += 8;
+		}
+
+		@Override
+		public void writeFloat(float v) throws IOException {
+			checkSize(4);
+			position += 4;
+		}
+
+		@Override
+		public void writeDouble(double v) throws IOException {
+			checkSize(8);
+			position += 8;
+		}
+
+		@Override
+		public void writeBytes(String s) throws IOException {
+			byte[] sBuffer = s.getBytes();
+			checkSize(sBuffer.length);
+			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
+			position += sBuffer.length;
+		}
+
+		@Override
+		public void writeChars(String s) throws IOException {
+			byte[] sBuffer = s.getBytes();
+			checkSize(sBuffer.length);
+			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
+			position += sBuffer.length;
+		}
+
+		@Override
+		public void writeUTF(String s) throws IOException {
+			byte[] sBuffer = s.getBytes();
+			checkSize(sBuffer.length);
+			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
+			position += sBuffer.length;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
new file mode 100644
index 0000000..9d3eef1
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest;
+
+public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
+	@Override
+	protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
+		return new KryoSerializer<T>(type, new ExecutionConfig());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
new file mode 100644
index 0000000..19aed78
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest;
+
+public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		return new KryoSerializer<T>(type, new ExecutionConfig());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
new file mode 100644
index 0000000..8ff0b1b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
+import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("unchecked")
+public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+	ExecutionConfig ec = new ExecutionConfig();
+	
+	@Test
+	public void testJavaList(){
+		Collection<Integer> a = new ArrayList<>();
+
+		fillCollection(a);
+
+		runTests(a);
+	}
+
+	@Test
+	public void testJavaSet(){
+		Collection<Integer> b = new HashSet<>();
+
+		fillCollection(b);
+
+		runTests(b);
+	}
+
+
+
+	@Test
+	public void testJavaDequeue(){
+		Collection<Integer> c = new LinkedList<>();
+		fillCollection(c);
+		runTests(c);
+	}
+
+	private void fillCollection(Collection<Integer> coll) {
+		coll.add(42);
+		coll.add(1337);
+		coll.add(49);
+		coll.add(1);
+	}
+
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		return new KryoSerializer<T>(type, ec);
+	}
+	
+	/**
+	 * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
+	 */
+	@Test
+	public void testForwardEOFExceptionWhileSerializing() {
+		try {
+			// construct a long string
+			String str;
+			{
+				char[] charData = new char[40000];
+				Random rnd = new Random();
+				
+				for (int i = 0; i < charData.length; i++) {
+					charData[i] = (char) rnd.nextInt(10000);
+				}
+				
+				str = new String(charData);
+			}
+			
+			// construct a memory target that is too small for the string
+			TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
+			KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig());
+			
+			try {
+				serializer.serialize(str, target);
+				fail("should throw a java.io.EOFException");
+			}
+			catch (java.io.EOFException e) {
+				// that is how we like it
+			}
+			catch (Exception e) {
+				fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
+	 */
+	@Test
+	public void testForwardEOFExceptionWhileDeserializing() {
+		try {
+			int numElements = 100;
+			// construct a memory target that is too small for the string
+			TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements);
+			KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());
+
+			for(int i = 0; i < numElements; i++){
+				serializer.serialize(i, target);
+			}
+
+			ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer());
+
+			for(int i = 0; i < numElements; i++){
+				int value = serializer.deserialize(source);
+				assertEquals(i, value);
+			}
+
+			try {
+				serializer.deserialize(source);
+				fail("should throw a java.io.EOFException");
+			}
+			catch (java.io.EOFException e) {
+				// that is how we like it :-)
+			}
+			catch (Exception e) {
+				fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void validateReferenceMappingDisabled() {
+		KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+		Kryo kryo = serializer.getKryo();
+		assertFalse(kryo.getReferences());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
new file mode 100644
index 0000000..d68afd6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
+import org.joda.time.LocalDate;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+@SuppressWarnings("unchecked")
+public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializerTest {
+	
+
+	@Test
+	public void testJodaTime(){
+		Collection<LocalDate> b = new HashSet<LocalDate>();
+
+		b.add(new LocalDate(1L));
+		b.add(new LocalDate(2L));
+
+		runTests(b);
+	}
+
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		ExecutionConfig conf = new ExecutionConfig();
+		conf.registerTypeWithKryoSerializer(LocalDate.class, LocalDateSerializer.class);
+		TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type);
+		return typeInfo.createSerializer(conf);
+	}
+	
+	public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void write(Kryo kryo, Output output, LocalDate object) {
+			output.writeInt(object.getYear());
+			output.writeInt(object.getMonthOfYear());
+			output.writeInt(object.getDayOfMonth());
+		}
+		
+		@Override
+		public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
+			return new LocalDate(input.readInt(), input.readInt(), input.readInt());
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
new file mode 100644
index 0000000..7c6d023
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertTrue;
+
+public class SerializersTest {
+
+	// recursive
+	public static class Node {
+		private Node parent;
+	}
+	
+	public static class FromNested {
+		Node recurseMe;
+	}
+	
+	public static class FromGeneric1 {}
+	public static class FromGeneric2 {}
+	
+	public static class Nested1 {
+		private FromNested fromNested;
+		private Path yodaIntervall;
+	}
+
+	public static class ClassWithNested {
+		
+		Nested1 nested;
+		int ab;
+		
+		ArrayList<FromGeneric1> addGenType;
+		FromGeneric2[] genericArrayType;
+	}
+
+	@Test
+	public void testTypeRegistration() {
+		ExecutionConfig conf = new ExecutionConfig();
+		Serializers.recursivelyRegisterType(ClassWithNested.class, conf, new HashSet<Class<?>>());
+		
+		KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
+
+		Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
+		Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
+		Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
+		
+		// check if the generic type from one field is also registered (its very likely that
+		// generic types are also used as fields somewhere.
+		Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
+		Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
+		Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
+		
+		
+		// register again and make sure classes are still registered
+		ExecutionConfig conf2 = new ExecutionConfig();
+		Serializers.recursivelyRegisterType(ClassWithNested.class, conf2, new HashSet<Class<?>>());
+		KryoSerializer<String> kryo2 = new KryoSerializer<>(String.class, conf);
+		assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0);
+	}
+
+	@Test
+	public void testTypeRegistrationFromTypeInfo() {
+		ExecutionConfig conf = new ExecutionConfig();
+		Serializers.recursivelyRegisterType(new GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>());
+
+		KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type.
+
+		assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
+
+		// check if the generic type from one field is also registered (its very likely that
+		// generic types are also used as fields somewhere.
+		assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
+		assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
new file mode 100644
index 0000000..faab26a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.tuple.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class TupleComparatorTestBase<T extends Tuple> extends ComparatorTestBase<T> {
+
+	@Override
+	protected void deepEquals(String message, T should, T is) {
+		for (int x = 0; x < should.getArity(); x++) {
+			assertEquals(should.getField(x), is.getField(x));
+		}
+	}
+
+	@Override
+	protected abstract TupleComparator<T> createComparator(boolean ascending);
+
+	@Override
+	protected abstract TupleSerializer<T> createSerializer();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
new file mode 100644
index 0000000..1d414d8
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.tuple.base;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+/**
+ * Abstract test base for TuplePairComparators.
+ *
+ * @param <T>
+ * @param <R>
+ */
+public abstract class TuplePairComparatorTestBase<T extends Tuple, R extends Tuple> extends TestLogger {
+
+	protected abstract TypePairComparator<T, R> createComparator(boolean ascending);
+
+	protected abstract Tuple2<T[], R[]> getSortedTestData();
+
+	@Test
+	public void testEqualityWithReference() {
+		try {
+			TypePairComparator<T, R> comparator = getComparator(true);
+			Tuple2<T[], R[]> data = getSortedData();
+			for (int x = 0; x < data.f0.length; x++) {
+				comparator.setReference(data.f0[x]);
+
+				assertTrue(comparator.equalToReference(data.f1[x]));
+			}
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testInequalityWithReference() {
+		testGreatSmallAscDescWithReference(true);
+		testGreatSmallAscDescWithReference(false);
+	}
+
+	protected void testGreatSmallAscDescWithReference(boolean ascending) {
+		try {
+			Tuple2<T[], R[]> data = getSortedData();
+
+			TypePairComparator<T, R> comparator = getComparator(ascending);
+
+			//compares every element in high with every element in low
+			for (int x = 0; x < data.f0.length - 1; x++) {
+				for (int y = x + 1; y < data.f1.length; y++) {
+					comparator.setReference(data.f0[x]);
+					if (ascending) {
+						assertTrue(comparator.compareToReference(data.f1[y]) > 0);
+					} else {
+						assertTrue(comparator.compareToReference(data.f1[y]) < 0);
+					}
+				}
+			}
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	protected TypePairComparator<T, R> getComparator(boolean ascending) {
+		TypePairComparator<T, R> comparator = createComparator(ascending);
+		if (comparator == null) {
+			throw new RuntimeException("Test case corrupt. Returns null as comparator.");
+		}
+		return comparator;
+	}
+
+	protected Tuple2<T[], R[]> getSortedData() {
+		Tuple2<T[], R[]> data = getSortedTestData();
+		if (data == null || data.f0 == null || data.f1 == null) {
+			throw new RuntimeException("Test case corrupt. Returns null as test data.");
+		}
+		if (data.f0.length < 2 || data.f1.length < 2) {
+			throw new RuntimeException("Test case does not provide enough sorted test data.");
+		}
+
+		return data;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index a31e89d..3203d75 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -40,22 +40,12 @@ under the License.
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>${shading-artifact.name}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.avro</groupId>
-			<artifactId>avro</artifactId>
-			<!-- version is derived from base module -->
-		</dependency>
-		
-		<dependency>
-			<groupId>com.esotericsoftware.kryo</groupId>
-			<artifactId>kryo</artifactId>
-		</dependency>
 
 		<dependency>
 			<groupId>org.ow2.asm</groupId>
@@ -64,12 +54,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.twitter</groupId>
-			<artifactId>chill-java</artifactId>
-			<version>${chill.version}</version>
-		</dependency>
-
-		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
@@ -88,20 +72,6 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-
-		<dependency>
-			<groupId>joda-time</groupId>
-			<artifactId>joda-time</artifactId>
-			<version>2.5</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.joda</groupId>
-			<artifactId>joda-convert</artifactId>
-			<version>1.7</version>
-			<scope>test</scope>
-		</dependency>
 		
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 6db32c5..6bcdb52 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -63,7 +63,7 @@ import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.PartitionOperator;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 2edc533..80f8199 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -33,7 +33,6 @@ import org.apache.flink.configuration.Configuration;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.util.List;
 import java.util.Random;
 
 import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
@@ -61,24 +60,6 @@ public final class Utils {
 		return String.format("%s(%s:%d)", elem.getMethodName(), elem.getFileName(), elem.getLineNumber());
 	}
 
-	/**
-	 * Returns all GenericTypeInfos contained in a composite type.
-	 *
-	 * @param typeInfo {@link CompositeType}
-	 */
-	public static void getContainedGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) {
-		for (int i = 0; i < typeInfo.getArity(); i++) {
-			TypeInformation<?> type = typeInfo.getTypeAt(i);
-			if (type instanceof CompositeType) {
-				getContainedGenericTypes((CompositeType<?>) type, target);
-			} else if (type instanceof GenericTypeInfo) {
-				if (!target.contains(type)) {
-					target.add((GenericTypeInfo<?>) type);
-				}
-			}
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
deleted file mode 100644
index 3d06c59..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.Function;
-
-import java.io.Serializable;
-
-/**
- * The {@link KeySelector} allows to use arbitrary objects for operations such as
- * reduce, reduceGroup, join, coGoup, etc.
- * 
- * The extractor takes an object and returns the key for that object.
- *
- * @param <IN> Type of objects to extract the key from.
- * @param <KEY> Type of key.
- */
-public interface KeySelector<IN, KEY> extends Function, Serializable {
-	
-	/**
-	 * User-defined function that extracts the key from an arbitrary object.
-	 * 
-	 * For example for a class:
-	 * <pre>
-	 * 	public class Word {
-	 * 		String word;
-	 * 		int count;
-	 * 	}
-	 * </pre>
-	 * The key extractor could return the word as
-	 * a key to group all Word objects by the String they contain.
-	 * 
-	 * The code would look like this
-	 * <pre>
-	 * 	public String getKey(Word w) {
-	 * 		return w.word;
-	 * 	}
-	 * </pre>
-	 * 
-	 * @param value The object to get the key from.
-	 * @return The extracted key.
-	 * 
-	 * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
-	 *                   and trigger recovery or cancellation of the program. 
-	 */
-	KEY getKey(IN value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 761eeb3..a75b8e0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -37,7 +37,7 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 import java.lang.annotation.Annotation;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
index c44929f..6763cdf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.common.operators.Keys;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index 00e0d3b..6485936 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 845deb4..6c6b051 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -40,9 +41,9 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingCoGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingCoGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanRightUnwrappingCoGroupOperator;
@@ -297,11 +298,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
 
-		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
-		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
+		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
+		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
 
-		final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
-		final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
+		final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
+		final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);
 
 		final PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
 			new PlanBothUnwrappingCoGroupOperator<>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
@@ -324,8 +325,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
-		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
-		final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
+		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
+		final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);
 		
 		final PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
 				new PlanRightUnwrappingCoGroupOperator<>(
@@ -355,8 +356,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
-		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
-		final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
+		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
+		final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
 
 		final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
 				new PlanLeftUnwrappingCoGroupOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
index 30639c3..74f54b8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
@@ -21,11 +21,12 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
 
 /**
  * A {@link DataSet} that is the result of a CoGroup transformation. 

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 915a053..8745271 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index a302478..7b3001f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
index 6717c6d..4af9108 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 5102c80..9979f59 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -20,12 +20,13 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
@@ -116,8 +117,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 		
-		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
-		Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
+		Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
 		
 		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
 				new PlanUnwrappingReduceGroupOperator<>(function, keys, name, outputType, typeInfoWithKey, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index ef0c12f..a43b869 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -30,7 +31,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 
@@ -195,8 +196,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	{
 		final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 
-		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
-		Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
+		Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
 
 		PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer =
 			new PlanUnwrappingGroupCombineOperator<>(function, keys, name, outputType, typeInfoWithKey);
@@ -217,9 +218,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	{
 		final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
 		final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>)rawSortingKeys;
-		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey, sortingKey);
+		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey, sortingKey);
 
-		Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
+		Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);
 
 		PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer =
 			new PlanUnwrappingSortedGroupCombineOperator<>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index b1bf844..42553a0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -28,7 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -281,9 +282,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			boolean combinable)
 	{
 		SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
-		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
 
-		Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+		Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
 
 		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
 			new PlanUnwrappingReduceGroupOperator(function, keys, name, outputType, typeInfoWithKey, combinable);
@@ -305,9 +306,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	{
 		final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
 		final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKey;
-		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey,sortingKey);
+		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey,sortingKey);
 
-		Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
+		Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey);
 
 		PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> reducer =
 			new PlanUnwrappingSortedReduceGroupOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index c117458..823aee4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.DataSet;
 
 /**


Mime
View raw message