flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [03/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:03 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
deleted file mode 100644
index 8cdee9b..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorISD1Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
-
-	@SuppressWarnings("unchecked")
-	Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(5, "hello", 23.2),
-		new Tuple3<Integer, String, Double>(6, "world", 20.0),
-		new Tuple3<Integer, String, Double>(7, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(8, "hello", 23.2),
-		new Tuple3<Integer, String, Double>(9, "world", 20.0),
-		new Tuple3<Integer, String, Double>(10, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(11, "hello", 23.2)
-	};
-
-	@Override
-	protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) {
-		return new TupleComparator<Tuple3<Integer, String, Double>>(
-				new int[]{0},
-				new TypeComparator[]{ new IntComparator(ascending) },
-				new TypeSerializer[]{ IntSerializer.INSTANCE });
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
-		return new TupleSerializer<Tuple3<Integer, String, Double>>(
-				(Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
-				new TypeSerializer[]{
-					new IntSerializer(),
-					new StringSerializer(),
-					new DoubleSerializer()});
-	}
-
-	@Override
-	protected Tuple3<Integer, String, Double>[] getSortedTestData() {
-		return dataISD;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
deleted file mode 100644
index 06c292f..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorISD2Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
-
-	@SuppressWarnings("unchecked")
-	Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(4, "world", 23.2),
-		new Tuple3<Integer, String, Double>(5, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(5, "world", 20.0),
-		new Tuple3<Integer, String, Double>(6, "hello", 23.2),
-		new Tuple3<Integer, String, Double>(6, "world", 20.0),
-		new Tuple3<Integer, String, Double>(7, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(7, "world", 23.2)
-	};
-
-	@Override
-	protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) {
-		return new TupleComparator<Tuple3<Integer, String, Double>>(
-				new int[]{0, 1},
-				new TypeComparator[]{
-					new IntComparator(ascending),
-					new StringComparator(ascending)
-				},
-		new TypeSerializer[]{ IntSerializer.INSTANCE, StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
-		return new TupleSerializer<Tuple3<Integer, String, Double>>(
-				(Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
-				new TypeSerializer[]{
-					new IntSerializer(),
-					new StringSerializer(),
-					new DoubleSerializer()});
-	}
-
-	@Override
-	protected Tuple3<Integer, String, Double>[] getSortedTestData() {
-		return dataISD;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
deleted file mode 100644
index d823a29..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorISD3Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> {
-
-	@SuppressWarnings("unchecked")
-	Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(4, "hello", 23.2),
-		new Tuple3<Integer, String, Double>(4, "world", 20.0),
-		new Tuple3<Integer, String, Double>(5, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(5, "hello", 23.2),
-		new Tuple3<Integer, String, Double>(5, "world", 20.0),
-		new Tuple3<Integer, String, Double>(6, "hello", 20.0),
-		new Tuple3<Integer, String, Double>(6, "hello", 23.2)
-	};
-
-	@Override
-	protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) {
-		return new TupleComparator<Tuple3<Integer, String, Double>>(
-				new int[]{0, 1, 2},
-				new TypeComparator[]{
-					new IntComparator(ascending),
-					new StringComparator(ascending),
-					new DoubleComparator(ascending)
-				},
-		new TypeSerializer[]{ IntSerializer.INSTANCE, StringSerializer.INSTANCE, DoubleSerializer.INSTANCE });
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
-		return new TupleSerializer<Tuple3<Integer, String, Double>>(
-				(Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
-				new TypeSerializer[]{
-					new IntSerializer(),
-					new StringSerializer(),
-					new DoubleSerializer()});
-	}
-
-	@Override
-	protected Tuple3<Integer, String, Double>[] getSortedTestData() {
-		return dataISD;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
deleted file mode 100644
index cf73be2..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorTTT1Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
-
-	@SuppressWarnings("unchecked")
-	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
-		
-	};
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
-			boolean ascending) {
-		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-				new int[] { 0 },
-				new TypeComparator[] {
-						new TupleComparator<Tuple2<String, Double>>(
-								new int[] { 0, 1 },
-								new TypeComparator[] {
-								new StringComparator(ascending),
-								new DoubleComparator(ascending) },
-								new TypeSerializer[] {
-										StringSerializer.INSTANCE,
-										DoubleSerializer.INSTANCE }) },
-				new TypeSerializer[] {
-						new TupleSerializer<Tuple2<String, Double>>(
-								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										StringSerializer.INSTANCE,
-										DoubleSerializer.INSTANCE }),
-						new TupleSerializer<Tuple2<Long, Long>>(
-								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										LongSerializer.INSTANCE,
-										LongSerializer.INSTANCE }),
-						new TupleSerializer<Tuple2<Integer, Long>>(
-								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										IntSerializer.INSTANCE,
-										LongSerializer.INSTANCE }) });
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
-		return new  TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
-				new TypeSerializer[]{
-					new TupleSerializer<Tuple2<String, Double>> (
-							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									StringSerializer.INSTANCE,
-									DoubleSerializer.INSTANCE
-					}),
-					new TupleSerializer<Tuple2<Long, Long>> (
-							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									LongSerializer.INSTANCE,
-									LongSerializer.INSTANCE
-					}),
-					new TupleSerializer<Tuple2<Integer, Long>> (
-							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									IntSerializer.INSTANCE,
-									LongSerializer.INSTANCE
-					})
-				});
-	}
-
-	@Override
-	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
-		return this.dataISD;
-	}
-	
-	@Override
-	protected void deepEquals(
-			String message,
-			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
-			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
-		
-		for (int x = 0; x < should.getArity(); x++) {
-			// Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields.
-			if(should.getField(x) instanceof Tuple2) {
-				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x));
-			}
-			else {
-				assertEquals(message, should.getField(x), is.getField(x));
-			}
-		}// For
-	}
-	
-	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
-		for (int x = 0; x < should.getArity(); x++) {
-			assertEquals(message, should.getField(x), is.getField(x));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
deleted file mode 100644
index 4b07c61..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorTTT2Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
-
-	@SuppressWarnings("unchecked")
-	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
-		
-	};
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
-			boolean ascending) {
-		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-				new int[] { 0, 2 },
-				new TypeComparator[] {
-						new TupleComparator<Tuple2<String, Double>>(
-								new int[] { 0, 1 },
-								new TypeComparator[] {
-								new StringComparator(ascending),
-								new DoubleComparator(ascending) },
-								new TypeSerializer[] {
-										StringSerializer.INSTANCE,
-										DoubleSerializer.INSTANCE }),
-						new TupleComparator<Tuple2<Integer, Long>>(
-								new int[] {	0, 1 },
-								new TypeComparator[] {
-								new IntComparator(ascending),
-								new LongComparator(ascending) },
-								new TypeSerializer[] {
-										IntSerializer.INSTANCE,
-										LongSerializer.INSTANCE }) },
-				new TypeSerializer[] {
-						new TupleSerializer<Tuple2<String, Double>>(
-								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										StringSerializer.INSTANCE,
-										DoubleSerializer.INSTANCE }),
-						new TupleSerializer<Tuple2<Long, Long>>(
-								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										LongSerializer.INSTANCE,
-										LongSerializer.INSTANCE }),
-						new TupleSerializer<Tuple2<Integer, Long>>(
-								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										IntSerializer.INSTANCE,
-										LongSerializer.INSTANCE }) });
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
-		return new  TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
-				new TypeSerializer[]{
-					new TupleSerializer<Tuple2<String, Double>> (
-							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									StringSerializer.INSTANCE,
-									DoubleSerializer.INSTANCE}),
-					new TupleSerializer<Tuple2<Long, Long>> (
-							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									LongSerializer.INSTANCE,
-									LongSerializer.INSTANCE}),
-					new TupleSerializer<Tuple2<Integer, Long>> (
-							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									IntSerializer.INSTANCE,
-									LongSerializer.INSTANCE})
-				});
-	}
-
-	@Override
-	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
-		return this.dataISD;
-	}
-	
-	@Override
-	protected void deepEquals(
-			String message,
-			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
-			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
-		
-		for (int x = 0; x < should.getArity(); x++) {
-			// Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields.
-			if(should.getField(x) instanceof Tuple2) {
-				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x));
-			}
-			else {
-				assertEquals(message, should.getField(x), is.getField(x));
-			}
-		}// For
-	}
-	
-	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
-		for (int x = 0; x < should.getArity(); x++) {
-			assertEquals(message, should.getField(x), is.getField(x));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
deleted file mode 100644
index 0dfc094..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertEquals;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringComparator;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorTTT3Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>{
-	@SuppressWarnings("unchecked")
-	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
-		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
-		
-	};
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
-			boolean ascending) {
-		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-				new int[] { 0, 1, 2 },
-				new TypeComparator[] {
-						new TupleComparator<Tuple2<String, Double>>(
-								new int[] { 0, 1 },
-								new TypeComparator[] {
-								new StringComparator(ascending),
-								new DoubleComparator(ascending) },
-								new TypeSerializer[] {
-										StringSerializer.INSTANCE,
-										DoubleSerializer.INSTANCE }),
-						new TupleComparator<Tuple2<Long, Long>>(
-								new int[] { 0, 1 },
-								new TypeComparator[] {
-								new LongComparator(ascending),
-								new LongComparator(ascending) },
-								new TypeSerializer[] {
-										LongSerializer.INSTANCE,
-										LongSerializer.INSTANCE }),
-						new TupleComparator<Tuple2<Integer, Long>>(
-								new int[] {	0, 1 },
-								new TypeComparator[] {
-								new IntComparator(ascending),
-								new LongComparator(ascending) },
-								new TypeSerializer[] {
-										IntSerializer.INSTANCE,
-										LongSerializer.INSTANCE }) },
-				new TypeSerializer[] {
-						new TupleSerializer<Tuple2<String, Double>>(
-								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										StringSerializer.INSTANCE,
-										DoubleSerializer.INSTANCE }),
-						new TupleSerializer<Tuple2<Long, Long>>(
-								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										LongSerializer.INSTANCE,
-										LongSerializer.INSTANCE }),
-						new TupleSerializer<Tuple2<Integer, Long>>(
-								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-								new TypeSerializer[] {
-										IntSerializer.INSTANCE,
-										LongSerializer.INSTANCE }) });
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
-		return new  TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
-				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
-				new TypeSerializer[]{
-					new TupleSerializer<Tuple2<String, Double>> (
-							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									StringSerializer.INSTANCE,
-									DoubleSerializer.INSTANCE
-					}),
-					new TupleSerializer<Tuple2<Long, Long>> (
-							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									LongSerializer.INSTANCE,
-									LongSerializer.INSTANCE
-					}),
-					new TupleSerializer<Tuple2<Integer, Long>> (
-							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
-							new TypeSerializer[]{
-									IntSerializer.INSTANCE,
-									LongSerializer.INSTANCE
-					})
-				});
-	}
-
-	@Override
-	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
-		return this.dataISD;
-	}
-	
-	@Override
-	protected void deepEquals(
-			String message,
-			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
-			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
-		
-		for (int x = 0; x < should.getArity(); x++) {
-			// Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields.
-			if(should.getField(x) instanceof Tuple2) {
-				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x));
-			}
-			else {
-				assertEquals(message, should.getField(x), is.getField(x));
-			}
-		}// For
-	}
-	
-	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
-		for (int x = 0; x < should.getArity(); x++) {
-			assertEquals(message, should.getField(x), is.getField(x));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
deleted file mode 100644
index 017eb44..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.util.ArrayList;
-import java.util.Random;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.util.StringUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TupleSerializerTest {
-
-	@Test
-	public void testTuple0() {
-		Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, Tuple0.INSTANCE, Tuple0.INSTANCE };
-
-		runTests(testTuples);
-	}
-
-	@Test
-	public void testTuple1Int() {
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		Tuple1<Integer>[] testTuples = new Tuple1[] {
-			new Tuple1<Integer>(42), new Tuple1<Integer>(1), new Tuple1<Integer>(0), new Tuple1<Integer>(-1),
-			new Tuple1<Integer>(Integer.MAX_VALUE), new Tuple1<Integer>(Integer.MIN_VALUE)
-		};
-		
-		runTests(testTuples);
-	}
-	
-	@Test
-	public void testTuple1String() {
-		Random rnd = new Random(68761564135413L);
-		
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		Tuple1<String>[] testTuples = new Tuple1[] {
-			new Tuple1<String>(StringUtils.getRandomString(rnd, 10, 100)),
-			new Tuple1<String>("abc"),
-			new Tuple1<String>(""),
-			new Tuple1<String>(StringUtils.getRandomString(rnd, 30, 170)),
-			new Tuple1<String>(StringUtils.getRandomString(rnd, 15, 50)),
-			new Tuple1<String>("")
-		};
-		
-		runTests(testTuples);
-	}
-	
-	@Test
-	public void testTuple1StringArray() {
-		Random rnd = new Random(289347567856686223L);
-		
-		String[] arr1 = new String[] {"abc", "",
-				StringUtils.getRandomString(rnd, 10, 100),
-				StringUtils.getRandomString(rnd, 15, 50),
-				StringUtils.getRandomString(rnd, 30, 170),
-				StringUtils.getRandomString(rnd, 14, 15),
-				""};
-		
-		String[] arr2 = new String[] {"foo", "",
-				StringUtils.getRandomString(rnd, 10, 100),
-				StringUtils.getRandomString(rnd, 1000, 5000),
-				StringUtils.getRandomString(rnd, 30000, 35000),
-				StringUtils.getRandomString(rnd, 100*1024, 105*1024),
-				"bar"};
-		
-		@SuppressWarnings("unchecked")
-		Tuple1<String[]>[] testTuples = new Tuple1[] {
-			new Tuple1<String[]>(arr1),
-			new Tuple1<String[]>(arr2)
-		};
-		
-		runTests(testTuples);
-	}
-	
-	@Test
-	public void testTuple2StringDouble() {
-		Random rnd = new Random(807346528946L);
-		
-		@SuppressWarnings("unchecked")
-		Tuple2<String, Double>[] testTuples = new Tuple2[] {
-				new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-				new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-				new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-				new Tuple2<String, Double>("", rnd.nextDouble()),
-				new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()),
-				new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble())
-			};
-		
-		runTests(testTuples);
-	}
-	
-	@Test
-	public void testTuple2StringStringArray() {
-		Random rnd = new Random(289347567856686223L);
-		
-		String[] arr1 = new String[] {"abc", "",
-				StringUtils.getRandomString(rnd, 10, 100),
-				StringUtils.getRandomString(rnd, 15, 50),
-				StringUtils.getRandomString(rnd, 30, 170),
-				StringUtils.getRandomString(rnd, 14, 15),
-				""};
-		
-		String[] arr2 = new String[] {"foo", "",
-				StringUtils.getRandomString(rnd, 10, 100),
-				StringUtils.getRandomString(rnd, 1000, 5000),
-				StringUtils.getRandomString(rnd, 30000, 35000),
-				StringUtils.getRandomString(rnd, 100*1024, 105*1024),
-				"bar"};
-		
-		@SuppressWarnings("unchecked")
-		Tuple2<String, String[]>[] testTuples = new Tuple2[] {
-			new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1),
-			new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2),
-			new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1),
-			new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2),
-			new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2)
-		};
-		
-		runTests(testTuples);
-	}
-	
-
-	@Test
-	public void testTuple5CustomObjects() {
-		Random rnd = new Random(807346528946L);
-		
-		SimpleTypes a = new SimpleTypes();
-		SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		
-		ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
-		ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
-		ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
-		ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
-		ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
-		
-		ComplexNestedObject2 co1 = new ComplexNestedObject2(rnd);
-		ComplexNestedObject2 co2 = new ComplexNestedObject2();
-		ComplexNestedObject2 co3 = new ComplexNestedObject2(rnd);
-		ComplexNestedObject2 co4 = new ComplexNestedObject2(rnd);
-		
-		Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
-		Book b2 = new Book(0L, "Debugging byte streams", 1337);
-		Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
-		Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 0xDEADBEEF);
-		Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for creative test strings", 0xBADF00);
-		Book b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L);
-		
-		ArrayList<String> list = new ArrayList<String>();
-		list.add("A");
-		list.add("B");
-		list.add("C");
-		list.add("D");
-		list.add("E");
-		
-		BookAuthor ba1 = new BookAuthor(976243875L, list, "Arno Nym");
-		
-		ArrayList<String> list2 = new ArrayList<String>();
-		BookAuthor ba2 = new BookAuthor(987654321L, list2, "The Saurus");
-		
-		
-		@SuppressWarnings("unchecked")
-		Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>[] testTuples = new Tuple5[] {
-				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(a, b1, o1, ba1, co1),
-				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(b, b2, o2, ba2, co2),
-				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(c, b3, o3, ba1, co3),
-				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(d, b2, o4, ba1, co4),
-				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(e, b4, o5, ba2, co4),
-				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(f, b5, o1, ba2, co4),
-				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2)
-		};
-		
-		runTests(testTuples);
-	}
-
-	private <T extends Tuple> void runTests(T... instances) {
-		try {
-			TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]);
-			TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig());
-			
-			Class<T> tupleClass = tupleTypeInfo.getTypeClass();
-			
-			int length = -1;
-			if(tupleClass == Tuple0.class) {
-				length = 1;
-			}
-			TupleSerializerTestInstance<T> test = new TupleSerializerTestInstance<T>(serializer, tupleClass, length, instances);
-			test.testAll();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
deleted file mode 100644
index a196984..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.junit.Assert;
-
-public class TupleSerializerTestInstance<T extends Tuple> extends SerializerTestInstance<T> {
-
-	public TupleSerializerTestInstance(TypeSerializer<T> serializer, Class<T> typeClass, int length, T[] testData) {
-		super(serializer, typeClass, length, testData);
-	}
-	
-	protected void deepEquals(String message, T shouldTuple, T isTuple) {
-		Assert.assertEquals(shouldTuple.getArity(), isTuple.getArity());
-		
-		for (int i = 0; i < shouldTuple.getArity(); i++) {
-			Object should = shouldTuple.getField(i);
-			Object is = isTuple.getField(i);
-			
-			if (should.getClass().isArray()) {
-				if (should instanceof boolean[]) {
-					Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is));
-				}
-				else if (should instanceof byte[]) {
-					assertArrayEquals(message, (byte[]) should, (byte[]) is);
-				}
-				else if (should instanceof short[]) {
-					assertArrayEquals(message, (short[]) should, (short[]) is);
-				}
-				else if (should instanceof int[]) {
-					assertArrayEquals(message, (int[]) should, (int[]) is);
-				}
-				else if (should instanceof long[]) {
-					assertArrayEquals(message, (long[]) should, (long[]) is);
-				}
-				else if (should instanceof float[]) {
-					assertArrayEquals(message, (float[]) should, (float[]) is, 0.0f);
-				}
-				else if (should instanceof double[]) {
-					assertArrayEquals(message, (double[]) should, (double[]) is, 0.0);
-				}
-				else if (should instanceof char[]) {
-					assertArrayEquals(message, (char[]) should, (char[]) is);
-				}
-				else {
-					assertArrayEquals(message, (Object[]) should, (Object[]) is);
-				}
-			}
-			else {
-				assertEquals(message,  should, is);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
deleted file mode 100644
index cf9874d..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.types.StringValue;
-
-public class ValueComparatorTest extends ComparatorTestBase<StringValue> {
-
-	StringValue[] data = new StringValue[]{
-		new StringValue(""),
-		new StringValue("Lorem Ipsum Dolor Omit Longer"),
-		new StringValue("aaaa"),
-		new StringValue("abcd"),
-		new StringValue("abce"),
-		new StringValue("abdd"),
-		new StringValue("accd"),
-		new StringValue("bbcd")
-	};
-
-	@Override
-	protected TypeComparator<StringValue> createComparator(boolean ascending) {
-		return new ValueComparator<StringValue>(ascending, StringValue.class);
-	}
-
-	@Override
-	protected TypeSerializer<StringValue> createSerializer() {
-		return new ValueSerializer<StringValue>(StringValue.class);
-	}
-
-	@Override
-	protected StringValue[] getSortedTestData() {
-		return data;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
deleted file mode 100644
index a2c3e1e..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> {
-	@Override
-	protected TypeComparator<ValueID> createComparator(boolean ascending) {
-		return new ValueComparator<>(ascending, ValueID.class);
-	}
-
-	@Override
-	protected TypeSerializer<ValueID> createSerializer() {
-		return new ValueSerializer<>(ValueID.class);
-	}
-
-	@Override
-	protected ValueID[] getSortedTestData() {
-		return new ValueID[] {
-			new ValueID(new UUID(0, 0)),
-			new ValueID(new UUID(1, 0)),
-			new ValueID(new UUID(1, 1))
-		};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
deleted file mode 100644
index d644485..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-import java.io.IOException;
-import java.util.UUID;
-
-public class ValueID implements Value, Comparable<ValueID> {
-	private static final long serialVersionUID = -562791433077971752L;
-
-	private UUID id;
-
-	public ValueID() {
-		id = UUID.randomUUID();
-	}
-
-	public ValueID(UUID id) {
-		this.id = id;
-	}
-
-	@Override
-	public int compareTo(ValueID o) {
-		return id.compareTo(o.id);
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(id.getMostSignificantBits());
-		out.writeLong(id.getLeastSignificantBits());
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		id = new UUID(in.readLong(), in.readLong());
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ValueID) {
-			ValueID other = (ValueID) obj;
-
-			return id.equals(other.id);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return id.hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
deleted file mode 100644
index 9c07a5e..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> {
-	@Override
-	protected TypeSerializer<ValueID> createSerializer() {
-		return new ValueSerializer<>(ValueID.class);
-	}
-
-	@Override
-	protected int getLength() {
-		return -1;
-	}
-
-	@Override
-	protected Class<ValueID> getTypeClass() {
-		return ValueID.class;
-	}
-
-	@Override
-	protected ValueID[] getTestData() {
-		return new ValueID[] {
-			new ValueID(new UUID(0, 0)),
-			new ValueID(new UUID(1, 0)),
-			new ValueID(new UUID(1, 1))
-		};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
deleted file mode 100644
index f5a90b7..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> {	
-	
-	StringArrayWritable[] data = new StringArrayWritable[]{
-			new StringArrayWritable(new String[]{}),
-			new StringArrayWritable(new String[]{""}),
-			new StringArrayWritable(new String[]{"a","a"}),
-			new StringArrayWritable(new String[]{"a","b"}),
-			new StringArrayWritable(new String[]{"c","c"}),
-			new StringArrayWritable(new String[]{"d","f"}),
-			new StringArrayWritable(new String[]{"d","m"}),
-			new StringArrayWritable(new String[]{"z","x"}),
-			new StringArrayWritable(new String[]{"a","a", "a"})
-	};
-	
-	@Override
-	protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) {
-		return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class);
-	}
-	
-	@Override
-	protected TypeSerializer<StringArrayWritable> createSerializer() {
-		return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class);
-	}
-	
-	@Override
-	protected StringArrayWritable[] getSortedTestData() {
-		return data;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
deleted file mode 100644
index 94e759d..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
-	@Override
-	protected TypeComparator<WritableID> createComparator(boolean ascending) {
-		return new WritableComparator<>(ascending, WritableID.class);
-	}
-
-	@Override
-	protected TypeSerializer<WritableID> createSerializer() {
-		return new WritableSerializer<>(WritableID.class);
-	}
-
-	@Override
-	protected WritableID[] getSortedTestData() {
-		return new WritableID[] {
-			new WritableID(new UUID(0, 0)),
-			new WritableID(new UUID(1, 0)),
-			new WritableID(new UUID(1, 1))
-		};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
deleted file mode 100644
index 4274cf6..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.UUID;
-
-public class WritableID implements WritableComparable<WritableID> {
-	private UUID uuid;
-
-	public WritableID() {
-		this.uuid = UUID.randomUUID();
-	}
-
-	public WritableID(UUID uuid) {
-		this.uuid = uuid;
-	}
-
-	@Override
-	public int compareTo(WritableID o) {
-		return this.uuid.compareTo(o.uuid);
-	}
-
-	@Override
-	public void write(DataOutput dataOutput) throws IOException {
-		dataOutput.writeLong(uuid.getMostSignificantBits());
-		dataOutput.writeLong(uuid.getLeastSignificantBits());
-	}
-
-	@Override
-	public void readFields(DataInput dataInput) throws IOException {
-		this.uuid = new UUID(dataInput.readLong(), dataInput.readLong());
-	}
-
-	@Override
-	public String toString() {
-		return uuid.toString();
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		WritableID id = (WritableID) o;
-
-		return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null);
-	}
-
-	@Override
-	public int hashCode() {
-		return uuid != null ? uuid.hashCode() : 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
deleted file mode 100644
index bb5f4d4..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.junit.Test;
-
-public class WritableSerializerTest {
-	
-	@Test
-	public void testStringArrayWritable() {
-		StringArrayWritable[] data = new StringArrayWritable[]{
-				new StringArrayWritable(new String[]{}),
-				new StringArrayWritable(new String[]{""}),
-				new StringArrayWritable(new String[]{"a","a"}),
-				new StringArrayWritable(new String[]{"a","b"}),
-				new StringArrayWritable(new String[]{"c","c"}),
-				new StringArrayWritable(new String[]{"d","f"}),
-				new StringArrayWritable(new String[]{"d","m"}),
-				new StringArrayWritable(new String[]{"z","x"}),
-				new StringArrayWritable(new String[]{"a","a", "a"})
-		};
-		
-		WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
-		WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig());
-		
-		SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data);
-		
-		testInstance.testAll();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
deleted file mode 100644
index 2af7730..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.UUID;
-
-public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
-	@Override
-	protected TypeSerializer<WritableID> createSerializer() {
-		return new WritableSerializer<>(WritableID.class);
-	}
-
-	@Override
-	protected int getLength() {
-		return -1;
-	}
-
-	@Override
-	protected Class<WritableID> getTypeClass() {
-		return WritableID.class;
-	}
-
-	@Override
-	protected WritableID[] getTestData() {
-		return new WritableID[] {
-			new WritableID(new UUID(0, 0)),
-			new WritableID(new UUID(1, 0)),
-			new WritableID(new UUID(1, 1))
-		};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
deleted file mode 100644
index 7572408..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class KryoClearedBufferTest {
-
-	/**
-	 * Tests that the kryo output buffer is cleared in case of an exception. Flink uses the
-	 * EOFException to signal that a buffer is full. In such a case, the record which was tried
-	 * to be written will be rewritten. Therefore, eventually buffered data of this record has
-	 * to be cleared.
-	 */
-	@Test
-	public void testOutputBufferedBeingClearedInCaseOfException() throws Exception {
-		ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.registerTypeWithKryoSerializer(TestRecord.class, new TestRecordSerializer());
-		executionConfig.registerKryoType(TestRecord.class);
-
-		KryoSerializer<TestRecord> kryoSerializer = new KryoSerializer<TestRecord>(
-			TestRecord.class,
-			executionConfig);
-
-		int size = 94;
-		int bufferSize = 150;
-
-		TestRecord testRecord = new TestRecord(size);
-
-		TestDataOutputView target = new TestDataOutputView(bufferSize);
-
-		kryoSerializer.serialize(testRecord, target);
-
-		try {
-			kryoSerializer.serialize(testRecord, target);
-			Assert.fail("Expected an EOFException.");
-		} catch(EOFException eofException) {
-			// expected exception
-			// now the Kryo Output should have been cleared
-		}
-
-		TestRecord actualRecord = kryoSerializer.deserialize(
-				new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer())));
-
-		Assert.assertEquals(testRecord, actualRecord);
-
-		target.clear();
-
-		// if the kryo output has been cleared then we can serialize our test record into the target
-		// because the target buffer 150 bytes can host one TestRecord (total serialization size 100)
-		kryoSerializer.serialize(testRecord, target);
-
-		byte[] buffer = target.getBuffer();
-		int counter = 0;
-
-		for (int i = 0; i < buffer.length; i++) {
-			if(buffer[i] == 42) {
-				counter++;
-			}
-		}
-
-		Assert.assertEquals(size, counter);
-	}
-
-	public static class TestRecord {
-		private byte[] buffer;
-
-		public TestRecord(int size) {
-			buffer = new byte[size];
-
-			Arrays.fill(buffer, (byte)42);
-		}
-
-		public TestRecord(byte[] buffer){
-			this.buffer = buffer;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof TestRecord) {
-				TestRecord record = (TestRecord) obj;
-
-				return Arrays.equals(buffer, record.buffer);
-			} else {
-				return false;
-			}
-		}
-	}
-
-	public static class TestRecordSerializer extends Serializer<TestRecord> implements Serializable {
-
-		private static final long serialVersionUID = 6971996565421454985L;
-
-		@Override
-		public void write(Kryo kryo, Output output, TestRecord object) {
-			output.writeInt(object.buffer.length);
-			output.write(object.buffer);
-		}
-
-		@Override
-		public TestRecord read(Kryo kryo, Input input, Class<TestRecord> type) {
-			int length = input.readInt();
-			byte[] buffer = input.readBytes(length);
-
-			return new TestRecord(buffer);
-		}
-	}
-
-	public static class TestDataOutputView implements DataOutputView {
-
-		private byte[] buffer;
-		private int position;
-
-		public TestDataOutputView(int size) {
-			buffer = new byte[size];
-			position = 0;
-		}
-
-		public void clear() {
-			position = 0;
-		}
-
-		public byte[] getBuffer() {
-			return buffer;
-		}
-
-		public void checkSize(int numBytes) throws EOFException {
-			if (position + numBytes > buffer.length) {
-				throw new EOFException();
-			}
-		}
-
-		@Override
-		public void skipBytesToWrite(int numBytes) throws IOException {
-			checkSize(numBytes);
-
-			position += numBytes;
-		}
-
-		@Override
-		public void write(DataInputView source, int numBytes) throws IOException {
-			checkSize(numBytes);
-
-			byte[] tempBuffer = new byte[numBytes];
-
-			source.read(tempBuffer);
-
-			System.arraycopy(tempBuffer, 0, buffer, position, numBytes);
-
-			position += numBytes;
-		}
-
-		@Override
-		public void write(int b) throws IOException {
-			checkSize(4);
-
-			position += 4;
-		}
-
-		@Override
-		public void write(byte[] b) throws IOException {
-			checkSize(b.length);
-
-			System.arraycopy(b, 0, buffer, position, b.length);
-			position += b.length;
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-			checkSize(len);
-
-			System.arraycopy(b, off, buffer, position, len);
-
-			position += len;
-		}
-
-		@Override
-		public void writeBoolean(boolean v) throws IOException {
-			checkSize(1);
-			position += 1;
-		}
-
-		@Override
-		public void writeByte(int v) throws IOException {
-			checkSize(1);
-
-			buffer[position] = (byte)v;
-
-			position++;
-		}
-
-		@Override
-		public void writeShort(int v) throws IOException {
-			checkSize(2);
-
-			position += 2;
-		}
-
-		@Override
-		public void writeChar(int v) throws IOException {
-			checkSize(1);
-			position++;
-		}
-
-		@Override
-		public void writeInt(int v) throws IOException {
-			checkSize(4);
-
-			position += 4;
-		}
-
-		@Override
-		public void writeLong(long v) throws IOException {
-			checkSize(8);
-			position += 8;
-		}
-
-		@Override
-		public void writeFloat(float v) throws IOException {
-			checkSize(4);
-			position += 4;
-		}
-
-		@Override
-		public void writeDouble(double v) throws IOException {
-			checkSize(8);
-			position += 8;
-		}
-
-		@Override
-		public void writeBytes(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
-			checkSize(sBuffer.length);
-			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
-			position += sBuffer.length;
-		}
-
-		@Override
-		public void writeChars(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
-			checkSize(sBuffer.length);
-			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
-			position += sBuffer.length;
-		}
-
-		@Override
-		public void writeUTF(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
-			checkSize(sBuffer.length);
-			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
-			position += sBuffer.length;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
deleted file mode 100644
index 9d3eef1..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest;
-
-public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
-	@Override
-	protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
-		return new KryoSerializer<T>(type, new ExecutionConfig());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
deleted file mode 100644
index 19aed78..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime.kryo;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest;
-
-public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
-	@Override
-	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		return new KryoSerializer<T>(type, new ExecutionConfig());
-	}
-}
\ No newline at end of file


Mime
View raw message